From 1268a5413f28f82803133911c60c675fe41a460c Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Mon, 3 Jun 2024 20:09:35 -0500
Subject: [PATCH] Properly compare tokens and event positions

Avoid flawed raw stream comparison.
---
 synapse/handlers/sliding_sync.py | 212 +++++++++++++++----------------
 1 file changed, 106 insertions(+), 106 deletions(-)

diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 0cfebf0ed5..e84ddc0b81 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -172,7 +172,7 @@ class SlidingSyncResult:
 
     next_pos: StreamToken
     lists: Dict[str, SlidingWindowList]
-    rooms: List[RoomResult]
+    rooms: Dict[str, RoomResult]
    extensions: JsonMapping
 
     def __bool__(self) -> bool:
@@ -182,10 +182,21 @@ class SlidingSyncResult:
         """
         return bool(self.lists or self.rooms or self.extensions)
 
+    @staticmethod
+    def empty(next_pos: StreamToken) -> "SlidingSyncResult":
+        "Return a new empty result"
+        return SlidingSyncResult(
+            next_pos=next_pos,
+            lists={},
+            rooms={},
+            extensions={},
+        )
+
 
 class SlidingSyncHandler:
     def __init__(self, hs: "HomeServer"):
         self.hs_config = hs.config
+        self.clock = hs.get_clock()
         self.store = hs.get_datastores().main
         self.auth_blocking = hs.get_auth_blocking()
         self.notifier = hs.get_notifier()
@@ -197,7 +208,7 @@ class SlidingSyncHandler:
         requester: Requester,
         sync_config: SlidingSyncConfig,
         from_token: Optional[StreamToken] = None,
-        timeout: int = 0,
+        timeout_ms: int = 0,
     ) -> SlidingSyncResult:
         """Get the sync for a client if we have new data for it now. Otherwise
         wait for new data to arrive on the server. If the timeout expires, then
@@ -212,7 +223,30 @@ class SlidingSyncHandler:
         # any to-device messages before that token (since we now know that the device
         # has received them). (see sync v2 for how to do this)
 
-        if timeout == 0 or from_token is None:
+        # If we're working with a user-provided token, we need to make sure to wait
+        # for this worker to catch up with the token.
+        if from_token is not None:
+            # We need to make sure this worker has caught up with the token. If
+            # this returns false, it means we timed out waiting, and we should
+            # just return an empty response.
+            before_wait_ts = self.clock.time_msec()
+            if not await self.notifier.wait_for_stream_token(from_token):
+                logger.warning(
+                    "Timed out waiting for worker to catch up. Returning empty response"
+                )
+                return SlidingSyncResult.empty(from_token)
+
+            # If we've spent significant time waiting to catch up, take it off
+            # the timeout.
+            after_wait_ts = self.clock.time_msec()
+            if after_wait_ts - before_wait_ts > 1_000:
+                timeout_ms -= after_wait_ts - before_wait_ts
+                timeout_ms = max(timeout_ms, 0)
+
+        # We're going to respond immediately if the timeout is 0 or if this is an
+        # initial sync (without a `from_token`) so we can avoid calling
+        # `notifier.wait_for_events()`.
+        if timeout_ms == 0 or from_token is None:
             now_token = self.event_sources.get_current_token()
             result = await self.current_sync_for_user(
                 sync_config,
@@ -232,7 +266,7 @@ class SlidingSyncHandler:
 
             result = await self.notifier.wait_for_events(
                 sync_config.user.to_string(),
-                timeout,
+                timeout_ms,
                 current_sync_callback,
                 from_token=from_token,
             )
@@ -296,7 +330,7 @@ class SlidingSyncHandler:
             next_pos=to_token,
             lists=lists,
             # TODO: Gather room data for rooms in lists and `sync_config.room_subscriptions`
-            rooms=[],
+            rooms={},
             extensions={},
         )
 
@@ -345,7 +379,7 @@ class SlidingSyncHandler:
         #
         # First we need to get the max stream_ordering of each event persister instance
         # that we queried events from.
-        instance_to_max_stream_ordering_map = {}
+        instance_to_max_stream_ordering_map: Dict[str, int] = {}
         for room_for_user in room_for_user_list:
             instance_name = room_for_user.event_pos.instance_name
             stream_ordering = room_for_user.event_pos.stream
@@ -368,53 +402,24 @@ class SlidingSyncHandler:
             instance_map=immutabledict(instance_to_max_stream_ordering_map),
         )
 
