diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 2e24b0c338..5603fdeb38 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -375,11 +375,6 @@ class SlidingSyncHandler: # instead from the time of the `to_token`. room_for_user.room_id: room_for_user for room_for_user in room_for_user_list - if filter_membership_for_sync( - membership=room_for_user.membership, - user_id=user_id, - sender=room_for_user.sender, - ) } # Get the `RoomStreamToken` that represents the spot we queried up to when we got @@ -408,6 +403,23 @@ class SlidingSyncHandler: instance_map=immutabledict(instance_to_max_stream_ordering_map), ) + # Since we fetched the users room list at some point in time after the from/to + # tokens, we need to revert/rewind some membership changes to match the point in + # time of the `to_token`. In particular, we need to make these fixups: + # + # - 1a) Remove rooms that the user joined after the `to_token` + # - 1b) Add back rooms that the user left after the `to_token` + # - 1c) Update room membership events to the point in time of the `to_token` + # - 2) Add back newly_left rooms (> `from_token` and <= `to_token`) + + # 1) ----------------------------------------------------- + + # 1) Fetch membership changes that fall in the range from `to_token` up to + # `membership_snapshot_token` + # + # If our `to_token` is already the same or ahead of the latest room membership + # for the user, we don't need to do any "2)" fix-ups and can just straight-up + # use the room list from the snapshot as a base (nothing has changed) current_state_delta_membership_changes_after_to_token = [] if not membership_snapshot_token.is_before_or_eq(to_token.room_key): current_state_delta_membership_changes_after_to_token = ( @@ -419,8 +431,9 @@ class SlidingSyncHandler: ) ) - # We need the first membership event after the `to_token` so we can step - # backward to the previous membership that would apply to the from/to range. + # 1) Assemble a list of the first membership event after the `to_token` so we can + # step backward to the previous membership that would apply to the from/to + # range. first_membership_change_by_room_id_after_to_token: Dict[ str, CurrentStateDeltaMembership ] = {} @@ -430,6 +443,8 @@ class SlidingSyncHandler: membership_change.room_id, membership_change ) + # 1) Fixup part 1 + # # Since we fetched a snapshot of the users room list at some point in time after # the from/to tokens, we need to revert/rewind some membership changes to match # the point in time of the `to_token`. @@ -444,37 +459,81 @@ class SlidingSyncHandler: or first_membership_change_after_to_token.prev_event_id is not None ) - # If the membership change was added after the `to_token`, we need to remove - # it + # 1a) Remove rooms that the user joined after the `to_token` if first_membership_change_after_to_token.prev_event_id is None: sync_room_id_set.pop(room_id, None) - # From the first membership event after the `to_token`, we need to step - # backward to the previous membership that would apply to the from/to range. + # 1b) 1c) From the first membership event after the `to_token`, step backward to the + # previous membership that would apply to the from/to range. else: prev_event_ids_in_from_to_range.append( first_membership_change_after_to_token.prev_event_id ) - # Fetch the previous membership events that apply to the from/to range and fixup - # our working list. + # 1) Fixup part 2 + # + # 1b) 1c) Fetch the previous membership events that apply to the from/to range + # and fixup our working list. prev_events_in_from_to_range = await self.store.get_events( prev_event_ids_in_from_to_range ) for prev_event_in_from_to_range in prev_events_in_from_to_range.values(): - # Update if the membership should be included - if filter_membership_for_sync( - membership=prev_event_in_from_to_range.membership, - user_id=user_id, - sender=prev_event_in_from_to_range.sender, - ): - sync_room_id_set[prev_event_in_from_to_range.room_id] = ( - convert_event_to_rooms_for_user(prev_event_in_from_to_range) - ) - # Otherwise, remove it - else: - sync_room_id_set.pop(prev_event_in_from_to_range.room_id, None) + # 1b) 1c) Update the membership with what we found + sync_room_id_set[prev_event_in_from_to_range.room_id] = ( + convert_event_to_rooms_for_user(prev_event_in_from_to_range) + ) - # TODO: Add back newly_left rooms + filtered_sync_room_id_set = { + room_id: room_for_user + for room_id, room_for_user in sync_room_id_set.items() + if filter_membership_for_sync( + membership=room_for_user.membership, + user_id=user_id, + sender=room_for_user.sender, + ) + } + + # 2) ----------------------------------------------------- + # We fix-up newly_left rooms after the first fixup because it may have removed + # some left rooms that we can figure out are newly_left in the following code + + # 2) Fetch membership changes that fall in the range from `from_token` up to `to_token` + current_state_delta_membership_changes_in_from_to_range = [] + if from_token: + current_state_delta_membership_changes_in_from_to_range = ( + await self.store.get_current_state_delta_membership_changes_for_user( + user_id, + from_key=from_token.room_key, + to_key=to_token.room_key, + excluded_rooms=self.rooms_to_exclude_globally, + ) + ) + + # 2) Assemble a list of the last membership events in some given ranges. Someone + # could have left and joined multiple times during the given range but we only + # care about end-result so we grab the last one. + last_membership_change_by_room_id_in_from_to_range: Dict[ + str, CurrentStateDeltaMembership + ] = {} + for ( + membership_change + ) in current_state_delta_membership_changes_in_from_to_range: + last_membership_change_by_room_id_in_from_to_range[ + membership_change.room_id + ] = membership_change + + # 2) Fixup + for ( + last_membership_change_in_from_to_range + ) in last_membership_change_by_room_id_in_from_to_range.values(): + room_id = last_membership_change_in_from_to_range.room_id + + # 2) Add back newly_left rooms (> `from_token` and <= `to_token`). We + # include newly_left rooms because the last event that the user should see + # is their own leave event + if last_membership_change_in_from_to_range.membership == Membership.LEAVE: + filtered_sync_room_id_set[room_id] = convert_event_to_rooms_for_user( + last_membership_change_in_from_to_range + ) # Since we fetched the users room list at some point in time after the from/to # tokens, we need to revert/rewind some membership changes to match the point in @@ -638,7 +697,7 @@ class SlidingSyncHandler: # last_membership_change_in_from_to_range # ) - return sync_room_id_set + return filtered_sync_room_id_set async def filter_rooms( self, diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index f5de23080d..595245e70e 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -63,7 +63,7 @@ from typing_extensions import Literal from twisted.internet import defer -from synapse.api.constants import Direction, EventTypes +from synapse.api.constants import Direction, EventTypes, Membership from synapse.api.filtering import Filter from synapse.events import EventBase from synapse.logging.context import make_deferred_yieldable, run_in_background @@ -126,6 +126,7 @@ class CurrentStateDeltaMembership: event_id: Optional[str] prev_event_id: Optional[str] room_id: str + membership: str # Could be useful but we're not using it yet. # event_pos: PersistedEventPosition @@ -832,7 +833,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): # `null` when the server is no longer in the room or a state reset happened # and it was unset). sql = """ - SELECT s.event_id, s.prev_event_id, s.room_id, s.instance_name, s.stream_id + SELECT + s.event_id, + s.prev_event_id, + s.room_id, + s.instance_name, + s.stream_id, + m.membership FROM current_state_delta_stream AS s WHERE s.type = ? AND s.state_key = ? AND s.stream_id > ? AND s.stream_id <= ? @@ -846,12 +853,17 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): event_id=event_id, prev_event_id=prev_event_id, room_id=room_id, + # We can assume that the membership is `LEAVE` as a default. This + # will happen when `current_state_delta_stream.event_id` is null + # because it was unset due to a state reset or the server is no + # longer in the room (everyone on our local server left). + membership=membership if membership else Membership.LEAVE, # event_pos=PersistedEventPosition( # instance_name=instance_name, # stream=stream_ordering, # ), ) - for event_id, prev_event_id, room_id, instance_name, stream_ordering in txn + for event_id, prev_event_id, room_id, instance_name, stream_ordering, membership in txn if _filter_results_by_stream( from_key, to_key,