diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index dbbbbc66bf..5d63099499 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -448,34 +448,20 @@ class SlidingSyncHandler: # Since we fetched a snapshot of the users room list at some point in time after # the from/to tokens, we need to revert/rewind some membership changes to match # the point in time of the `to_token`. - prev_event_ids_in_from_to_range: List[str] = [] for ( room_id, first_membership_change_after_to_token, ) in first_membership_change_by_room_id_after_to_token.items(): # 1a) Remove rooms that the user joined after the `to_token` - if first_membership_change_after_to_token.prev_event_id is None: + if first_membership_change_after_to_token.prev_event is None: sync_room_id_set.pop(room_id, None) # 1b) 1c) From the first membership event after the `to_token`, step backward to the # previous membership that would apply to the from/to range. else: - prev_event_ids_in_from_to_range.append( - first_membership_change_after_to_token.prev_event_id + sync_room_id_set[room_id] = convert_event_to_rooms_for_user( + first_membership_change_after_to_token.prev_event ) - # 1) Fixup (more) - # - # 1b) 1c) Fetch the previous membership events that apply to the from/to range - # and fixup our working list. - prev_events_in_from_to_range = await self.store.get_events( - prev_event_ids_in_from_to_range - ) - for prev_event_in_from_to_range in prev_events_in_from_to_range.values(): - # 1b) 1c) Update the membership with what we found - sync_room_id_set[prev_event_in_from_to_range.room_id] = ( - convert_event_to_rooms_for_user(prev_event_in_from_to_range) - ) - filtered_sync_room_id_set = { room_id: room_for_user for room_id, room_for_user in sync_room_id_set.items() @@ -516,195 +502,18 @@ class SlidingSyncHandler: ] = membership_change # 2) Fixup - last_membership_event_ids_to_include_in_from_to_range: List[str] = [] for ( last_membership_change_in_from_to_range ) in last_membership_change_by_room_id_in_from_to_range.values(): room_id = last_membership_change_in_from_to_range.room_id - sync_room_id_set[room_id] - # 2) Add back newly_left rooms (> `from_token` and <= `to_token`). We # include newly_left rooms because the last event that the user should see # is their own leave event if last_membership_change_in_from_to_range.membership == Membership.LEAVE: - # Save the look-up if we already have the `leave` event - if sync_room_id_set[room_id].event_id == last_membership_change_in_from_to_range.prev_event_id:: - filtered_sync_room_id_set[room_id] = sync_room_id_set[room_id] - else: - last_membership_event_ids_to_include_in_from_to_range.append(last_membership_change_in_from_to_range.event_id) - - # TODO - # last_membership_events_to_include_in_from_to_range = await self.store.get_events( - # last_membership_event_ids_to_include_in_from_to_range - # ) - # for prev_event_in_from_to_range in prev_events_in_from_to_range.values(): - # # 1b) 1c) Update the membership with what we found - # sync_room_id_set[prev_event_in_from_to_range.room_id] = ( - # convert_event_to_rooms_for_user(prev_event_in_from_to_range) - # ) - - # Since we fetched the users room list at some point in time after the from/to - # tokens, we need to revert/rewind some membership changes to match the point in - # time of the `to_token`. In particular, we need to make these fixups: - # - # - 1a) Remove rooms that the user joined after the `to_token` - # - 1b) Add back rooms that the user left after the `to_token` - # - 2) Add back newly_left rooms (> `from_token` and <= `to_token`) - - # # 1) ----------------------------------------------------- - - # # 1) Fetch membership changes that fall in the range from `to_token` up to - # # `membership_snapshot_token` - # # - # # If our `to_token` is already the same or ahead of the latest room membership - # # for the user, we don't need to do any "2)" fix-ups and can just straight-up - # # use the room list from the snapshot as a base (nothing has changed) - # membership_change_events_after_to_token = [] - # if not membership_snapshot_token.is_before_or_eq(to_token.room_key): - # membership_change_events_after_to_token = ( - # await self.store.get_membership_changes_for_user( - # user_id, - # from_key=to_token.room_key, - # to_key=membership_snapshot_token, - # excluded_rooms=self.rooms_to_exclude_globally, - # ) - # ) - - # # 1) Assemble a list of the last membership events in some given ranges. Someone - # # could have left and joined multiple times during the given range but we only - # # care about end-result so we grab the last one. - # last_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {} - # # We also need the first membership event after the `to_token` so we can step - # # backward to the previous membership that would apply to the from/to range. - # first_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {} - # for event in membership_change_events_after_to_token: - # last_membership_change_by_room_id_after_to_token[event.room_id] = event - # # Only set if we haven't already set it - # first_membership_change_by_room_id_after_to_token.setdefault( - # event.room_id, event - # ) - - # # 1) Fixup - # for ( - # last_membership_change_after_to_token - # ) in last_membership_change_by_room_id_after_to_token.values(): - # room_id = last_membership_change_after_to_token.room_id - - # # We want to find the first membership change after the `to_token` then step - # # backward to know the membership in the from/to range. - # first_membership_change_after_to_token = ( - # first_membership_change_by_room_id_after_to_token.get(room_id) - # ) - # assert first_membership_change_after_to_token is not None, ( - # "If there was a `last_membership_change_after_to_token` that we're iterating over, " - # + "then there should be corresponding a first change. For example, even if there " - # + "is only one event after the `to_token`, the first and last event will be same event. " - # + "This is probably a mistake in assembling the `last_membership_change_by_room_id_after_to_token`" - # + "/`first_membership_change_by_room_id_after_to_token` dicts above." - # ) - # # TODO: Instead of reading from `unsigned`, refactor this to use the - # # `current_state_delta_stream` table in the future. Probably a new - # # `get_membership_changes_for_user()` function that uses - # # `current_state_delta_stream` with a join to `room_memberships`. This would - # # help in state reset scenarios since `prev_content` is looking at the - # # current branch vs the current room state. This is all just data given to - # # the client so no real harm to data integrity, but we'd like to be nice to - # # the client. Since the `current_state_delta_stream` table is new, it - # # doesn't have all events in it. Since this is Sliding Sync, if we ever need - # # to, we can signal the client to throw all of their state away by sending - # # "operation: RESET". - # prev_content = first_membership_change_after_to_token.unsigned.get( - # "prev_content", {} - # ) - # prev_membership = prev_content.get("membership", None) - # prev_sender = first_membership_change_after_to_token.unsigned.get( - # "prev_sender", None - # ) - - # # Check if the previous membership (membership that applies to the from/to - # # range) should be included in our `sync_room_id_set` - # should_prev_membership_be_included = ( - # prev_membership is not None - # and prev_sender is not None - # and filter_membership_for_sync( - # membership=prev_membership, - # user_id=user_id, - # sender=prev_sender, - # ) - # ) - - # # Check if the last membership (membership that applies to our snapshot) was - # # already included in our `sync_room_id_set` - # was_last_membership_already_included = filter_membership_for_sync( - # membership=last_membership_change_after_to_token.membership, - # user_id=user_id, - # sender=last_membership_change_after_to_token.sender, - # ) - - # # 1a) Add back rooms that the user left after the `to_token` - # # - # # For example, if the last membership event after the `to_token` is a leave - # # event, then the room was excluded from `sync_room_id_set` when we first - # # crafted it above. We should add these rooms back as long as the user also - # # was part of the room before the `to_token`. - # if ( - # not was_last_membership_already_included - # and should_prev_membership_be_included - # ): - # # TODO: Assign the correct membership event at the `to_token` here - # # (currently we're setting it as the last event after the `to_token`) - # sync_room_id_set[room_id] = convert_event_to_rooms_for_user( - # last_membership_change_after_to_token - # ) - # # 1b) Remove rooms that the user joined (hasn't left) after the `to_token` - # # - # # For example, if the last membership event after the `to_token` is a "join" - # # event, then the room was included `sync_room_id_set` when we first crafted - # # it above. We should remove these rooms as long as the user also wasn't - # # part of the room before the `to_token`. - # elif ( - # was_last_membership_already_included - # and not should_prev_membership_be_included - # ): - # del sync_room_id_set[room_id] - - # # 2) ----------------------------------------------------- - # # We fix-up newly_left rooms after the first fixup because it may have removed - # # some left rooms that we can figure out are newly_left in the following code - - # # 2) Fetch membership changes that fall in the range from `from_token` up to `to_token` - # membership_change_events_in_from_to_range = [] - # if from_token: - # membership_change_events_in_from_to_range = ( - # await self.store.get_membership_changes_for_user( - # user_id, - # from_key=from_token.room_key, - # to_key=to_token.room_key, - # excluded_rooms=self.rooms_to_exclude_globally, - # ) - # ) - - # # 2) Assemble a list of the last membership events in some given ranges. Someone - # # could have left and joined multiple times during the given range but we only - # # care about end-result so we grab the last one. - # last_membership_change_by_room_id_in_from_to_range: Dict[str, EventBase] = {} - # for event in membership_change_events_in_from_to_range: - # last_membership_change_by_room_id_in_from_to_range[event.room_id] = event - - # # 2) Fixup - # for ( - # last_membership_change_in_from_to_range - # ) in last_membership_change_by_room_id_in_from_to_range.values(): - # room_id = last_membership_change_in_from_to_range.room_id - - # # 2) Add back newly_left rooms (> `from_token` and <= `to_token`). We - # # include newly_left rooms because the last event that the user should see - # # is their own leave event - # if last_membership_change_in_from_to_range.membership == Membership.LEAVE: - # sync_room_id_set[room_id] = convert_event_to_rooms_for_user( - # last_membership_change_in_from_to_range - # ) + filtered_sync_room_id_set[room_id] = convert_event_to_rooms_for_user( + last_membership_change_in_from_to_range.event + ) return filtered_sync_room_id_set diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index ed571b0de7..ce135ededc 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -113,21 +113,37 @@ class _EventsAround: @attr.s(slots=True, frozen=True, auto_attribs=True) -class CurrentStateDeltaMembership: +class _CurrentStateDeltaMembershipReturn: """ Attributes: event_id: The "current" membership event ID in this room. prev_event_id: The previous membership event in this room that was replaced by the "current" one. May be `None` if there was no previous membership event. room_id: The room ID of the membership event. + membership: The membership state of the user in the room. """ event_id: str prev_event_id: Optional[str] room_id: str membership: str - # Could be useful but we're not using it yet. - # event_pos: PersistedEventPosition + + +@attr.s(slots=True, frozen=True, auto_attribs=True) +class CurrentStateDeltaMembership: + """ + Attributes: + event: The "current" membership event in this room. + prev_event: The previous membership event in this room that was replaced by + the "current" one. May be `None` if there was no previous membership event. + room_id: The room ID of the membership event. + membership: The membership state of the user in the room. + """ + + event: EventBase + prev_event: Optional[EventBase] + room_id: str + membership: str def generate_pagination_where_clause( @@ -776,7 +792,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): if not has_changed: return [] - def f(txn: LoggingTransaction) -> List[CurrentStateDeltaMembership]: + def f(txn: LoggingTransaction) -> List[_CurrentStateDeltaMembershipReturn]: # To handle tokens with a non-empty instance_map we fetch more # results than necessary and then filter down min_from_id = from_key.stream @@ -813,7 +829,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): txn.execute(sql, args) - membership_changes: List[CurrentStateDeltaMembership] = [] + membership_changes: List[_CurrentStateDeltaMembershipReturn] = [] for ( event_id, prev_event_id, @@ -839,7 +855,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): stream_ordering, ): membership_changes.append( - CurrentStateDeltaMembership( + _CurrentStateDeltaMembershipReturn( event_id=event_id, prev_event_id=prev_event_id, room_id=room_id, @@ -851,17 +867,37 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): ) ) - current_state_delta_membership_changes = await self.db_pool.runInteraction( + return membership_changes + + membership_changes = await self.db_pool.runInteraction( "get_current_state_delta_membership_changes_for_user", f ) + # Fetch all events in one go + event_ids = [] + for m in membership_changes: + event_ids.append(m.event_id) + if m.prev_event_id is not None: + event_ids.append(m.prev_event_id) + + events = await self.get_events(event_ids, get_prev_content=False) + rooms_to_exclude: AbstractSet[str] = set() if excluded_rooms is not None: rooms_to_exclude = set(excluded_rooms) return [ - membership_change - for membership_change in current_state_delta_membership_changes + CurrentStateDeltaMembership( + event=events[membership_change.event_id], + prev_event=( + events[membership_change.prev_event_id] + if membership_change.prev_event_id + else None + ), + room_id=membership_change.room_id, + membership=membership_change.membership, + ) + for membership_change in membership_changes if membership_change.room_id not in rooms_to_exclude ]