From 46062d8019d862b3a3794e5010677b6f2676fc83 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Wed, 18 Jan 2023 23:54:24 +0100 Subject: [PATCH] Notify user stream listeners to wake up long polling syncs --- synapse/handlers/federation.py | 11 ++++++----- synapse/notifier.py | 26 ++++++++++++++++++++++++++ synapse/storage/databases/main/room.py | 10 +++++----- synapse/types/__init__.py | 1 + 4 files changed, 38 insertions(+), 10 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index eca75f1108..3fcd5f3db4 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1726,15 +1726,16 @@ class FederationHandler: await self._device_handler.handle_room_un_partial_stated(room_id) logger.info("Clearing partial-state flag for %s", room_id) - success = await self.store.clear_partial_state_room(room_id) - if success: + new_stream_id = await self.store.clear_partial_state_room(room_id) + if new_stream_id is not None: logger.info("State resync complete for %s", room_id) self._storage_controllers.state.notify_room_un_partial_stated( room_id ) - # Poke the notifier so that other workers see the write to - # the un-partial-stated rooms stream. - self._notifier.notify_replication() + + await self._notifier.on_un_partial_stated_room( + room_id, new_stream_id + ) # TODO(faster_joins) update room stats and user directory? # https://github.com/matrix-org/synapse/issues/12814 diff --git a/synapse/notifier.py b/synapse/notifier.py index 26b97cf766..258e60367e 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -315,6 +315,32 @@ class Notifier: event_entries.append((entry, event.event_id)) await self.notify_new_room_events(event_entries, max_room_stream_token) + async def on_un_partial_stated_room( + self, + room_id: str, + new_token: int, + ) -> None: + """Used by the resync background processes to wake up all listeners + of this room that it just got un-partial-stated. + + It will also notify replication listeners of the change in stream. + """ + + # Wake up all related user stream notifiers + user_streams = self.room_to_user_streams.get(room_id, set()) + time_now_ms = self.clock.time_msec() + for user_stream in user_streams: + try: + user_stream.notify( + StreamKeyType.UN_PARTIAL_STATED_ROOMS, new_token, time_now_ms + ) + except Exception: + logger.exception("Failed to notify listener") + + # Poke the replication so that other workers also see the write to + # the un-partial-stated rooms stream. + self.notify_replication() + async def notify_new_room_events( self, event_entries: List[Tuple[_PendingRoomEventEntry, str]], diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 7a16d27627..09ba372f7a 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -2330,16 +2330,16 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore): (room_id,), ) - async def clear_partial_state_room(self, room_id: str) -> bool: + async def clear_partial_state_room(self, room_id: str) -> Optional[int]: """Clears the partial state flag for a room. Args: room_id: The room whose partial state flag is to be cleared. Returns: - `True` if the partial state flag has been cleared successfully. + The corresponding stream id for the un-partial-stated rooms stream. - `False` if the partial state flag could not be cleared because the room + `None` if the partial state flag could not be cleared because the room still contains events with partial state. """ try: @@ -2350,7 +2350,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore): room_id, un_partial_state_room_stream_id, ) - return True + return un_partial_state_room_stream_id except self.db_pool.engine.module.IntegrityError as e: # Assume that any `IntegrityError`s are due to partial state events. logger.info( @@ -2358,7 +2358,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore): room_id, e, ) - return False + return None def _clear_partial_state_room_txn( self, diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index dfea825d98..1ae1e9e526 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -627,6 +627,7 @@ class StreamKeyType: PUSH_RULES: Final = "push_rules_key" TO_DEVICE: Final = "to_device_key" DEVICE_LIST: Final = "device_list_key" + UN_PARTIAL_STATED_ROOMS = "un_partial_stated_rooms_key" @attr.s(slots=True, frozen=True, auto_attribs=True)