-        #
-        # TODO: With the new `RoomsForUser.event_pos` info, make a instance
-        # map to stream ordering and construct the new room key from that map,
-        # `RoomStreamToken(stream=, instance_map=...)`
-        max_stream_ordering_from_room_list = max(
-            room_for_user.event_pos.stream for room_for_user in room_for_user_list
-        )
-
         # If our `to_token` is already the same or ahead of the latest room membership
         # for the user, we can just straight-up return the room list (nothing has
         # changed)
-        if max_stream_ordering_from_room_list <= to_token.room_key.stream:
+        if membership_snapshot_token.is_before_or_eq(to_token.room_key):
             return sync_room_id_set
 
-        # ~~Of the membership events we pulled out, there still might be events that fail
-        # that conditional~~
-        #
-        # ~~We can get past the conditional above even though we might have fetched events~~
-        #
-        # Each event has an stream ID and instance. We can ask
-        #
-        # Multiple event_persisters
-        #
-        # For every event (GetRoomsForUserWithStreamOrdering) compare with
-        # `persisted_after` or add a new function to MultiWriterStreamToken to do the
-        # same thing.
-
-        # When you compare tokens, it could be any of these scenarios
-        # - Token A <= Token B (every stream pos is lower than the other token)
-        # - Token A >= Token B
-        # - It's indeterminate (intertwined, v_1_2, v2_1, both before/after each other)
-        # We assume the `from_token` is before or at-least equal to the `to_token`
-        assert (
-            from_token is None or from_token.room_key.stream <= to_token.room_key.stream
-        ), f"{from_token.room_key.stream if from_token else None} <= {to_token.room_key.stream}"
-
-        # We need to `wait_for_stream_token`, when they provide a token
+        assert from_token is None or from_token.room_key.is_before_or_eq(
+            to_token.room_key
+        ), f"{from_token.room_key if from_token else None} < {to_token.room_key}"
 
-        # We assume the `from_token`/`to_token` is before the `max_stream_ordering_from_room_list`
-        assert (
-            from_token is None
-            or from_token.room_key.stream < max_stream_ordering_from_room_list
-        ), f"{from_token.room_key.stream if from_token else None} < {max_stream_ordering_from_room_list}"
-        assert (
-            to_token.room_key.stream < max_stream_ordering_from_room_list
-        ), f"{to_token.room_key.stream} < {max_stream_ordering_from_room_list}"
+        # We assume the `from_token`/`to_token` is before the `membership_snapshot_token`
+        assert from_token is None or from_token.room_key.is_before_or_eq(
+            membership_snapshot_token
+        ), f"{from_token.room_key if from_token else None} < {membership_snapshot_token}"
+        assert to_token.room_key.is_before_or_eq(
+            membership_snapshot_token
+        ), f"{to_token.room_key} < {membership_snapshot_token}"
 
         # Since we fetched the users room list at some point in time after the from/to
         # tokens, we need to revert/rewind some membership changes to match the point in
@@ -424,72 +429,38 @@ class SlidingSyncHandler:
         # - 2a) Remove rooms that the user joined after the `to_token`
         # - 2b) Add back rooms that the user left after the `to_token`
         #
-        # TODO: Split this into two separate lookups (from_token.room_key ->
-        # to_token.room_key) and (to_token.room_key -> RoomStreamToken(...)) to avoid
-        # needing to do raw stream comparison below since we don't have access to the
-        # `instance_name` that persisted that event. We could refactor
-        # `event.internal_metadata` to include it but it might turn out a little
-        # difficult and a bigger, broader Synapse change than we want to make.
-        membership_change_events = await self.store.get_membership_changes_for_user(
-            user_id,
-            # Start from the `from_token` if given for the 1) fixups, otherwise from the
-            # `to_token` so we can still do the 2) fixups.
-            from_key=from_token.room_key if from_token else to_token.room_key,
-            # Fetch up to our membership snapshot
-            to_key=membership_snapshot_token,
-            excluded_rooms=self.rooms_to_exclude_globally,
-        )
+        # We're doing two separate lookups for membership changes, bounded by the
+        # `RoomStreamToken`s. We could request everything in one range,
+        # [`from_token.room_key`, `membership_snapshot_token`), but then we would have
+        # to bucket the events with raw stream comparison (which is flawed) since we
+        # don't have access to the `instance_name` that persisted each event in
+        # `event.internal_metadata`. We could refactor `event.internal_metadata` to
+        # include `instance_name` but it might turn out a little difficult and a bigger,
+        # broader Synapse change than we want to make.
 
