Add event.internal_metadata.instance_name and event position to get_last_event_in_room_before_stream_ordering(...)

This commit is contained in:
Eric Eastwood 2024-06-12 14:53:36 -05:00
parent 03547b09e3
commit c94550d542
10 changed files with 82 additions and 28 deletions

View file

@ -90,6 +90,7 @@ def prune_event(event: EventBase) -> EventBase:
pruned_event.internal_metadata.stream_ordering = ( pruned_event.internal_metadata.stream_ordering = (
event.internal_metadata.stream_ordering event.internal_metadata.stream_ordering
) )
pruned_event.internal_metadata.instance_name = event.internal_metadata.instance_name
pruned_event.internal_metadata.outlier = event.internal_metadata.outlier pruned_event.internal_metadata.outlier = event.internal_metadata.outlier
# Mark the event as redacted # Mark the event as redacted
@ -116,6 +117,7 @@ def clone_event(event: EventBase) -> EventBase:
new_event.internal_metadata.stream_ordering = ( new_event.internal_metadata.stream_ordering = (
event.internal_metadata.stream_ordering event.internal_metadata.stream_ordering
) )
new_event.internal_metadata.instance_name = event.internal_metadata.instance_name
new_event.internal_metadata.outlier = event.internal_metadata.outlier new_event.internal_metadata.outlier = event.internal_metadata.outlier
return new_event return new_event

View file

