From 09609cb0dbca3a4cfd9fbf90cc962e765ec469c0 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 3 Jun 2024 14:28:45 -0500 Subject: [PATCH] WIP: TODO comments after pairing with Erik --- synapse/handlers/sliding_sync.py | 59 +++++++++++++++++++++++++++++--- synapse/handlers/sync.py | 4 ++- synapse/storage/roommember.py | 2 +- 3 files changed, 59 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 0c72dc6c40..39009ef762 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -22,7 +22,8 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -# Everything except `Membership.LEAVE` +# Everything except `Membership.LEAVE` because we want everything that's *still* +# relevant to the user. MEMBERSHIP_TO_DISPLAY_IN_SYNC = ( Membership.INVITE, Membership.JOIN, @@ -305,13 +306,19 @@ class SlidingSyncHandler: from_token: Optional[StreamToken] = None, ) -> AbstractSet[str]: """ - Fetch room IDs that should be listed for this user in the sync response. + Fetch room IDs that should be listed for this user in the sync response (the + full room list that will be sliced, filtered, sorted). We're looking for rooms that the user has not left (`invite`, `knock`, `join`, and `ban`) or newly_left rooms that are > `from_token` and <= `to_token`. """ user_id = user.to_string() + # For a sync without from_token, all rooms except leave + + # For incremental syncs with a from_token, we only need rooms that have changes + # (some event occurred). 
+ # First grab a current snapshot rooms for the user room_for_user_list = await self.store.get_rooms_for_local_user_where_membership_is( user_id=user_id, @@ -333,11 +340,16 @@ class SlidingSyncHandler: sync_room_id_set = { room_for_user.room_id for room_for_user in room_for_user_list + # TODO: Include kicks (leave where sender is not the user itself) if room_for_user.membership in MEMBERSHIP_TO_DISPLAY_IN_SYNC } # Find the stream_ordering of the latest room membership event which will mark # the spot we queried up to. + # + # TODO: With the new `GetRoomsForUserWithStreamOrdering` info, make an instance + # map to stream ordering and construct the new room key from that map, + # `RoomStreamToken(stream=, instance_map=...)` max_stream_ordering_from_room_list = max( room_for_user.stream_ordering for room_for_user in room_for_user_list ) @@ -348,11 +360,31 @@ class SlidingSyncHandler: if max_stream_ordering_from_room_list <= to_token.room_key.stream: return sync_room_id_set + # ~~Of the membership events we pulled out, there still might be events that fail + # that conditional~~ + # + # ~~We can get past the conditional above even though we might have fetched events~~ + # + # Each event has a stream ID and instance. We can ask + # + # Multiple event_persisters + # + # For every event (GetRoomsForUserWithStreamOrdering) compare with + # `persisted_after` or add a new function to MultiWriterStreamToken to do the + # same thing. 
+ + # When you compare tokens, it could be any of these scenarios + # - Token A <= Token B (every stream pos is lower than the other token) + # - Token A >= Token B + # - It's indeterminate (intertwined, v_1_2, v2_1, both before/after each other) + # We assume the `from_token` is before or at-least equal to the `to_token` assert ( from_token is None or from_token.room_key.stream <= to_token.room_key.stream ), f"{from_token.room_key.stream if from_token else None} <= {to_token.room_key.stream}" + # We need to `wait_for_stream_token`, when they provide a token + # We assume the `from_token`/`to_token` is before the `max_stream_ordering_from_room_list` assert ( from_token is None @@ -369,10 +401,17 @@ class SlidingSyncHandler: # - 1) Add back newly_left rooms (> `from_token` and <= `to_token`) # - 2a) Remove rooms that the user joined after the `to_token` # - 2b) Add back rooms that the user left after the `to_token` + # + # TODO: Split this into two separate lookups (from_token.room_key -> + # to_token.room_key) and (to_token.room_key -> RoomStreamToken(...)) to avoid + # needing to do raw stream comparison below since we don't have access to the + # `instance_name` that persisted that event. We could refactor + # `event.internal_metadata` to include it but it might turn out a little + # difficult and a bigger, broader Synapse change than we want to make. membership_change_events = await self.store.get_membership_changes_for_user( user_id, - # Start from the `from_token` if given, otherwise from the `to_token` so we - # can still do the 2) fixups. + # Start from the `from_token` if given for the 1) fixups, otherwise from the + # `to_token` so we can still do the 2) fixups. 
from_key=from_token.room_key if from_token else to_token.room_key, # Fetch up to our membership snapshot to_key=RoomStreamToken(stream=max_stream_ordering_from_room_list), @@ -390,6 +429,7 @@ class SlidingSyncHandler: for event in membership_change_events: assert event.internal_metadata.stream_ordering + # TODO: Compare with instance_name/stream_ordering if ( ( from_token is None @@ -457,6 +497,17 @@ class SlidingSyncHandler: + "This is probably a mistake in assembling the `last_membership_change_by_room_id_after_to_token`" + "/`first_membership_change_by_room_id_after_to_token` dicts above." ) + # TODO: Instead of reading from `unsigned`, refactor this to use the + # `current_state_delta_stream` table in the future. Probably a new + # `get_membership_changes_for_user()` function that uses + # `current_state_delta_stream` with a join to `room_memberships`. This would + # help in state reset scenarios since `prev_content` is looking at the + # current branch vs the current room state. This is all just data given to + # the client so no real harm to data integrity, but we'd like to be nice to + # the client. Since the `current_state_delta_stream` table is new, it + # doesn't have all events in it. Since this is Sliding Sync, if we ever need + # to, we can signal the client to throw all of their state away by sending + # "operation: RESET". 
prev_content = first_membership_change_after_to_token.unsigned.get( "prev_content", {} ) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 5908f2e930..e7bfe44c68 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1997,7 +1997,9 @@ class SyncHandler: if since_token: membership_change_events = await self.store.get_membership_changes_for_user( user_id, - since_token.room_key, + # TODO: We should make this change, + # https://github.com/element-hq/synapse/pull/17187#discussion_r1617871321 + token_before_rooms.room_key, now_token.room_key, self.rooms_to_exclude_globally, ) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 7471f81a19..80c9630867 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -35,7 +35,7 @@ class RoomsForUser: sender: str membership: str event_id: str - stream_ordering: int + event_pos: PersistedEventPosition room_version_id: str