-        # Assemble a list of the last membership events in some given ranges. Someone
+        # 1) -----------------------------------------------------
+
+        # 1) Fetch membership changes that fall in the range of `from_token` and `to_token`
+        membership_change_events_in_from_to_range = []
+        if from_token:
+            membership_change_events_in_from_to_range = (
+                await self.store.get_membership_changes_for_user(
+                    user_id,
+                    from_key=from_token.room_key,
+                    to_key=to_token.room_key,
+                    excluded_rooms=self.rooms_to_exclude_globally,
+                )
+            )
+
+        # 1) Assemble a list of the last membership events in some given ranges. Someone
         # could have left and joined multiple times during the given range but we only
         # care about end-result so we grab the last one.
         last_membership_change_by_room_id_in_from_to_range: Dict[str, EventBase] = {}
-        last_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {}
-        # We also need the first membership event after the `to_token` so we can step
-        # backward to the previous membership that would apply to the from/to range.
-        first_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {}
-        for event in membership_change_events:
+        for event in membership_change_events_in_from_to_range:
             assert event.internal_metadata.stream_ordering
 
+            last_membership_change_by_room_id_in_from_to_range[event.room_id] = event
-            # TODO: Compare with instance_name/stream_ordering
-            if (
-                (
-                    from_token is None
-                    or event.internal_metadata.stream_ordering
-                    > from_token.room_key.stream
-                )
-                and event.internal_metadata.stream_ordering <= to_token.room_key.stream
-            ):
-                last_membership_change_by_room_id_in_from_to_range[event.room_id] = (
-                    event
-                )
-            elif (
-                event.internal_metadata.stream_ordering > to_token.room_key.stream
-                and event.internal_metadata.stream_ordering
-                <= max_stream_ordering_from_room_list
-            ):
-                last_membership_change_by_room_id_after_to_token[event.room_id] = event
-                # Only set if we haven't already set it
-                first_membership_change_by_room_id_after_to_token.setdefault(
-                    event.room_id, event
-                )
-            else:
-                # We don't expect this to happen since we should only be fetching
-                # `membership_change_events` that fall in the given ranges above. It
-                # doesn't hurt anything to ignore an event we don't need but may
-                # indicate a bug in the logic above.
-                raise AssertionError(
-                    "Membership event with stream_ordering=%s should fall in the given ranges above"
-                    + " (%d > x <= %d) or (%d > x <= %d). We shouldn't be fetching extra membership"
-                    + " events that aren't used.",
-                    event.internal_metadata.stream_ordering,
-                    from_token.room_key.stream if from_token else None,
-                    to_token.room_key.stream,
-                    to_token.room_key.stream,
-                    max_stream_ordering_from_room_list,
-                )
 
-        # 1)
+        # 1) Fixup
         for (
             last_membership_change_in_from_to_range
         ) in last_membership_change_by_room_id_in_from_to_range.values():
@@ -501,7 +472,36 @@ class SlidingSyncHandler:
             if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
                 sync_room_id_set.add(room_id)
 
-        # 2)
+        # 2) -----------------------------------------------------
+
+        # 2) Fetch membership changes that fall in the range from `to_token` up to
+        # `membership_snapshot_token`
+        membership_change_events_after_to_token = (
+            await self.store.get_membership_changes_for_user(
+                user_id,
+                from_key=to_token.room_key,
+                to_key=membership_snapshot_token,
+                excluded_rooms=self.rooms_to_exclude_globally,
+            )
+        )
+
+        # 2) Assemble a list of the last membership events in some given ranges. Someone
+        # could have left and joined multiple times during the given range but we only
+        # care about end-result so we grab the last one.
+        last_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {}
+        # We also need the first membership event after the `to_token` so we can step
+        # backward to the previous membership that would apply to the from/to range.
+        first_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {}
+        for event in membership_change_events_after_to_token:
+            assert event.internal_metadata.stream_ordering
+
+            last_membership_change_by_room_id_after_to_token[event.room_id] = event
+            # Only set if we haven't already set it
+            first_membership_change_by_room_id_after_to_token.setdefault(
+                event.room_id, event
+            )
+
+        # 2) Fixup
         for (
             last_membership_change_after_to_token
         ) in last_membership_change_by_room_id_after_to_token.values():