@ -201,7 +201,7 @@ class MessageHandler:
if at_token: if at_token:
last_event_id = ( last_event_id = (
await self.store.get_last_event_in_room_before_stream_ordering( await self.store.get_last_event_id_in_room_before_stream_ordering(
room_id, room_id,
end_token=at_token.room_key, end_token=at_token.room_key,
) )
@ -1551,6 +1551,7 @@ class EventCreationHandler:
# stream_ordering entry manually (as it was persisted on # stream_ordering entry manually (as it was persisted on
# another worker). # another worker).
event.internal_metadata.stream_ordering = stream_id event.internal_metadata.stream_ordering = stream_id
event.internal_metadata.instance_name = writer_instance
return event return event

View file

@ -1038,9 +1038,11 @@ class SyncHandler:
# FIXME: This gets the state at the latest event before the stream ordering, # FIXME: This gets the state at the latest event before the stream ordering,
# which might not be the same as the "current state" of the room at the time # which might not be the same as the "current state" of the room at the time
# of the stream token if there were multiple forward extremities at the time. # of the stream token if there were multiple forward extremities at the time.
last_event_id = await self.store.get_last_event_in_room_before_stream_ordering( last_event_id = (
room_id, await self.store.get_last_event_id_in_room_before_stream_ordering(
end_token=stream_position.room_key, room_id,
end_token=stream_position.room_key,
)
) )
if last_event_id: if last_event_id:
@ -1521,7 +1523,7 @@ class SyncHandler:
# We need to make sure the first event in our batch points to the # We need to make sure the first event in our batch points to the
# last event in the previous batch. # last event in the previous batch.
last_event_id_prev_batch = ( last_event_id_prev_batch = (
await self.store.get_last_event_in_room_before_stream_ordering( await self.store.get_last_event_id_in_room_before_stream_ordering(
room_id, room_id,
end_token=since_token.room_key, end_token=since_token.room_key,
) )

View file

@ -207,6 +207,7 @@ class PersistEventsStore:
async with stream_ordering_manager as stream_orderings: async with stream_ordering_manager as stream_orderings:
for (event, _), stream in zip(events_and_contexts, stream_orderings): for (event, _), stream in zip(events_and_contexts, stream_orderings):
event.internal_metadata.stream_ordering = stream event.internal_metadata.stream_ordering = stream
event.internal_metadata.instance_name = self._instance_name
await self.db_pool.runInteraction( await self.db_pool.runInteraction(
"persist_events", "persist_events",

View file

@ -156,6 +156,7 @@ class _EventRow:
event_id: str event_id: str
stream_ordering: int stream_ordering: int
instance_name: str
json: str json: str
internal_metadata: str internal_metadata: str
format_version: Optional[int] format_version: Optional[int]
@ -1354,6 +1355,7 @@ class EventsWorkerStore(SQLBaseStore):
rejected_reason=rejected_reason, rejected_reason=rejected_reason,
) )
original_ev.internal_metadata.stream_ordering = row.stream_ordering original_ev.internal_metadata.stream_ordering = row.stream_ordering
original_ev.internal_metadata.instance_name = row.instance_name
original_ev.internal_metadata.outlier = row.outlier original_ev.internal_metadata.outlier = row.outlier
# Consistency check: if the content of the event has been modified in the # Consistency check: if the content of the event has been modified in the
@ -1439,6 +1441,7 @@ class EventsWorkerStore(SQLBaseStore):
SELECT SELECT
e.event_id, e.event_id,
e.stream_ordering, e.stream_ordering,
e.instance_name,
ej.internal_metadata, ej.internal_metadata,
ej.json, ej.json,
ej.format_version, ej.format_version,
@ -1462,13 +1465,14 @@ class EventsWorkerStore(SQLBaseStore):
event_dict[event_id] = _EventRow( event_dict[event_id] = _EventRow(
event_id=event_id, event_id=event_id,
stream_ordering=row[1], stream_ordering=row[1],
internal_metadata=row[2], instance_name=row[2],
json=row[3], internal_metadata=row[3],
format_version=row[4], json=row[4],
room_version_id=row[5], format_version=row[5],
rejected_reason=row[6], room_version_id=row[6],
rejected_reason=row[7],
redactions=[], redactions=[],
outlier=bool(row[7]), # This is an int in SQLite3 outlier=bool(row[8]), # This is an int in SQLite3
) )
# check for redactions # check for redactions

View file

@ -895,7 +895,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
"get_room_event_before_stream_ordering", _f "get_room_event_before_stream_ordering", _f
) )
async def get_last_event_in_room_before_stream_ordering( async def get_last_event_id_in_room_before_stream_ordering(
self, self,
room_id: str, room_id: str,
end_token: RoomStreamToken, end_token: RoomStreamToken,
@ -910,10 +910,36 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
The ID of the most recent event, or None if there are no events in the room The ID of the most recent event, or None if there are no events in the room
before this stream ordering. before this stream ordering.
""" """
last_event_result = await self.get_last_event_in_room_before_stream_ordering(
room_id, end_token
)
if last_event_result:
return last_event_result[0]
return None
async def get_last_event_in_room_before_stream_ordering(
self,
room_id: str,
end_token: RoomStreamToken,
) -> Optional[Tuple[str, PersistedEventPosition]]:
"""
Returns the ID and event position of the last event in a room at or before a
stream ordering.
Args:
room_id
end_token: The token used to stream from
Returns:
The ID of the most recent event and it's position, or None if there are no
events in the room before this stream ordering.
"""
def get_last_event_in_room_before_stream_ordering_txn( def get_last_event_in_room_before_stream_ordering_txn(
txn: LoggingTransaction, txn: LoggingTransaction,
) -> Optional[str]: ) -> Optional[Tuple[str, PersistedEventPosition]]:
# We're looking for the closest event at or before the token. We need to # We're looking for the closest event at or before the token. We need to
# handle the fact that the stream token can be a vector clock (with an # handle the fact that the stream token can be a vector clock (with an
# `instance_map`) and events can be persisted on different instances # `instance_map`) and events can be persisted on different instances
@ -975,7 +1001,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
topological_ordering=topological_ordering, topological_ordering=topological_ordering,
stream_ordering=stream_ordering, stream_ordering=stream_ordering,
): ):
return event_id return event_id, PersistedEventPosition(
instance_name, stream_ordering
)
return None return None

View file

@ -19,6 +19,8 @@ class EventInternalMetadata:
stream_ordering: Optional[int] stream_ordering: Optional[int]
"""the stream ordering of this event. None, until it has been persisted.""" """the stream ordering of this event. None, until it has been persisted."""
instance_name: Optional[str]
"""the instance name of the server that persisted this event. None, until it has been persisted."""
outlier: bool outlier: bool
"""whether this event is an outlier (ie, whether we have the state at that """whether this event is an outlier (ie, whether we have the state at that

View file

@ -625,6 +625,8 @@ class CloneEventTestCase(stdlib_unittest.TestCase):
) )
original.internal_metadata.stream_ordering = 1234 original.internal_metadata.stream_ordering = 1234
self.assertEqual(original.internal_metadata.stream_ordering, 1234) self.assertEqual(original.internal_metadata.stream_ordering, 1234)
original.internal_metadata.instance_name = "worker1"
self.assertEqual(original.internal_metadata.instance_name, "worker1")
cloned = clone_event(original) cloned = clone_event(original)
cloned.unsigned["b"] = 3 cloned.unsigned["b"] = 3
@ -632,6 +634,7 @@ class CloneEventTestCase(stdlib_unittest.TestCase):
self.assertEqual(original.unsigned, {"a": 1, "b": 2}) self.assertEqual(original.unsigned, {"a": 1, "b": 2})
self.assertEqual(cloned.unsigned, {"a": 1, "b": 3}) self.assertEqual(cloned.unsigned, {"a": 1, "b": 3})
self.assertEqual(cloned.internal_metadata.stream_ordering, 1234) self.assertEqual(cloned.internal_metadata.stream_ordering, 1234)
self.assertEqual(cloned.internal_metadata.instance_name, "worker1")
self.assertEqual(cloned.internal_metadata.txn_id, "txn") self.assertEqual(cloned.internal_metadata.txn_id, "txn")

View file

@ -431,6 +431,7 @@ class EventChainStoreTestCase(HomeserverTestCase):
for e in events: for e in events:
e.internal_metadata.stream_ordering = self._next_stream_ordering e.internal_metadata.stream_ordering = self._next_stream_ordering
e.internal_metadata.instance_name = self.hs.get_instance_name()
self._next_stream_ordering += 1 self._next_stream_ordering += 1
def _persist(txn: LoggingTransaction) -> None: def _persist(txn: LoggingTransaction) -> None:

View file

@ -336,14 +336,14 @@ class GetLastEventInRoomBeforeStreamOrderingTestCase(HomeserverTestCase):
room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
last_event = self.get_success( last_event_result = self.get_success(
self.store.get_last_event_in_room_before_stream_ordering( self.store.get_last_event_in_room_before_stream_ordering(
room_id=room_id, room_id=room_id,
end_token=before_room_token.room_key, end_token=before_room_token.room_key,
) )
) )
self.assertIsNone(last_event) self.assertIsNone(last_event_result)
def test_after_room_created(self) -> None: def test_after_room_created(self) -> None:
""" """
@ -356,14 +356,16 @@ class GetLastEventInRoomBeforeStreamOrderingTestCase(HomeserverTestCase):
after_room_token = self.event_sources.get_current_token() after_room_token = self.event_sources.get_current_token()
last_event = self.get_success( last_event_result = self.get_success(
self.store.get_last_event_in_room_before_stream_ordering( self.store.get_last_event_in_room_before_stream_ordering(
room_id=room_id, room_id=room_id,
end_token=after_room_token.room_key, end_token=after_room_token.room_key,
) )
) )
assert last_event_result is not None
last_event_id, _ = last_event_result
self.assertIsNotNone(last_event) self.assertIsNotNone(last_event_id)
def test_activity_in_other_rooms(self) -> None: def test_activity_in_other_rooms(self) -> None:
""" """
@ -380,16 +382,18 @@ class GetLastEventInRoomBeforeStreamOrderingTestCase(HomeserverTestCase):
after_room_token = self.event_sources.get_current_token() after_room_token = self.event_sources.get_current_token()
last_event = self.get_success( last_event_result = self.get_success(
self.store.get_last_event_in_room_before_stream_ordering( self.store.get_last_event_in_room_before_stream_ordering(
room_id=room_id1, room_id=room_id1,
end_token=after_room_token.room_key, end_token=after_room_token.room_key,
) )
) )
assert last_event_result is not None
last_event_id, _ = last_event_result
# Make sure it's the event we expect (which also means we know it's from the # Make sure it's the event we expect (which also means we know it's from the
# correct room) # correct room)
self.assertEqual(last_event, event_response["event_id"]) self.assertEqual(last_event_id, event_response["event_id"])
def test_activity_after_token_has_no_effect(self) -> None: def test_activity_after_token_has_no_effect(self) -> None:
""" """
@ -408,15 +412,17 @@ class GetLastEventInRoomBeforeStreamOrderingTestCase(HomeserverTestCase):
self.helper.send(room_id1, "after1", tok=user1_tok) self.helper.send(room_id1, "after1", tok=user1_tok)
self.helper.send(room_id1, "after2", tok=user1_tok) self.helper.send(room_id1, "after2", tok=user1_tok)
last_event = self.get_success( last_event_result = self.get_success(
self.store.get_last_event_in_room_before_stream_ordering( self.store.get_last_event_in_room_before_stream_ordering(
room_id=room_id1, room_id=room_id1,
end_token=after_room_token.room_key, end_token=after_room_token.room_key,
) )
) )
assert last_event_result is not None
last_event_id, _ = last_event_result
# Make sure it's the last event before the token # Make sure it's the last event before the token
self.assertEqual(last_event, event_response["event_id"]) self.assertEqual(last_event_id, event_response["event_id"])
def test_last_event_within_sharded_token(self) -> None: def test_last_event_within_sharded_token(self) -> None:
""" """
@ -457,18 +463,20 @@ class GetLastEventInRoomBeforeStreamOrderingTestCase(HomeserverTestCase):
self.helper.send(room_id1, "after1", tok=user1_tok) self.helper.send(room_id1, "after1", tok=user1_tok)
self.helper.send(room_id1, "after2", tok=user1_tok) self.helper.send(room_id1, "after2", tok=user1_tok)
last_event = self.get_success( last_event_result = self.get_success(
self.store.get_last_event_in_room_before_stream_ordering( self.store.get_last_event_in_room_before_stream_ordering(
room_id=room_id1, room_id=room_id1,
end_token=end_token, end_token=end_token,
) )
) )
assert last_event_result is not None
last_event_id, _ = last_event_result
# Should find closest event before the token in room1 # Should find closest event before the token in room1
self.assertEqual( self.assertEqual(
last_event, last_event_id,
event_response3["event_id"], event_response3["event_id"],
f"We expected {event_response3['event_id']} but saw {last_event} which corresponds to " f"We expected {event_response3['event_id']} but saw {last_event_id} which corresponds to "
+ str( + str(
{ {
"event1": event_response1["event_id"], "event1": event_response1["event_id"],
@ -515,18 +523,20 @@ class GetLastEventInRoomBeforeStreamOrderingTestCase(HomeserverTestCase):
self.helper.send(room_id1, "after1", tok=user1_tok) self.helper.send(room_id1, "after1", tok=user1_tok)
self.helper.send(room_id1, "after2", tok=user1_tok) self.helper.send(room_id1, "after2", tok=user1_tok)
last_event = self.get_success( last_event_result = self.get_success(
self.store.get_last_event_in_room_before_stream_ordering( self.store.get_last_event_in_room_before_stream_ordering(
room_id=room_id1, room_id=room_id1,
end_token=end_token, end_token=end_token,
) )
) )
assert last_event_result is not None
last_event_id, _ = last_event_result
# Should find closest event before the token in room1 # Should find closest event before the token in room1
self.assertEqual( self.assertEqual(
last_event, last_event_id,
event_response2["event_id"], event_response2["event_id"],
f"We expected {event_response2['event_id']} but saw {last_event} which corresponds to " f"We expected {event_response2['event_id']} but saw {last_event_id} which corresponds to "
+ str( + str(
{ {
"event1": event_response1["event_id"], "event1": event_response1["event_id"],