diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 5603fdeb38..dbbbbc66bf 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -443,22 +443,16 @@ class SlidingSyncHandler: membership_change.room_id, membership_change ) - # 1) Fixup part 1 + # 1) Fixup # # Since we fetched a snapshot of the users room list at some point in time after # the from/to tokens, we need to revert/rewind some membership changes to match # the point in time of the `to_token`. - prev_event_ids_in_from_to_range = [] + prev_event_ids_in_from_to_range: List[str] = [] for ( room_id, first_membership_change_after_to_token, ) in first_membership_change_by_room_id_after_to_token.items(): - # One of these should exist to be a valid row in `current_state_delta_stream` - assert ( - first_membership_change_after_to_token.event_id is not None - or first_membership_change_after_to_token.prev_event_id is not None - ) - # 1a) Remove rooms that the user joined after the `to_token` if first_membership_change_after_to_token.prev_event_id is None: sync_room_id_set.pop(room_id, None) @@ -469,7 +463,7 @@ class SlidingSyncHandler: first_membership_change_after_to_token.prev_event_id ) - # 1) Fixup part 2 + # 1) Fixup (more) # # 1b) 1c) Fetch the previous membership events that apply to the from/to range # and fixup our working list. @@ -522,18 +516,33 @@ class SlidingSyncHandler: ] = membership_change # 2) Fixup + last_membership_event_ids_to_include_in_from_to_range: List[str] = [] for ( last_membership_change_in_from_to_range ) in last_membership_change_by_room_id_in_from_to_range.values(): room_id = last_membership_change_in_from_to_range.room_id + sync_room_id_set[room_id] + # 2) Add back newly_left rooms (> `from_token` and <= `to_token`). We # include newly_left rooms because the last event that the user should see # is their own leave event if last_membership_change_in_from_to_range.membership == Membership.LEAVE: - filtered_sync_room_id_set[room_id] = convert_event_to_rooms_for_user( - last_membership_change_in_from_to_range - ) + # Save the look-up if we already have the `leave` event + if sync_room_id_set[room_id].event_id == last_membership_change_in_from_to_range.prev_event_id:: + filtered_sync_room_id_set[room_id] = sync_room_id_set[room_id] + else: + last_membership_event_ids_to_include_in_from_to_range.append(last_membership_change_in_from_to_range.event_id) + + # TODO + # last_membership_events_to_include_in_from_to_range = await self.store.get_events( + # last_membership_event_ids_to_include_in_from_to_range + # ) + # for prev_event_in_from_to_range in prev_events_in_from_to_range.values(): + # # 1b) 1c) Update the membership with what we found + # sync_room_id_set[prev_event_in_from_to_range.room_id] = ( + # convert_event_to_rooms_for_user(prev_event_in_from_to_range) + # ) # Since we fetched the users room list at some point in time after the from/to # tokens, we need to revert/rewind some membership changes to match the point in diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 595245e70e..ed571b0de7 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -63,7 +63,7 @@ from typing_extensions import Literal from twisted.internet import defer -from synapse.api.constants import Direction, EventTypes, Membership +from synapse.api.constants import Direction from synapse.api.filtering import Filter from synapse.events import EventBase from synapse.logging.context import make_deferred_yieldable, run_in_background @@ -116,14 +116,13 @@ class _EventsAround: class CurrentStateDeltaMembership: """ Attributes: - event_id: The "current" membership event ID in this room. May be `None` if the - server is no longer in the room or a state reset happened. + event_id: The "current" membership event ID in this room. prev_event_id: The previous membership event in this room that was replaced by the "current" one. May be `None` if there was no previous membership event. room_id: The room ID of the membership event. """ - event_id: Optional[str] + event_id: str prev_event_id: Optional[str] room_id: str membership: str @@ -410,42 +409,6 @@ def _filter_results( return True -def _filter_results_by_stream( - lower_token: Optional[RoomStreamToken], - upper_token: Optional[RoomStreamToken], - instance_name: str, - stream_ordering: int, -) -> bool: - """ - Note: This function only works with "live" tokens with `stream_ordering` only. - - Returns True if the event persisted by the given instance at the given - topological/stream_ordering falls between the two tokens (taking a None - token to mean unbounded). - - Used to filter results from fetching events in the DB against the given - tokens. This is necessary to handle the case where the tokens include - position maps, which we handle by fetching more than necessary from the DB - and then filtering (rather than attempting to construct a complicated SQL - query). - """ - if lower_token: - assert lower_token.topological is None - - # If these are live tokens we compare the stream ordering against the - # writers stream position. - if stream_ordering <= lower_token.get_stream_pos_for_instance(instance_name): - return False - - if upper_token: - assert upper_token.topological is None - - if upper_token.get_stream_pos_for_instance(instance_name) < stream_ordering: - return False - - return True - - def filter_to_clause(event_filter: Optional[Filter]) -> Tuple[str, List[str]]: # NB: This may create SQL clauses that don't optimise well (and we don't # have indices on all possible clauses). E.g. it may create @@ -819,58 +782,74 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): min_from_id = from_key.stream max_to_id = to_key.get_max_stream_pos() - args: List[Any] = [EventTypes.Member, user_id, min_from_id, max_to_id] + args: List[Any] = [user_id, min_from_id, max_to_id] # TODO: It would be good to assert that the `to_token` is >= # the first row in `current_state_delta_stream` for the rooms we're # interested in. Otherwise, we will end up with empty results and not know # it. - # Note: There is no index for `(type, state_key)` in - # `current_state_delta_stream`. We also can't just add an index for - # `event_id` and join the `room_memberships` table by `event_id` because it - # may be `null` in `current_state_delta_stream` so nothing will match (it's - # `null` when the server is no longer in the room or a state reset happened - # and it was unset). + # We have to look-up events by `stream_ordering` because + # `current_state_delta_stream.event_id` can be `null` if the server is no + # longer in the room or a state reset happened and it was unset. + # `stream_ordering` is unique across the Synapse instance so this should + # work fine. sql = """ SELECT - s.event_id, + e.event_id, s.prev_event_id, s.room_id, s.instance_name, s.stream_id, + e.topological_ordering, m.membership FROM current_state_delta_stream AS s - WHERE s.type = ? AND s.state_key = ? + INNER JOIN events AS e ON e.stream_ordering = s.stream_id + INNER JOIN room_memberships AS m ON m.event_id = e.event_id + WHERE m.user_id = ? AND s.stream_id > ? AND s.stream_id <= ? ORDER BY s.stream_id ASC """ txn.execute(sql, args) - return [ - CurrentStateDeltaMembership( - event_id=event_id, - prev_event_id=prev_event_id, - room_id=room_id, - # We can assume that the membership is `LEAVE` as a default. This - # will happen when `current_state_delta_stream.event_id` is null - # because it was unset due to a state reset or the server is no - # longer in the room (everyone on our local server left). - membership=membership if membership else Membership.LEAVE, - # event_pos=PersistedEventPosition( - # instance_name=instance_name, - # stream=stream_ordering, - # ), - ) - for event_id, prev_event_id, room_id, instance_name, stream_ordering, membership in txn - if _filter_results_by_stream( + membership_changes: List[CurrentStateDeltaMembership] = [] + for ( + event_id, + prev_event_id, + room_id, + instance_name, + stream_ordering, + topological_ordering, + membership, + ) in txn: + assert event_id is not None + # `prev_event_id` can be `None` + assert room_id is not None + assert instance_name is not None + assert stream_ordering is not None + assert topological_ordering is not None + assert membership is not None + + if _filter_results( from_key, to_key, instance_name, + topological_ordering, stream_ordering, - ) - ] + ): + membership_changes.append( + CurrentStateDeltaMembership( + event_id=event_id, + prev_event_id=prev_event_id, + room_id=room_id, + membership=membership, + # event_pos=PersistedEventPosition( + # instance_name=instance_name, + # stream=stream_ordering, + # ), + ) + ) current_state_delta_membership_changes = await self.db_pool.runInteraction( "get_current_state_delta_membership_changes_for_user", f