diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 16d94925f5..cf448fa3cd 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -18,22 +18,25 @@
 #
 #
 import logging
-from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
+from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple

+import attr
 from immutabledict import immutabledict

-from synapse.api.constants import AccountDataTypes, EventTypes, Membership
+from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membership
 from synapse.events import EventBase
 from synapse.storage.roommember import RoomsForUser
 from synapse.types import (
     PersistedEventPosition,
     Requester,
     RoomStreamToken,
+    StreamKeyType,
     StreamToken,
     UserID,
 )
 from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult
-from synapse.types.state import StateFilter
+from synapse.types.state import StateFilter, StateKey
+from synapse.visibility import filter_events_for_client

 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -82,6 +85,18 @@ def filter_membership_for_sync(*, membership: str, user_id: str, sender: str) ->
     return membership != Membership.LEAVE or sender != user_id


+# We can't freeze this class because we want to update it in place with the
+# de-duplicated data.
+@attr.s(slots=True, auto_attribs=True)
+class RoomSyncConfig:
+    """
+    Holds the config for what data we should fetch for a room in the sync response.
+    """
+
+    timeline_limit: int
+    required_state: Set[StateKey]
+
+
 class SlidingSyncHandler:
     def __init__(self, hs: "HomeServer"):
         self.clock = hs.get_clock()
@@ -201,6 +216,7 @@ class SlidingSyncHandler:

         # Assemble sliding window lists
         lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
+        relevant_room_map: Dict[str, RoomSyncConfig] = {}
         if sync_config.lists:
             # Get all of the room IDs that the user should be able to see in the sync
             # response
@@ -225,29 +241,66 @@ class SlidingSyncHandler:
                 ops: List[SlidingSyncResult.SlidingWindowList.Operation] = []
                 if list_config.ranges:
                     for range in list_config.ranges:
+                        room_id_set = {
+                            room_id
+                            for room_id, _ in sorted_room_info[range[0] : range[1]]
+                        }
+
                         ops.append(
                             SlidingSyncResult.SlidingWindowList.Operation(
                                 op=OperationType.SYNC,
                                 range=range,
-                                room_ids=[
-                                    room_id
-                                    for room_id, _ in sorted_room_info[
-                                        range[0] : range[1]
-                                    ]
-                                ],
+                                room_ids=list(room_id_set),
                             )
                         )

+                        # Update the relevant room map
+                        for room_id in room_id_set:
+                            if relevant_room_map.get(room_id) is not None:
+                                # Take the highest timeline limit
+                                if (
+                                    relevant_room_map[room_id].timeline_limit
+                                    < list_config.timeline_limit
+                                ):
+                                    relevant_room_map[room_id].timeline_limit = (
+                                        list_config.timeline_limit
+                                    )
+
+                                # Union the required state
+                                relevant_room_map[room_id].required_state.update(
+                                    list_config.required_state
+                                )
+                            else:
+                                relevant_room_map[room_id] = RoomSyncConfig(
+                                    timeline_limit=list_config.timeline_limit,
+                                    required_state=set(list_config.required_state),
+                                )
+
                 lists[list_key] = SlidingSyncResult.SlidingWindowList(
                     count=len(sorted_room_info),
                     ops=ops,
                 )

+        # TODO: if (sync_config.room_subscriptions):
+
+        # Fetch room data
+        rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
+        for room_id, room_sync_config in relevant_room_map.items():
+            room_sync_result = await self.get_room_sync_data(
+                user=sync_config.user,
+                room_id=room_id,
+                room_sync_config=room_sync_config,
+                rooms_for_user_membership_at_to_token=sync_room_map[room_id],
+                from_token=from_token,
+                to_token=to_token,
+            )
+
+            rooms[room_id] = room_sync_result
+
         return SlidingSyncResult(
             next_pos=to_token,
             lists=lists,
-            # TODO: Gather room data for rooms in lists and `sync_config.room_subscriptions`
-            rooms={},
+            rooms=rooms,
             extensions={},
         )

@@ -665,3 +718,130 @@ class SlidingSyncHandler:
             # We want descending order
             reverse=True,
         )
+
+    async def get_room_sync_data(
+        self,
+        user: UserID,
+        room_id: str,
+        room_sync_config: RoomSyncConfig,
+        rooms_for_user_membership_at_to_token: RoomsForUser,
+        from_token: Optional[StreamToken],
+        to_token: StreamToken,
+    ) -> SlidingSyncResult.RoomResult:
+        """
+        Fetch room data for a room.
+
+        We fetch data according to the token range (> `from_token` and <= `to_token`).
+
+        Args:
+            user: User to fetch data for
+            room_id: The room ID to fetch data for
+            room_sync_config: Config for what data we should fetch for a room in the
+                sync response.
+            rooms_for_user_membership_at_to_token: Membership information for the user
+                in the room at the time of `to_token`.
+            from_token: The point in the stream to sync from.
+            to_token: The point in the stream to sync up to.
+        """
+
+        timeline_events: List[EventBase] = []
+        limited = False
+        # We want to use `to_token` (vs `from_token`) because we look backwards from the
+        # `to_token` up to the `timeline_limit` and we might not reach `from_token`
+        # before we hit the limit. We will update the room stream position once we've
+        # fetched the events.
+        prev_batch_token = to_token
+        if room_sync_config.timeline_limit > 0:
+            timeline_events, new_room_key = await self.store.paginate_room_events(
+                room_id=room_id,
+                # We're going to paginate backwards from the `to_token`
+                from_key=to_token.room_key,
+                to_key=from_token.room_key if from_token is not None else None,
+                direction=Direction.BACKWARDS,
+                # We add one so we can determine if there are enough events to saturate
+                # the limit or not (see `limited`)
+                limit=room_sync_config.timeline_limit + 1,
+                event_filter=None,
+            )
+
+            # We want to return the events in ascending order (the last event is the
+            # most recent).
+            timeline_events.reverse()
+
+            timeline_events = await filter_events_for_client(
+                self.storage_controllers,
+                user.to_string(),
+                timeline_events,
+                is_peeking=rooms_for_user_membership_at_to_token.membership
+                != Membership.JOIN,
+                filter_send_to_client=True,
+            )
+
+            # Determine our `limited` status
+            if len(timeline_events) > room_sync_config.timeline_limit:
+                limited = True
+                # Get rid of that extra "+ 1" event because we only used it to determine
+                # if we hit the limit or not
+                timeline_events = timeline_events[-room_sync_config.timeline_limit :]
+                assert timeline_events[0].internal_metadata.stream_ordering
+                new_room_key = RoomStreamToken(
+                    stream=timeline_events[0].internal_metadata.stream_ordering - 1
+                )
+
+            prev_batch_token = prev_batch_token.copy_and_replace(
+                StreamKeyType.ROOM, new_room_key
+            )
+
+        # Figure out any stripped state events for invite/knocks
+        stripped_state: List[EventBase] = []
+        if rooms_for_user_membership_at_to_token.membership in {
+            Membership.INVITE,
+            Membership.KNOCK,
+        }:
+            invite_or_knock_event = await self.store.get_event(
+                rooms_for_user_membership_at_to_token.event_id
+            )
+
+            stripped_state = []
+            if invite_or_knock_event.membership == Membership.INVITE:
+                stripped_state = invite_or_knock_event.unsigned.get(
+                    "invite_room_state", []
+                )
+            elif invite_or_knock_event.membership == Membership.KNOCK:
+                stripped_state = invite_or_knock_event.unsigned.get(
+                    "knock_room_state", []
+                )
+
+            stripped_state.append(invite_or_knock_event)
+
+        return SlidingSyncResult.RoomResult(
+            # TODO: Dummy value
+            name="TODO",
+            # TODO: Dummy value
+            avatar=None,
+            # TODO: Dummy value
+            heroes=None,
+            # Since we can't determine whether we've already sent a room down this
+            # Sliding Sync connection before (we plan to add this optimization in the
+            # future), we're always returning the requested room state instead of
+            # updates.
+            initial=True,
+            # TODO: Dummy value
+            required_state=[],
+            timeline=timeline_events,
+            # TODO: Dummy value
+            is_dm=False,
+            stripped_state=stripped_state,
+            prev_batch=prev_batch_token,
+            limited=limited,
+            # TODO: Dummy values
+            joined_count=0,
+            invited_count=0,
+            # TODO: These are just dummy values. We could potentially just remove these
+            # since notifications can only really be done correctly on the client anyway
+            # (encrypted rooms).
+            notification_count=0,
+            highlight_count=0,
+            # TODO: Dummy value
+            num_live=0,
+        )
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index 1b0ac20d94..b261b2dd88 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -761,7 +761,6 @@ class SlidingSyncRestServlet(RestServlet):
             "lists": {
                 "foo-list": {
                     "ranges": [ [0, 99] ],
-                    "sort": [ "by_notification_level", "by_recency", "by_name" ],
                     "required_state": [
                         ["m.room.join_rules", ""],
                         ["m.room.history_visibility", ""],
@@ -771,7 +770,6 @@ class SlidingSyncRestServlet(RestServlet):
                     "filters": {
                         "is_dm": true
                     },
-                    "bump_event_types": [ "m.room.message", "m.room.encrypted" ],
                 }
             },
             // Room Subscriptions API
@@ -779,10 +777,6 @@ class SlidingSyncRestServlet(RestServlet):
                 "!sub1:bar": {
                     "required_state": [ ["*","*"] ],
                     "timeline_limit": 10,
-                    "include_old_rooms": {
-                        "timeline_limit": 1,
-                        "required_state": [ ["m.room.tombstone", ""], ["m.room.create", ""] ],
-                    }
                 }
             },
             // Extensions API
@@ -871,10 +865,11 @@ class SlidingSyncRestServlet(RestServlet):
         super().__init__()
         self.auth = hs.get_auth()
         self.store = hs.get_datastores().main
+        self.clock = hs.get_clock()
         self.filtering = hs.get_filtering()
         self.sliding_sync_handler = hs.get_sliding_sync_handler()
+        self.event_serializer = hs.get_event_client_serializer()

-    # TODO: Update this to `on_GET` once we figure out how we want to handle params
     async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
         requester = await self.auth.get_user_by_req(request, allow_guest=True)
         user = requester.user
@@ -920,13 +915,14 @@ class SlidingSyncRestServlet(RestServlet):
             logger.info("Client has disconnected; not serializing response.")
             return 200, {}

-        response_content = await self.encode_response(sliding_sync_results)
+        response_content = await self.encode_response(requester, sliding_sync_results)

         return 200, response_content

     # TODO: Is there a better way to encode things?
     async def encode_response(
         self,
+        requester: Requester,
         sliding_sync_result: SlidingSyncResult,
     ) -> JsonDict:
         response: JsonDict = defaultdict(dict)
@@ -935,7 +931,9 @@ class SlidingSyncRestServlet(RestServlet):
         serialized_lists = self.encode_lists(sliding_sync_result.lists)
         if serialized_lists:
             response["lists"] = serialized_lists
-        response["rooms"] = {}  # TODO: sliding_sync_result.rooms
+        response["rooms"] = await self.encode_rooms(
+            requester, sliding_sync_result.rooms
+        )
         response["extensions"] = {}  # TODO: sliding_sync_result.extensions

         return response
@@ -961,6 +959,79 @@ class SlidingSyncRestServlet(RestServlet):

         return serialized_lists

+    async def encode_rooms(
+        self,
+        requester: Requester,
+        rooms: Dict[str, SlidingSyncResult.RoomResult],
+    ) -> JsonDict:
+        time_now = self.clock.time_msec()
+
+        serialize_options = SerializeEventConfig(
+            event_format=format_event_for_client_v2_without_room_id,
+            requester=requester,
+        )
+
+        serialized_rooms = {}
+        for room_id, room_result in rooms.items():
+            serialized_timeline = await self.event_serializer.serialize_events(
+                room_result.timeline,
+                time_now,
+                config=serialize_options,
+                # TODO
+                # bundle_aggregations=room.timeline.bundled_aggregations,
+            )
+
+            serialized_required_state = await self.event_serializer.serialize_events(
+                room_result.required_state,
+                time_now,
+                config=serialize_options,
+            )
+
+            serialized_rooms[room_id] = {
+                "name": room_result.name,
+                "required_state": serialized_required_state,
+                "timeline": serialized_timeline,
+                "prev_batch": await room_result.prev_batch.to_string(self.store),
+                "limited": room_result.limited,
+                "joined_count": room_result.joined_count,
+                "invited_count": room_result.invited_count,
+                "notification_count": room_result.notification_count,
+                "highlight_count": room_result.highlight_count,
+                "num_live": room_result.num_live,
+            }
+
+            if room_result.avatar:
+                serialized_rooms[room_id]["avatar"] = room_result.avatar
+
+            if room_result.heroes:
+                serialized_rooms[room_id]["heroes"] = room_result.heroes
+
+            # We should only include the `initial` key if it's `True` to save bandwidth.
+            # The absence of this flag means `False`.
+            if room_result.initial:
+                serialized_rooms[room_id]["initial"] = room_result.initial
+
+            # Field should be absent on non-DM rooms
+            if room_result.is_dm:
+                serialized_rooms[room_id]["is_dm"] = room_result.is_dm
+
+            # Stripped state only applies to invite/knock rooms
+            if room_result.stripped_state:
+                serialized_stripped_state = (
+                    await self.event_serializer.serialize_events(
+                        room_result.stripped_state,
+                        time_now,
+                        config=serialize_options,
+                    )
+                )
+
+                # TODO: Would be good to rename this to `stripped_state` so it can be
+                # shared between invite and knock rooms, see
+                # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1117629919
+                serialized_rooms[room_id]["invite_state"] = serialized_stripped_state
+
+        return serialized_rooms
+

 def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     SyncRestServlet(hs).register(http_server)
diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py
index 1d65551d5b..b544398a35 100644
--- a/synapse/types/handlers/__init__.py
+++ b/synapse/types/handlers/__init__.py
@@ -162,8 +162,9 @@ class SlidingSyncResult:
             timeline: Latest events in the room. The last event is the most recent
             is_dm: Flag to specify whether the room is a direct-message room (most likely
                 between two people).
-            invite_state: Stripped state events. Same as `rooms.invite.$room_id.invite_state`
-                in sync v2, absent on joined/left rooms
+            stripped_state: Stripped state events (for rooms where the user is
+                invited/knocked). Same as `rooms.invite.$room_id.invite_state` in sync v2,
+                absent on joined/left rooms
             prev_batch: A token that can be passed as a start parameter to the
                 `/rooms/<room_id>/messages` API to retrieve earlier messages.
             limited: True if their are more events than fit between the given position and now.
@@ -192,7 +193,7 @@ class SlidingSyncResult:
         required_state: List[EventBase]
         timeline: List[EventBase]
         is_dm: bool
-        invite_state: List[EventBase]
+        stripped_state: Optional[List[EventBase]]
         prev_batch: StreamToken
        limited: bool
        joined_count: int
diff --git a/synapse/types/rest/client/__init__.py b/synapse/types/rest/client/__init__.py
index e2c79c4106..25fbd772f6 100644
--- a/synapse/types/rest/client/__init__.py
+++ b/synapse/types/rest/client/__init__.py
@@ -152,9 +152,6 @@ class SlidingSyncBody(RequestBodyModel):
             anyway.
         timeline_limit: The maximum number of timeline events to return per response.
             (Max 1000 messages)
-        include_old_rooms: Determines if `predecessor` rooms are included in the
-            `rooms` response. The user MUST be joined to old rooms for them to show up
-            in the response.
     """

     class IncludeOldRooms(RequestBodyModel):
         if TYPE_CHECKING:
             timeline_limit: int
         else:
             timeline_limit: conint(le=1000, strict=True)  # type: ignore[valid-type]
-        include_old_rooms: Optional[IncludeOldRooms] = None

     class SlidingSyncList(CommonRoomParameters):
         """
@@ -208,9 +204,6 @@ class SlidingSyncBody(RequestBodyModel):
             }
         timeline_limit: The maximum number of timeline events to return per response.
-        include_old_rooms: Determines if `predecessor` rooms are included in the
-            `rooms` response. The user MUST be joined to old rooms for them to show up
-            in the response.
         include_heroes: Return a stripped variant of membership events (containing
             `user_id` and optionally `avatar_url` and `displayname`) for the users used
             to calculate the room name.
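
Reviewer note on the de-duplication in the handler hunk above: when a room appears in more than one sliding-window list, a single `RoomSyncConfig` is kept per room by taking the highest `timeline_limit` and unioning the `required_state` sets. The code below is a minimal, self-contained sketch of that merging behaviour for illustration only; `ListConfig` and `merge_room_configs` are hypothetical stand-ins written for this note, and the `RoomSyncConfig` here is a plain dataclass rather than the attrs class added in the patch.

# Illustrative sketch only -- not Synapse code.
from dataclasses import dataclass, field
from typing import Dict, List, Set, Tuple

StateKey = Tuple[str, str]  # (event type, state key), assumed shape for this sketch


@dataclass
class ListConfig:
    """A cut-down stand-in for one sliding-window list in the request body."""

    room_ids: List[str]
    timeline_limit: int
    required_state: Set[StateKey]


@dataclass
class RoomSyncConfig:
    """Mutable per-room fetch config, analogous to the attrs class in the patch."""

    timeline_limit: int
    required_state: Set[StateKey] = field(default_factory=set)


def merge_room_configs(lists: List[ListConfig]) -> Dict[str, RoomSyncConfig]:
    """Collapse every list a room appears in into one RoomSyncConfig per room."""
    relevant_room_map: Dict[str, RoomSyncConfig] = {}
    for list_config in lists:
        for room_id in list_config.room_ids:
            existing = relevant_room_map.get(room_id)
            if existing is not None:
                # Take the highest timeline limit ...
                existing.timeline_limit = max(
                    existing.timeline_limit, list_config.timeline_limit
                )
                # ... and union the required state.
                existing.required_state.update(list_config.required_state)
            else:
                relevant_room_map[room_id] = RoomSyncConfig(
                    timeline_limit=list_config.timeline_limit,
                    required_state=set(list_config.required_state),
                )
    return relevant_room_map


if __name__ == "__main__":
    merged = merge_room_configs(
        [
            ListConfig(["!a:x", "!b:x"], 1, {("m.room.name", "")}),
            ListConfig(["!b:x"], 10, {("m.room.topic", "")}),
        ]
    )
    # "!b:x" appears in both lists: highest limit wins, required state is unioned.
    assert merged["!b:x"].timeline_limit == 10
    assert merged["!b:x"].required_state == {("m.room.name", ""), ("m.room.topic", "")}

In the sketch, `!b:x` sits in both lists and ends up with `timeline_limit=10` plus the union of both lists' required state, which mirrors the in-place updates the patch makes to `relevant_room_map` before fetching per-room data.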