diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index d1caf16975..215986d2fa 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1673,128 +1673,17 @@ class SyncHandler: # See https://github.com/matrix-org/matrix-doc/issues/1144 raise NotImplementedError() - # Note: we get the users room list *before* we get the current token, this - # avoids checking back in history if rooms are joined after the token is fetched. - token_before_rooms = self.event_sources.get_current_token() - mutable_joined_room_ids = set(await self.store.get_rooms_for_user(user_id)) - - # NB: The now_token gets changed by some of the generate_sync_* methods, - # this is due to some of the underlying streams not supporting the ability - # to query up to a given point. - # Always use the `now_token` in `SyncResultBuilder` - now_token = self.event_sources.get_current_token() - log_kv({"now_token": now_token}) - - # Since we fetched the users room list before the token, there's a small window - # during which membership events may have been persisted, so we fetch these now - # and modify the joined room list for any changes between the get_rooms_for_user - # call and the get_current_token call. - membership_change_events = [] - if since_token: - membership_change_events = await self.store.get_membership_changes_for_user( - user_id, - since_token.room_key, - now_token.room_key, - self.rooms_to_exclude_globally, - ) - - mem_last_change_by_room_id: Dict[str, EventBase] = {} - for event in membership_change_events: - mem_last_change_by_room_id[event.room_id] = event - - # For the latest membership event in each room found, add/remove the room ID - # from the joined room list accordingly. In this case we only care if the - # latest change is JOIN. - - for room_id, event in mem_last_change_by_room_id.items(): - assert event.internal_metadata.stream_ordering - if ( - event.internal_metadata.stream_ordering - < token_before_rooms.room_key.stream - ): - continue - - logger.info( - "User membership change between getting rooms and current token: %s %s %s", - user_id, - event.membership, - room_id, - ) - # User joined a room - we have to then check the room state to ensure we - # respect any bans if there's a race between the join and ban events. - if event.membership == Membership.JOIN: - user_ids_in_room = await self.store.get_users_in_room(room_id) - if user_id in user_ids_in_room: - mutable_joined_room_ids.add(room_id) - # The user left the room, or left and was re-invited but not joined yet - else: - mutable_joined_room_ids.discard(room_id) - - # Tweak the set of rooms to return to the client for eager (non-lazy) syncs. - mutable_rooms_to_exclude = set(self.rooms_to_exclude_globally) - if not sync_config.filter_collection.lazy_load_members(): - # Non-lazy syncs should never include partially stated rooms. - # Exclude all partially stated rooms from this sync. - results = await self.store.is_partial_state_room_batched( - mutable_joined_room_ids - ) - mutable_rooms_to_exclude.update( - room_id - for room_id, is_partial_state in results.items() - if is_partial_state - ) - membership_change_events = [ - event - for event in membership_change_events - if not results.get(event.room_id, False) - ] - - # Incremental eager syncs should additionally include rooms that - # - we are joined to - # - are full-stated - # - became fully-stated at some point during the sync period - # (These rooms will have been omitted during a previous eager sync.) - forced_newly_joined_room_ids: Set[str] = set() - if since_token and not sync_config.filter_collection.lazy_load_members(): - un_partial_stated_rooms = ( - await self.store.get_un_partial_stated_rooms_between( - since_token.un_partial_stated_rooms_key, - now_token.un_partial_stated_rooms_key, - mutable_joined_room_ids, - ) - ) - results = await self.store.is_partial_state_room_batched( - un_partial_stated_rooms - ) - forced_newly_joined_room_ids.update( - room_id - for room_id, is_partial_state in results.items() - if not is_partial_state - ) - - # Now we have our list of joined room IDs, exclude as configured and freeze - joined_room_ids = frozenset( - room_id - for room_id in mutable_joined_room_ids - if room_id not in mutable_rooms_to_exclude + sync_result_builder = await self.get_sync_result_builder( + sync_config, + since_token, + full_state, ) logger.debug( "Calculating sync response for %r between %s and %s", sync_config.user, - since_token, - now_token, - ) - - sync_result_builder = SyncResultBuilder( - sync_config, - full_state, - since_token=since_token, - now_token=now_token, - joined_room_ids=joined_room_ids, - excluded_room_ids=frozenset(mutable_rooms_to_exclude), - forced_newly_joined_room_ids=frozenset(forced_newly_joined_room_ids), - membership_change_events=membership_change_events, + sync_result_builder.since_token, + sync_result_builder.now_token, ) logger.debug("Fetching account data") @@ -1918,37 +1807,157 @@ class SyncHandler: # them since the appservice doesn't have to make a massive initial sync. # (related to https://github.com/matrix-org/matrix-doc/issues/1144) - # NB: The now_token gets changed by some of the generate_sync_* methods, + sync_result_builder = await self.get_sync_result_builder( + sync_config, + since_token, + full_state=False, + ) + + # 1. Calculate `device_lists` + device_lists = await self._generate_sync_entry_for_device_list( + sync_result_builder + ) + + # 2. Calculate `to_device` events + await self._generate_sync_entry_for_to_device(sync_result_builder) + + return E2eeSyncResult( + to_device=sync_result_builder.to_device, + device_lists=device_lists, + # device_one_time_keys_count: JsonMapping + # device_unused_fallback_key_types: List[str] + next_batch=sync_result_builder.now_token, + ) + + async def get_sync_result_builder( + self, + sync_config: SyncConfig, + since_token: Optional[StreamToken] = None, + full_state: bool = False, + ) -> "SyncResultBuilder": + """ + Assemble a `SyncResultBuilder` with all of the necessary context + """ + user_id = sync_config.user.to_string() + + # NB: The `now_token` gets changed by some of the `generate_sync_*` methods, # this is due to some of the underlying streams not supporting the ability # to query up to a given point. # Always use the `now_token` in `SyncResultBuilder` now_token = self.event_sources.get_current_token() log_kv({"now_token": now_token}) - joined_room_ids = await self.store.get_rooms_for_user(user_id) + # Note: we get the users room list *before* we get the current token, this + # avoids checking back in history if rooms are joined after the token is fetched. + token_before_rooms = self.event_sources.get_current_token() + mutable_joined_room_ids = set(await self.store.get_rooms_for_user(user_id)) + + # Since we fetched the users room list before the token, there's a small window + # during which membership events may have been persisted, so we fetch these now + # and modify the joined room list for any changes between the get_rooms_for_user + # call and the get_current_token call. + membership_change_events = [] + if since_token: + membership_change_events = await self.store.get_membership_changes_for_user( + user_id, + since_token.room_key, + now_token.room_key, + self.rooms_to_exclude_globally, + ) + + mem_last_change_by_room_id: Dict[str, EventBase] = {} + for event in membership_change_events: + mem_last_change_by_room_id[event.room_id] = event + + # For the latest membership event in each room found, add/remove the room ID + # from the joined room list accordingly. In this case we only care if the + # latest change is JOIN. + + for room_id, event in mem_last_change_by_room_id.items(): + assert event.internal_metadata.stream_ordering + if ( + event.internal_metadata.stream_ordering + < token_before_rooms.room_key.stream + ): + continue + + logger.info( + "User membership change between getting rooms and current token: %s %s %s", + user_id, + event.membership, + room_id, + ) + # User joined a room - we have to then check the room state to ensure we + # respect any bans if there's a race between the join and ban events. + if event.membership == Membership.JOIN: + user_ids_in_room = await self.store.get_users_in_room(room_id) + if user_id in user_ids_in_room: + mutable_joined_room_ids.add(room_id) + # The user left the room, or left and was re-invited but not joined yet + else: + mutable_joined_room_ids.discard(room_id) + + # Tweak the set of rooms to return to the client for eager (non-lazy) syncs. + mutable_rooms_to_exclude = set(self.rooms_to_exclude_globally) + if not sync_config.filter_collection.lazy_load_members(): + # Non-lazy syncs should never include partially stated rooms. + # Exclude all partially stated rooms from this sync. + results = await self.store.is_partial_state_room_batched( + mutable_joined_room_ids + ) + mutable_rooms_to_exclude.update( + room_id + for room_id, is_partial_state in results.items() + if is_partial_state + ) + membership_change_events = [ + event + for event in membership_change_events + if not results.get(event.room_id, False) + ] + + # Incremental eager syncs should additionally include rooms that + # - we are joined to + # - are full-stated + # - became fully-stated at some point during the sync period + # (These rooms will have been omitted during a previous eager sync.) + forced_newly_joined_room_ids: Set[str] = set() + if since_token and not sync_config.filter_collection.lazy_load_members(): + un_partial_stated_rooms = ( + await self.store.get_un_partial_stated_rooms_between( + since_token.un_partial_stated_rooms_key, + now_token.un_partial_stated_rooms_key, + mutable_joined_room_ids, + ) + ) + results = await self.store.is_partial_state_room_batched( + un_partial_stated_rooms + ) + forced_newly_joined_room_ids.update( + room_id + for room_id, is_partial_state in results.items() + if not is_partial_state + ) + + # Now we have our list of joined room IDs, exclude as configured and freeze + joined_room_ids = frozenset( + room_id + for room_id in mutable_joined_room_ids + if room_id not in mutable_rooms_to_exclude + ) sync_result_builder = SyncResultBuilder( sync_config, - full_state=False, + full_state, since_token=since_token, now_token=now_token, joined_room_ids=joined_room_ids, - # Dummy values to fill out `SyncResultBuilder` - excluded_room_ids=frozenset({}), - forced_newly_joined_room_ids=frozenset({}), - membership_change_events=[], + excluded_room_ids=frozenset(mutable_rooms_to_exclude), + forced_newly_joined_room_ids=frozenset(forced_newly_joined_room_ids), + membership_change_events=membership_change_events, ) - await self._generate_sync_entry_for_to_device(sync_result_builder) - - return E2eeSyncResult( - to_device=sync_result_builder.to_device, - # to_device: List[JsonDict] - # device_lists: DeviceListUpdates - # device_one_time_keys_count: JsonMapping - # device_unused_fallback_key_types: List[str] - next_batch=sync_result_builder.now_token, - ) + return sync_result_builder @measure_func("_generate_sync_entry_for_device_list") async def _generate_sync_entry_for_device_list(