Notify user stream listeners to wake up long polling syncs

This commit is contained in:
Mathieu Velten 2023-01-18 23:54:24 +01:00
parent 7b60abea53
commit 46062d8019
4 changed files with 38 additions and 10 deletions

View file

@ -1726,15 +1726,16 @@ class FederationHandler:
await self._device_handler.handle_room_un_partial_stated(room_id) await self._device_handler.handle_room_un_partial_stated(room_id)
logger.info("Clearing partial-state flag for %s", room_id) logger.info("Clearing partial-state flag for %s", room_id)
success = await self.store.clear_partial_state_room(room_id) new_stream_id = await self.store.clear_partial_state_room(room_id)
if success: if new_stream_id is not None:
logger.info("State resync complete for %s", room_id) logger.info("State resync complete for %s", room_id)
self._storage_controllers.state.notify_room_un_partial_stated( self._storage_controllers.state.notify_room_un_partial_stated(
room_id room_id
) )
# Poke the notifier so that other workers see the write to
# the un-partial-stated rooms stream. await self._notifier.on_un_partial_stated_room(
self._notifier.notify_replication() room_id, new_stream_id
)
# TODO(faster_joins) update room stats and user directory? # TODO(faster_joins) update room stats and user directory?
# https://github.com/matrix-org/synapse/issues/12814 # https://github.com/matrix-org/synapse/issues/12814

View file

@ -315,6 +315,32 @@ class Notifier:
event_entries.append((entry, event.event_id)) event_entries.append((entry, event.event_id))
await self.notify_new_room_events(event_entries, max_room_stream_token) await self.notify_new_room_events(event_entries, max_room_stream_token)
async def on_un_partial_stated_room(
self,
room_id: str,
new_token: int,
) -> None:
"""Used by the resync background processes to wake up all listeners
of this room that it just got un-partial-stated.
It will also notify replication listeners of the change in stream.
"""
# Wake up all related user stream notifiers
user_streams = self.room_to_user_streams.get(room_id, set())
time_now_ms = self.clock.time_msec()
for user_stream in user_streams:
try:
user_stream.notify(
StreamKeyType.UN_PARTIAL_STATED_ROOMS, new_token, time_now_ms
)
except Exception:
logger.exception("Failed to notify listener")
# Poke the replication so that other workers also see the write to
# the un-partial-stated rooms stream.
self.notify_replication()
async def notify_new_room_events( async def notify_new_room_events(
self, self,
event_entries: List[Tuple[_PendingRoomEventEntry, str]], event_entries: List[Tuple[_PendingRoomEventEntry, str]],

View file

@ -2330,16 +2330,16 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
(room_id,), (room_id,),
) )
async def clear_partial_state_room(self, room_id: str) -> bool: async def clear_partial_state_room(self, room_id: str) -> Optional[int]:
"""Clears the partial state flag for a room. """Clears the partial state flag for a room.
Args: Args:
room_id: The room whose partial state flag is to be cleared. room_id: The room whose partial state flag is to be cleared.
Returns: Returns:
`True` if the partial state flag has been cleared successfully. The corresponding stream id for the un-partial-stated rooms stream.
`False` if the partial state flag could not be cleared because the room `None` if the partial state flag could not be cleared because the room
still contains events with partial state. still contains events with partial state.
""" """
try: try:
@ -2350,7 +2350,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
room_id, room_id,
un_partial_state_room_stream_id, un_partial_state_room_stream_id,
) )
return True return un_partial_state_room_stream_id
except self.db_pool.engine.module.IntegrityError as e: except self.db_pool.engine.module.IntegrityError as e:
# Assume that any `IntegrityError`s are due to partial state events. # Assume that any `IntegrityError`s are due to partial state events.
logger.info( logger.info(
@ -2358,7 +2358,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
room_id, room_id,
e, e,
) )
return False return None
def _clear_partial_state_room_txn( def _clear_partial_state_room_txn(
self, self,

View file

@ -627,6 +627,7 @@ class StreamKeyType:
PUSH_RULES: Final = "push_rules_key" PUSH_RULES: Final = "push_rules_key"
TO_DEVICE: Final = "to_device_key" TO_DEVICE: Final = "to_device_key"
DEVICE_LIST: Final = "device_list_key" DEVICE_LIST: Final = "device_list_key"
UN_PARTIAL_STATED_ROOMS = "un_partial_stated_rooms_key"
@attr.s(slots=True, frozen=True, auto_attribs=True) @attr.s(slots=True, frozen=True, auto_attribs=True)