From c94550d54278ace5bc0d22544612bfed92dbe17d Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 12 Jun 2024 14:53:36 -0500 Subject: [PATCH] Add `event.internal_metadata.instance_name` and event position to `get_last_event_in_room_before_stream_ordering(...)` --- synapse/events/utils.py | 2 + synapse/handlers/message.py | 3 +- synapse/handlers/sync.py | 10 +++-- synapse/storage/databases/main/events.py | 1 + .../storage/databases/main/events_worker.py | 16 +++++--- synapse/storage/databases/main/stream.py | 34 +++++++++++++++-- synapse/synapse_rust/events.pyi | 2 + tests/events/test_utils.py | 3 ++ tests/storage/test_event_chain.py | 1 + tests/storage/test_stream.py | 38 ++++++++++++------- 10 files changed, 82 insertions(+), 28 deletions(-) diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 0772472312..b997d82d71 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -90,6 +90,7 @@ def prune_event(event: EventBase) -> EventBase: pruned_event.internal_metadata.stream_ordering = ( event.internal_metadata.stream_ordering ) + pruned_event.internal_metadata.instance_name = event.internal_metadata.instance_name pruned_event.internal_metadata.outlier = event.internal_metadata.outlier # Mark the event as redacted @@ -116,6 +117,7 @@ def clone_event(event: EventBase) -> EventBase: new_event.internal_metadata.stream_ordering = ( event.internal_metadata.stream_ordering ) + new_event.internal_metadata.instance_name = event.internal_metadata.instance_name new_event.internal_metadata.outlier = event.internal_metadata.outlier return new_event diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index de5bd44a5f..16d01efc67 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -201,7 +201,7 @@ class MessageHandler: if at_token: last_event_id = ( - await self.store.get_last_event_in_room_before_stream_ordering( + await self.store.get_last_event_id_in_room_before_stream_ordering( room_id, end_token=at_token.room_key, ) @@ -1551,6 +1551,7 @@ class EventCreationHandler: # stream_ordering entry manually (as it was persisted on # another worker). event.internal_metadata.stream_ordering = stream_id + event.internal_metadata.instance_name = writer_instance return event diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 39964726c5..881f66e2da 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1038,9 +1038,11 @@ class SyncHandler: # FIXME: This gets the state at the latest event before the stream ordering, # which might not be the same as the "current state" of the room at the time # of the stream token if there were multiple forward extremities at the time. - last_event_id = await self.store.get_last_event_in_room_before_stream_ordering( - room_id, - end_token=stream_position.room_key, + last_event_id = ( + await self.store.get_last_event_id_in_room_before_stream_ordering( + room_id, + end_token=stream_position.room_key, + ) ) if last_event_id: @@ -1521,7 +1523,7 @@ class SyncHandler: # We need to make sure the first event in our batch points to the # last event in the previous batch. last_event_id_prev_batch = ( - await self.store.get_last_event_in_room_before_stream_ordering( + await self.store.get_last_event_id_in_room_before_stream_ordering( room_id, end_token=since_token.room_key, ) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index f1bd85aa27..66428e6c8e 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -207,6 +207,7 @@ class PersistEventsStore: async with stream_ordering_manager as stream_orderings: for (event, _), stream in zip(events_and_contexts, stream_orderings): event.internal_metadata.stream_ordering = stream + event.internal_metadata.instance_name = self._instance_name await self.db_pool.runInteraction( "persist_events", diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index c06c44deb1..e264d36f02 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -156,6 +156,7 @@ class _EventRow: event_id: str stream_ordering: int + instance_name: str json: str internal_metadata: str format_version: Optional[int] @@ -1354,6 +1355,7 @@ class EventsWorkerStore(SQLBaseStore): rejected_reason=rejected_reason, ) original_ev.internal_metadata.stream_ordering = row.stream_ordering + original_ev.internal_metadata.instance_name = row.instance_name original_ev.internal_metadata.outlier = row.outlier # Consistency check: if the content of the event has been modified in the @@ -1439,6 +1441,7 @@ class EventsWorkerStore(SQLBaseStore): SELECT e.event_id, e.stream_ordering, + e.instance_name, ej.internal_metadata, ej.json, ej.format_version, @@ -1462,13 +1465,14 @@ class EventsWorkerStore(SQLBaseStore): event_dict[event_id] = _EventRow( event_id=event_id, stream_ordering=row[1], - internal_metadata=row[2], - json=row[3], - format_version=row[4], - room_version_id=row[5], - rejected_reason=row[6], + instance_name=row[2], + internal_metadata=row[3], + json=row[4], + format_version=row[5], + room_version_id=row[6], + rejected_reason=row[7], redactions=[], - outlier=bool(row[7]), # This is an int in SQLite3 + outlier=bool(row[8]), # This is an int in SQLite3 ) # check for redactions diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index dd5f76dd53..d7cd5ae414 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -895,7 +895,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): "get_room_event_before_stream_ordering", _f ) - async def get_last_event_in_room_before_stream_ordering( + async def get_last_event_id_in_room_before_stream_ordering( self, room_id: str, end_token: RoomStreamToken, @@ -910,10 +910,36 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): The ID of the most recent event, or None if there are no events in the room before this stream ordering. """ + last_event_result = await self.get_last_event_in_room_before_stream_ordering( + room_id, end_token + ) + + if last_event_result: + return last_event_result[0] + + return None + + async def get_last_event_in_room_before_stream_ordering( + self, + room_id: str, + end_token: RoomStreamToken, + ) -> Optional[Tuple[str, PersistedEventPosition]]: + """ + Returns the ID and event position of the last event in a room at or before a + stream ordering. + + Args: + room_id + end_token: The token used to stream from + + Returns: + The ID of the most recent event and it's position, or None if there are no + events in the room before this stream ordering. + """ def get_last_event_in_room_before_stream_ordering_txn( txn: LoggingTransaction, - ) -> Optional[str]: + ) -> Optional[Tuple[str, PersistedEventPosition]]: # We're looking for the closest event at or before the token. We need to # handle the fact that the stream token can be a vector clock (with an # `instance_map`) and events can be persisted on different instances @@ -975,7 +1001,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): topological_ordering=topological_ordering, stream_ordering=stream_ordering, ): - return event_id + return event_id, PersistedEventPosition( + instance_name, stream_ordering + ) return None diff --git a/synapse/synapse_rust/events.pyi b/synapse/synapse_rust/events.pyi index 69837617f5..1682d0d151 100644 --- a/synapse/synapse_rust/events.pyi +++ b/synapse/synapse_rust/events.pyi @@ -19,6 +19,8 @@ class EventInternalMetadata: stream_ordering: Optional[int] """the stream ordering of this event. None, until it has been persisted.""" + instance_name: Optional[str] + """the instance name of the server that persisted this event. None, until it has been persisted.""" outlier: bool """whether this event is an outlier (ie, whether we have the state at that diff --git a/tests/events/test_utils.py b/tests/events/test_utils.py index d5ac66a6ed..30f8787758 100644 --- a/tests/events/test_utils.py +++ b/tests/events/test_utils.py @@ -625,6 +625,8 @@ class CloneEventTestCase(stdlib_unittest.TestCase): ) original.internal_metadata.stream_ordering = 1234 self.assertEqual(original.internal_metadata.stream_ordering, 1234) + original.internal_metadata.instance_name = "worker1" + self.assertEqual(original.internal_metadata.instance_name, "worker1") cloned = clone_event(original) cloned.unsigned["b"] = 3 @@ -632,6 +634,7 @@ class CloneEventTestCase(stdlib_unittest.TestCase): self.assertEqual(original.unsigned, {"a": 1, "b": 2}) self.assertEqual(cloned.unsigned, {"a": 1, "b": 3}) self.assertEqual(cloned.internal_metadata.stream_ordering, 1234) + self.assertEqual(cloned.internal_metadata.instance_name, "worker1") self.assertEqual(cloned.internal_metadata.txn_id, "txn") diff --git a/tests/storage/test_event_chain.py b/tests/storage/test_event_chain.py index 27d5b0125f..81feb3ec29 100644 --- a/tests/storage/test_event_chain.py +++ b/tests/storage/test_event_chain.py @@ -431,6 +431,7 @@ class EventChainStoreTestCase(HomeserverTestCase): for e in events: e.internal_metadata.stream_ordering = self._next_stream_ordering + e.internal_metadata.instance_name = self.hs.get_instance_name() self._next_stream_ordering += 1 def _persist(txn: LoggingTransaction) -> None: diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py index 2dd4083199..b8dd796a71 100644 --- a/tests/storage/test_stream.py +++ b/tests/storage/test_stream.py @@ -336,14 +336,14 @@ class GetLastEventInRoomBeforeStreamOrderingTestCase(HomeserverTestCase): room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) - last_event = self.get_success( + last_event_result = self.get_success( self.store.get_last_event_in_room_before_stream_ordering( room_id=room_id, end_token=before_room_token.room_key, ) ) - self.assertIsNone(last_event) + self.assertIsNone(last_event_result) def test_after_room_created(self) -> None: """ @@ -356,14 +356,16 @@ class GetLastEventInRoomBeforeStreamOrderingTestCase(HomeserverTestCase): after_room_token = self.event_sources.get_current_token() - last_event = self.get_success( + last_event_result = self.get_success( self.store.get_last_event_in_room_before_stream_ordering( room_id=room_id, end_token=after_room_token.room_key, ) ) + assert last_event_result is not None + last_event_id, _ = last_event_result - self.assertIsNotNone(last_event) + self.assertIsNotNone(last_event_id) def test_activity_in_other_rooms(self) -> None: """ @@ -380,16 +382,18 @@ class GetLastEventInRoomBeforeStreamOrderingTestCase(HomeserverTestCase): after_room_token = self.event_sources.get_current_token() - last_event = self.get_success( + last_event_result = self.get_success( self.store.get_last_event_in_room_before_stream_ordering( room_id=room_id1, end_token=after_room_token.room_key, ) ) + assert last_event_result is not None + last_event_id, _ = last_event_result # Make sure it's the event we expect (which also means we know it's from the # correct room) - self.assertEqual(last_event, event_response["event_id"]) + self.assertEqual(last_event_id, event_response["event_id"]) def test_activity_after_token_has_no_effect(self) -> None: """ @@ -408,15 +412,17 @@ class GetLastEventInRoomBeforeStreamOrderingTestCase(HomeserverTestCase): self.helper.send(room_id1, "after1", tok=user1_tok) self.helper.send(room_id1, "after2", tok=user1_tok) - last_event = self.get_success( + last_event_result = self.get_success( self.store.get_last_event_in_room_before_stream_ordering( room_id=room_id1, end_token=after_room_token.room_key, ) ) + assert last_event_result is not None + last_event_id, _ = last_event_result # Make sure it's the last event before the token - self.assertEqual(last_event, event_response["event_id"]) + self.assertEqual(last_event_id, event_response["event_id"]) def test_last_event_within_sharded_token(self) -> None: """ @@ -457,18 +463,20 @@ class GetLastEventInRoomBeforeStreamOrderingTestCase(HomeserverTestCase): self.helper.send(room_id1, "after1", tok=user1_tok) self.helper.send(room_id1, "after2", tok=user1_tok) - last_event = self.get_success( + last_event_result = self.get_success( self.store.get_last_event_in_room_before_stream_ordering( room_id=room_id1, end_token=end_token, ) ) + assert last_event_result is not None + last_event_id, _ = last_event_result # Should find closest event before the token in room1 self.assertEqual( - last_event, + last_event_id, event_response3["event_id"], - f"We expected {event_response3['event_id']} but saw {last_event} which corresponds to " + f"We expected {event_response3['event_id']} but saw {last_event_id} which corresponds to " + str( { "event1": event_response1["event_id"], @@ -515,18 +523,20 @@ class GetLastEventInRoomBeforeStreamOrderingTestCase(HomeserverTestCase): self.helper.send(room_id1, "after1", tok=user1_tok) self.helper.send(room_id1, "after2", tok=user1_tok) - last_event = self.get_success( + last_event_result = self.get_success( self.store.get_last_event_in_room_before_stream_ordering( room_id=room_id1, end_token=end_token, ) ) + assert last_event_result is not None + last_event_id, _ = last_event_result # Should find closest event before the token in room1 self.assertEqual( - last_event, + last_event_id, event_response2["event_id"], - f"We expected {event_response2['event_id']} but saw {last_event} which corresponds to " + f"We expected {event_response2['event_id']} but saw {last_event_id} which corresponds to " + str( { "event1": event_response1["event_id"],