From 079194c54740e5046bb988a1b6d602bdd21044ec Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 17 Jun 2024 18:03:02 -0500 Subject: [PATCH 01/20] Return some room timeline data in Sliding Sync --- synapse/handlers/sliding_sync.py | 202 ++++++++++++++++++++++++-- synapse/rest/client/sync.py | 89 ++++++++++-- synapse/types/handlers/__init__.py | 7 +- synapse/types/rest/client/__init__.py | 7 - 4 files changed, 275 insertions(+), 30 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 16d94925f5..cf448fa3cd 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -18,22 +18,25 @@ # # import logging -from typing import TYPE_CHECKING, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple +import attr from immutabledict import immutabledict -from synapse.api.constants import AccountDataTypes, EventTypes, Membership +from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membership from synapse.events import EventBase from synapse.storage.roommember import RoomsForUser from synapse.types import ( PersistedEventPosition, Requester, RoomStreamToken, + StreamKeyType, StreamToken, UserID, ) from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult -from synapse.types.state import StateFilter +from synapse.types.state import StateFilter, StateKey +from synapse.visibility import filter_events_for_client if TYPE_CHECKING: from synapse.server import HomeServer @@ -82,6 +85,18 @@ def filter_membership_for_sync(*, membership: str, user_id: str, sender: str) -> return membership != Membership.LEAVE or sender != user_id +# We can't freeze this class because we want to update it in place with the +# de-duplicated data. +@attr.s(slots=True, auto_attribs=True) +class RoomSyncConfig: + """ + Holds the config for what data we should fetch for a room in the sync response. 
+ """ + + timeline_limit: int + required_state: Set[StateKey] + + class SlidingSyncHandler: def __init__(self, hs: "HomeServer"): self.clock = hs.get_clock() @@ -201,6 +216,7 @@ class SlidingSyncHandler: # Assemble sliding window lists lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {} + relevant_room_map: Dict[str, RoomSyncConfig] = {} if sync_config.lists: # Get all of the room IDs that the user should be able to see in the sync # response @@ -225,29 +241,66 @@ class SlidingSyncHandler: ops: List[SlidingSyncResult.SlidingWindowList.Operation] = [] if list_config.ranges: for range in list_config.ranges: + room_id_set = { + room_id + for room_id, _ in sorted_room_info[range[0] : range[1]] + } + ops.append( SlidingSyncResult.SlidingWindowList.Operation( op=OperationType.SYNC, range=range, - room_ids=[ - room_id - for room_id, _ in sorted_room_info[ - range[0] : range[1] - ] - ], + room_ids=list(room_id_set), ) ) + # Update the relevant room map + for room_id in room_id_set: + if relevant_room_map.get(room_id) is not None: + # Take the highest timeline limit + if ( + relevant_room_map[room_id].timeline_limit + < list_config.timeline_limit + ): + relevant_room_map[room_id].timeline_limit = ( + list_config.timeline_limit + ) + + # Union the required state + relevant_room_map[room_id].required_state.update( + list_config.required_state + ) + else: + relevant_room_map[room_id] = RoomSyncConfig( + timeline_limit=list_config.timeline_limit, + required_state=set(list_config.required_state), + ) + lists[list_key] = SlidingSyncResult.SlidingWindowList( count=len(sorted_room_info), ops=ops, ) + # TODO: if (sync_config.room_subscriptions): + + # Fetch room data + rooms: Dict[str, SlidingSyncResult.RoomResult] = {} + for room_id, room_sync_config in relevant_room_map.items(): + room_sync_result = await self.get_room_sync_data( + user=sync_config.user, + room_id=room_id, + room_sync_config=room_sync_config, + rooms_for_user_membership_at_to_token=sync_room_map[room_id], + 
from_token=from_token, + to_token=to_token, + ) + + rooms[room_id] = room_sync_result + return SlidingSyncResult( next_pos=to_token, lists=lists, - # TODO: Gather room data for rooms in lists and `sync_config.room_subscriptions` - rooms={}, + rooms=rooms, extensions={}, ) @@ -665,3 +718,130 @@ class SlidingSyncHandler: # We want descending order reverse=True, ) + + async def get_room_sync_data( + self, + user: UserID, + room_id: str, + room_sync_config: RoomSyncConfig, + rooms_for_user_membership_at_to_token: RoomsForUser, + from_token: Optional[StreamToken], + to_token: StreamToken, + ) -> SlidingSyncResult.RoomResult: + """ + Fetch room data for a room. + + We fetch data according to the token range (> `from_token` and <= `to_token`). + + Args: + user: User to fetch data for + room_id: The room ID to fetch data for + room_sync_config: Config for what data we should fetch for a room in the + sync response. + rooms_for_user_membership_at_to_token: Membership information for the user + in the room at the time of `to_token`. + from_token: The point in the stream to sync from. + to_token: The point in the stream to sync up to. + """ + + timeline_events: List[EventBase] = [] + limited = False + # We want to use `to_token` (vs `from_token`) because we look backwards from the + # `to_token` up to the `timeline_limit` and we might not reach `from_token` + # before we hit the limit. We will update the room stream position once we've + # fetched the events. 
+ prev_batch_token = to_token + if room_sync_config.timeline_limit > 0: + timeline_events, new_room_key = await self.store.paginate_room_events( + room_id=room_id, + # We're going to paginate backwards from the `to_token` + from_key=to_token.room_key, + to_key=from_token.room_key if from_token is not None else None, + direction=Direction.BACKWARDS, + # We add one so we can determine if there are enough events to saturate + # the limit or not (see `limited`) + limit=room_sync_config.timeline_limit + 1, + event_filter=None, + ) + + # We want to return the events in ascending order (the last event is the + # most recent). + timeline_events.reverse() + + timeline_events = await filter_events_for_client( + self.storage_controllers, + user.to_string(), + timeline_events, + is_peeking=rooms_for_user_membership_at_to_token.membership + != Membership.JOIN, + filter_send_to_client=True, + ) + + # Determine our `limited` status + if len(timeline_events) > room_sync_config.timeline_limit: + limited = True + # Get rid of that extra "+ 1" event because we only used it to determine + # if we hit the limit or not + timeline_events = timeline_events[-room_sync_config.timeline_limit :] + assert timeline_events[0].internal_metadata.stream_ordering + new_room_key = RoomStreamToken( + stream=timeline_events[0].internal_metadata.stream_ordering - 1 + ) + + prev_batch_token = prev_batch_token.copy_and_replace( + StreamKeyType.ROOM, new_room_key + ) + + # Figure out any stripped state events for invite/knocks + stripped_state: List[EventBase] = [] + if rooms_for_user_membership_at_to_token.membership in { + Membership.INVITE, + Membership.KNOCK, + }: + invite_or_knock_event = await self.store.get_event( + rooms_for_user_membership_at_to_token.event_id + ) + + stripped_state = [] + if invite_or_knock_event.membership == Membership.INVITE: + stripped_state = invite_or_knock_event.unsigned.get( + "invite_room_state", [] + ) + elif invite_or_knock_event.membership == Membership.KNOCK: + 
stripped_state = invite_or_knock_event.unsigned.get( + "knock_room_state", [] + ) + + stripped_state.append(invite_or_knock_event) + + return SlidingSyncResult.RoomResult( + # TODO: Dummy value + name="TODO", + # TODO: Dummy value + avatar=None, + # TODO: Dummy value + heroes=None, + # Since we can't determine whether we've already sent a room down this + # Sliding Sync connection before (we plan to add this optimization in the + # future), we're always returning the requested room state instead of + # updates. + initial=True, + # TODO: Dummy value + required_state=[], + timeline=timeline_events, + # TODO: Dummy value + is_dm=False, + stripped_state=stripped_state, + prev_batch=prev_batch_token, + limited=limited, + # TODO: Dummy values + joined_count=0, + invited_count=0, + # TODO: These are just dummy values. We could potentially just remove these + # since notifications can only really be done correctly on the client anyway + # (encrypted rooms). + notification_count=0, + highlight_count=0, + # TODO: Dummy value + num_live=0, + ) diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index 1b0ac20d94..b261b2dd88 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -761,7 +761,6 @@ class SlidingSyncRestServlet(RestServlet): "lists": { "foo-list": { "ranges": [ [0, 99] ], - "sort": [ "by_notification_level", "by_recency", "by_name" ], "required_state": [ ["m.room.join_rules", ""], ["m.room.history_visibility", ""], @@ -771,7 +770,6 @@ class SlidingSyncRestServlet(RestServlet): "filters": { "is_dm": true }, - "bump_event_types": [ "m.room.message", "m.room.encrypted" ], } }, // Room Subscriptions API @@ -779,10 +777,6 @@ class SlidingSyncRestServlet(RestServlet): "!sub1:bar": { "required_state": [ ["*","*"] ], "timeline_limit": 10, - "include_old_rooms": { - "timeline_limit": 1, - "required_state": [ ["m.room.tombstone", ""], ["m.room.create", ""] ], - } } }, // Extensions API @@ -871,10 +865,11 @@ class 
SlidingSyncRestServlet(RestServlet): super().__init__() self.auth = hs.get_auth() self.store = hs.get_datastores().main + self.clock = hs.get_clock() self.filtering = hs.get_filtering() self.sliding_sync_handler = hs.get_sliding_sync_handler() + self.event_serializer = hs.get_event_client_serializer() - # TODO: Update this to `on_GET` once we figure out how we want to handle params async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request, allow_guest=True) user = requester.user @@ -920,13 +915,14 @@ class SlidingSyncRestServlet(RestServlet): logger.info("Client has disconnected; not serializing response.") return 200, {} - response_content = await self.encode_response(sliding_sync_results) + response_content = await self.encode_response(requester, sliding_sync_results) return 200, response_content # TODO: Is there a better way to encode things? async def encode_response( self, + requester: Requester, sliding_sync_result: SlidingSyncResult, ) -> JsonDict: response: JsonDict = defaultdict(dict) @@ -935,7 +931,9 @@ class SlidingSyncRestServlet(RestServlet): serialized_lists = self.encode_lists(sliding_sync_result.lists) if serialized_lists: response["lists"] = serialized_lists - response["rooms"] = {} # TODO: sliding_sync_result.rooms + response["rooms"] = await self.encode_rooms( + requester, sliding_sync_result.rooms + ) response["extensions"] = {} # TODO: sliding_sync_result.extensions return response @@ -961,6 +959,79 @@ class SlidingSyncRestServlet(RestServlet): return serialized_lists + async def encode_rooms( + self, + requester: Requester, + rooms: Dict[str, SlidingSyncResult.RoomResult], + ) -> JsonDict: + time_now = self.clock.time_msec() + + serialize_options = SerializeEventConfig( + event_format=format_event_for_client_v2_without_room_id, + requester=requester, + ) + + serialized_rooms = {} + for room_id, room_result in rooms.items(): + serialized_timeline = await 
+ # The absence of this flag means `False`.
+ stripped_state: Stripped state events (for rooms where the user is
@@ -192,7 +193,7 @@ class SlidingSyncResult: required_state: List[EventBase] timeline: List[EventBase] is_dm: bool - invite_state: List[EventBase] + stripped_state: Optional[List[EventBase]] prev_batch: StreamToken limited: bool joined_count: int diff --git a/synapse/types/rest/client/__init__.py b/synapse/types/rest/client/__init__.py index e2c79c4106..25fbd772f6 100644 --- a/synapse/types/rest/client/__init__.py +++ b/synapse/types/rest/client/__init__.py @@ -152,9 +152,6 @@ class SlidingSyncBody(RequestBodyModel): anyway. timeline_limit: The maximum number of timeline events to return per response. (Max 1000 messages) - include_old_rooms: Determines if `predecessor` rooms are included in the - `rooms` response. The user MUST be joined to old rooms for them to show up - in the response. """ class IncludeOldRooms(RequestBodyModel): @@ -167,7 +164,6 @@ class SlidingSyncBody(RequestBodyModel): timeline_limit: int else: timeline_limit: conint(le=1000, strict=True) # type: ignore[valid-type] - include_old_rooms: Optional[IncludeOldRooms] = None class SlidingSyncList(CommonRoomParameters): """ @@ -208,9 +204,6 @@ class SlidingSyncBody(RequestBodyModel): } timeline_limit: The maximum number of timeline events to return per response. - include_old_rooms: Determines if `predecessor` rooms are included in the - `rooms` response. The user MUST be joined to old rooms for them to show up - in the response. include_heroes: Return a stripped variant of membership events (containing `user_id` and optionally `avatar_url` and `displayname`) for the users used to calculate the room name. 
From 3e0f759dbc34cb3be0a1946cd36e617fc3c5a17c Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 17 Jun 2024 18:26:59 -0500 Subject: [PATCH 02/20] Strip invite/knock event itself and avoid mutating event `unsigned` Make sure we don't run into https://github.com/element-hq/synapse/issues/14919 (https://github.com/matrix-org/synapse/issues/14919) --- synapse/events/utils.py | 18 ++++++++++++++++++ synapse/handlers/sliding_sync.py | 14 ++++++++------ synapse/rest/client/sync.py | 10 +--------- .../storage/databases/main/events_worker.py | 12 ++---------- synapse/types/handlers/__init__.py | 4 ++-- 5 files changed, 31 insertions(+), 27 deletions(-) diff --git a/synapse/events/utils.py b/synapse/events/utils.py index b997d82d71..f937fd4698 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -836,3 +836,21 @@ def maybe_upsert_event_field( del container[key] return upsert_okay + + +def strip_event(event: EventBase) -> JsonDict: + """ + Used for "stripped state" events which provide a simplified view of the state of a + room intended to help a potential joiner identify the room (relevant when the user + is invited or knocked). + + Stripped state events can only have the `sender`, `type`, `state_key` and `content` + properties present. 
+ """ + + return { + "type": event.type, + "state_key": event.state_key, + "content": event.content, + "sender": event.sender, + } diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index cf448fa3cd..23f971c1f7 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -25,8 +25,10 @@ from immutabledict import immutabledict from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membership from synapse.events import EventBase +from synapse.events.utils import strip_event from synapse.storage.roommember import RoomsForUser from synapse.types import ( + JsonDict, PersistedEventPosition, Requester, RoomStreamToken, @@ -793,7 +795,7 @@ class SlidingSyncHandler: ) # Figure out any stripped state events for invite/knocks - stripped_state: List[EventBase] = [] + stripped_state: List[JsonDict] = [] if rooms_for_user_membership_at_to_token.membership in { Membership.INVITE, Membership.KNOCK, @@ -804,15 +806,15 @@ class SlidingSyncHandler: stripped_state = [] if invite_or_knock_event.membership == Membership.INVITE: - stripped_state = invite_or_knock_event.unsigned.get( - "invite_room_state", [] + stripped_state.extend( + invite_or_knock_event.unsigned.get("invite_room_state", []) ) elif invite_or_knock_event.membership == Membership.KNOCK: - stripped_state = invite_or_knock_event.unsigned.get( - "knock_room_state", [] + stripped_state.extend( + invite_or_knock_event.unsigned.get("knock_room_state", []) ) - stripped_state.append(invite_or_knock_event) + stripped_state.append(strip_event(invite_or_knock_event)) return SlidingSyncResult.RoomResult( # TODO: Dummy value diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index b261b2dd88..a9be37bbf3 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -1017,18 +1017,10 @@ class SlidingSyncRestServlet(RestServlet): # Stripped state only applies to invite/knock rooms if room_result.stripped_state: - 
serialized_stripped_state = ( - await self.event_serializer.serialize_events( - room_result.stripped_state, - time_now, - config=serialize_options, - ) - ) - # TODO: Would be good to rename this to `stripped_state` so it can be # shared between invite and knock rooms, see # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1117629919 - serialized_rooms[room_id]["invite_state"] = serialized_stripped_state + serialized_rooms[room_id]["invite_state"] = room_result.stripped_state return serialized_rooms diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index e264d36f02..f0f390cec4 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -55,7 +55,7 @@ from synapse.api.room_versions import ( ) from synapse.events import EventBase, make_event_from_dict from synapse.events.snapshot import EventContext -from synapse.events.utils import prune_event +from synapse.events.utils import prune_event, strip_event from synapse.logging.context import ( PreserveLoggingContext, current_context, @@ -1025,15 +1025,7 @@ class EventsWorkerStore(SQLBaseStore): state_to_include = await self.get_events(selected_state_ids.values()) - return [ - { - "type": e.type, - "state_key": e.state_key, - "content": e.content, - "sender": e.sender, - } - for e in state_to_include.values() - ] + return [strip_event(e) for e in state_to_include.values()] def _maybe_start_fetch_thread(self) -> None: """Starts an event fetch thread if we are not yet at the maximum number.""" diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py index b544398a35..04b0ab972b 100644 --- a/synapse/types/handlers/__init__.py +++ b/synapse/types/handlers/__init__.py @@ -31,7 +31,7 @@ else: from pydantic import Extra from synapse.events import EventBase -from synapse.types import JsonMapping, StreamToken, UserID +from synapse.types import JsonDict, 
JsonMapping, StreamToken, UserID from synapse.types.rest.client import SlidingSyncBody @@ -193,7 +193,7 @@ class SlidingSyncResult: required_state: List[EventBase] timeline: List[EventBase] is_dm: bool - stripped_state: Optional[List[EventBase]] + stripped_state: Optional[List[JsonDict]] prev_batch: StreamToken limited: bool joined_count: int From 5e2fd4e93ca2084ee92533b59e6d45b3a914fa89 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 17 Jun 2024 18:29:44 -0500 Subject: [PATCH 03/20] Add changelog --- changelog.d/17320.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/17320.feature diff --git a/changelog.d/17320.feature b/changelog.d/17320.feature new file mode 100644 index 0000000000..1e524f3eca --- /dev/null +++ b/changelog.d/17320.feature @@ -0,0 +1 @@ +Add `rooms` data to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. From 8ce06f145260540f0c81c1594a011556e90f32c8 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 17 Jun 2024 18:54:23 -0500 Subject: [PATCH 04/20] Fix sort being lost --- synapse/handlers/sliding_sync.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 23f971c1f7..e61b86d779 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -243,21 +243,21 @@ class SlidingSyncHandler: ops: List[SlidingSyncResult.SlidingWindowList.Operation] = [] if list_config.ranges: for range in list_config.ranges: - room_id_set = { + sliced_room_ids = [ room_id for room_id, _ in sorted_room_info[range[0] : range[1]] - } + ] ops.append( SlidingSyncResult.SlidingWindowList.Operation( op=OperationType.SYNC, range=range, - room_ids=list(room_id_set), + room_ids=sliced_room_ids, ) ) # Update the relevant room map - for room_id in room_id_set: + for room_id in sliced_room_ids: if relevant_room_map.get(room_id) is not None: # Take the highest 
timeline limit if ( From aa5f54aa135de8ae7fdc201792d548de494cbd40 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 18 Jun 2024 08:26:10 -0500 Subject: [PATCH 05/20] Start on required_state --- synapse/handlers/sliding_sync.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index e61b86d779..5b834fe9ef 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -93,10 +93,16 @@ def filter_membership_for_sync(*, membership: str, user_id: str, sender: str) -> class RoomSyncConfig: """ Holds the config for what data we should fetch for a room in the sync response. + + Attributes: + timeline_limit: The maximum number of events to return in the timeline. + required_state: The minimum set of state events requested for the room. The + values are close to `StateKey` but actually use a syntax where you can provide + `*` and `$LAZY` as the state key part of the tuple (type, state_key). 
""" timeline_limit: int - required_state: Set[StateKey] + required_state: Set[Tuple[str, str]] class SlidingSyncHandler: @@ -816,6 +822,14 @@ class SlidingSyncHandler: stripped_state.append(strip_event(invite_or_knock_event)) + required_state = [] + if len(room_sync_config.required_state) > 0: + required_state = await self.storage_controllers.state.get_state_at( + room_id, + to_token, + state_filter=StateFilter.from_types(TODO), + ) + return SlidingSyncResult.RoomResult( # TODO: Dummy value name="TODO", From 5c175d5488ac7b700906a722ee16404527d8d711 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 18 Jun 2024 09:35:20 -0500 Subject: [PATCH 06/20] Add some notes from pairing --- synapse/handlers/sliding_sync.py | 20 ++++++++++++++++++-- synapse/rest/client/sync.py | 1 + 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 5b834fe9ef..f9ec4f7961 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -764,6 +764,13 @@ class SlidingSyncHandler: room_id=room_id, # We're going to paginate backwards from the `to_token` from_key=to_token.room_key, + # We should always return historical messages (outside token range) in + # these cases because clients want to be able to show a basic screen of + # information: + # - Initial sync (because no `from_token`) + # - When users newly_join + # - TODO: For incremental sync where we haven't sent it down this + # connection before to_key=from_token.room_key if from_token is not None else None, direction=Direction.BACKWARDS, # We add one so we can determine if there are enough events to saturate @@ -824,14 +831,23 @@ class SlidingSyncHandler: required_state = [] if len(room_sync_config.required_state) > 0: - required_state = await self.storage_controllers.state.get_state_at( + await self.storage_controllers.state.get_current_state( room_id, - to_token, state_filter=StateFilter.from_types(TODO), + 
await_full_state=False, ) + # TODO: rewind + + # required_state = await self.storage_controllers.state.get_state_at( + # room_id, + # to_token, + # state_filter=StateFilter.from_types(TODO), + # ) + return SlidingSyncResult.RoomResult( # TODO: Dummy value + # TODO: Make this optional because a computed name doesn't make sense for translated cases name="TODO", # TODO: Dummy value avatar=None, diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index a9be37bbf3..0ae31f23e9 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -1021,6 +1021,7 @@ class SlidingSyncRestServlet(RestServlet): # shared between invite and knock rooms, see # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1117629919 serialized_rooms[room_id]["invite_state"] = room_result.stripped_state + # TODO: `knocked_state` but that isn't specced yet return serialized_rooms From 9089bfe4dc505c02739968cdb1b67220e060580d Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 18 Jun 2024 10:06:29 -0500 Subject: [PATCH 07/20] Remove required_state for now --- synapse/handlers/sliding_sync.py | 23 ++++------------------- 1 file changed, 4 insertions(+), 19 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index f9ec4f7961..f8fd2c6c5e 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -96,9 +96,10 @@ class RoomSyncConfig: Attributes: timeline_limit: The maximum number of events to return in the timeline. - required_state: The minimum set of state events requested for the room. The - values are close to `StateKey` but actually use a syntax where you can provide - `*` and `$LAZY` as the state key part of the tuple (type, state_key). + required_state: The set of state events requested for the room. 
The + values are close to `StateKey` but actually use a syntax where you can + provide `*` wildcard and `$LAZY` for lazy room members as the `state_key` part + of the tuple (type, state_key). """ timeline_limit: int @@ -829,22 +830,6 @@ class SlidingSyncHandler: stripped_state.append(strip_event(invite_or_knock_event)) - required_state = [] - if len(room_sync_config.required_state) > 0: - await self.storage_controllers.state.get_current_state( - room_id, - state_filter=StateFilter.from_types(TODO), - await_full_state=False, - ) - - # TODO: rewind - - # required_state = await self.storage_controllers.state.get_state_at( - # room_id, - # to_token, - # state_filter=StateFilter.from_types(TODO), - # ) - return SlidingSyncResult.RoomResult( # TODO: Dummy value # TODO: Make this optional because a computed name doesn't make sense for translated cases From 94279915d4432fefb87b2d210a8cd03fd633c002 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 18 Jun 2024 10:09:33 -0500 Subject: [PATCH 08/20] Clean up knock_state comments --- synapse/rest/client/sync.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index 0ae31f23e9..db44773824 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -1017,11 +1017,13 @@ class SlidingSyncRestServlet(RestServlet): # Stripped state only applies to invite/knock rooms if room_result.stripped_state: - # TODO: Would be good to rename this to `stripped_state` so it can be - # shared between invite and knock rooms, see + # TODO: `knocked_state` but that isn't specced yet. 
+ # + # TODO: Instead of adding `knocked_state`, it would be good to rename + # this to `stripped_state` so it can be shared between invite and knock + # rooms, see # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1117629919 serialized_rooms[room_id]["invite_state"] = room_result.stripped_state - # TODO: `knocked_state` but that isn't specced yet return serialized_rooms From 19b22971711da0c8bdbaebed0d2f7a7ccb01e2ae Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 18 Jun 2024 10:36:50 -0500 Subject: [PATCH 09/20] Calculate `num_live` --- synapse/handlers/sliding_sync.py | 55 ++++++++++++++++++++++++++------ 1 file changed, 45 insertions(+), 10 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index f8fd2c6c5e..1d07e22c91 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -753,6 +753,7 @@ class SlidingSyncHandler: to_token: The point in the stream to sync up to. """ + # Assemble the list of timeline events timeline_events: List[EventBase] = [] limited = False # We want to use `to_token` (vs `from_token`) because we look backwards from the @@ -761,18 +762,34 @@ class SlidingSyncHandler: # fetched the events. 
prev_batch_token = to_token if room_sync_config.timeline_limit > 0: + newly_joined = False + if ( + from_token is not None + and rooms_for_user_membership_at_to_token.membership == Membership.JOIN + ): + newly_joined = ( + rooms_for_user_membership_at_to_token.event_pos.stream + > from_token.room_key.get_stream_pos_for_instance( + rooms_for_user_membership_at_to_token.event_pos.instance_name + ) + ) + timeline_events, new_room_key = await self.store.paginate_room_events( room_id=room_id, # We're going to paginate backwards from the `to_token` from_key=to_token.room_key, - # We should always return historical messages (outside token range) in - # these cases because clients want to be able to show a basic screen of - # information: - # - Initial sync (because no `from_token`) - # - When users newly_join - # - TODO: For incremental sync where we haven't sent it down this + # We should return historical messages (outside token range) in the + # following cases because we want clients to be able to show a basic + # screen of information: + # - Initial sync (because no `from_token` to limit us anyway) + # - When users newly_joined + # - TODO: For an incremental sync where we haven't sent it down this # connection before - to_key=from_token.room_key if from_token is not None else None, + to_key=( + from_token.room_key + if from_token is not None and not newly_joined + else None + ), direction=Direction.BACKWARDS, # We add one so we can determine if there are enough events to saturate # the limit or not (see `limited`) @@ -804,6 +821,25 @@ class SlidingSyncHandler: stream=timeline_events[0].internal_metadata.stream_ordering - 1 ) + # Determine how many "live" events we have (events within the given token range). + # + # This is mostly useful to determine whether a given @mention event should + # make a noise or not. 
Clients cannot rely solely on the absence of + # `initial: true` to determine live events because if a room not in the + # sliding window bumps into the window because of an @mention it will have + # `initial: true` yet contain a single live event (with potentially other + # old events in the timeline) + num_live = 0 + if from_token is not None: + for timeline_event in timeline_events: + if ( + timeline_event.internal_metadata.stream_ordering + > from_token.room_key.get_stream_pos_for_instance( + timeline_event.internal_metadata.instance_name + ) + ): + num_live += 1 + prev_batch_token = prev_batch_token.copy_and_replace( StreamKeyType.ROOM, new_room_key ) @@ -838,7 +874,7 @@ class SlidingSyncHandler: avatar=None, # TODO: Dummy value heroes=None, - # Since we can't determine whether we've already sent a room down this + # TODO: Since we can't determine whether we've already sent a room down this # Sliding Sync connection before (we plan to add this optimization in the # future), we're always returning the requested room state instead of # updates. @@ -859,6 +895,5 @@ class SlidingSyncHandler: # (encrypted rooms). 
notification_count=0, highlight_count=0, - # TODO: Dummy value - num_live=0, + num_live=num_live, ) From 81d36f36c1731738b38f0b7842de1ce84a570d74 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 18 Jun 2024 15:28:23 -0500 Subject: [PATCH 10/20] Add tests for `limited` --- synapse/handlers/sliding_sync.py | 22 +++-- tests/rest/client/test_sync.py | 140 ++++++++++++++++++++++++++++++- 2 files changed, 149 insertions(+), 13 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 1d07e22c91..90991031aa 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -37,7 +37,7 @@ from synapse.types import ( UserID, ) from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult -from synapse.types.state import StateFilter, StateKey +from synapse.types.state import StateFilter from synapse.visibility import filter_events_for_client if TYPE_CHECKING: @@ -764,6 +764,7 @@ class SlidingSyncHandler: if room_sync_config.timeline_limit > 0: newly_joined = False if ( + # We can only determine new-ness if we have a `from_token` to define our range from_token is not None and rooms_for_user_membership_at_to_token.membership == Membership.JOIN ): @@ -778,11 +779,11 @@ class SlidingSyncHandler: room_id=room_id, # We're going to paginate backwards from the `to_token` from_key=to_token.room_key, - # We should return historical messages (outside token range) in the + # We should return historical messages (before token range) in the # following cases because we want clients to be able to show a basic # screen of information: # - Initial sync (because no `from_token` to limit us anyway) - # - When users newly_joined + # - When users `newly_joined` # - TODO: For an incremental sync where we haven't sent it down this # connection before to_key=( @@ -832,12 +833,15 @@ class SlidingSyncHandler: num_live = 0 if from_token is not None: for timeline_event in timeline_events: - if ( - 
timeline_event.internal_metadata.stream_ordering - > from_token.room_key.get_stream_pos_for_instance( - timeline_event.internal_metadata.instance_name - ) - ): + # This fields should be present for all persisted events + assert timeline_event.internal_metadata.stream_ordering is not None + assert timeline_event.internal_metadata.instance_name is not None + + persisted_position = PersistedEventPosition( + instance_name=timeline_event.internal_metadata.instance_name, + stream=timeline_event.internal_metadata.stream_ordering, + ) + if persisted_position.persisted_after(from_token.room_key): num_live += 1 prev_batch_token = prev_batch_token.copy_and_replace( diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index 2b06767b8a..5b611cd096 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -35,7 +35,7 @@ from synapse.api.constants import ( ) from synapse.rest.client import devices, knock, login, read_marker, receipts, room, sync from synapse.server import HomeServer -from synapse.types import JsonDict, RoomStreamToken, StreamKeyType +from synapse.types import JsonDict, RoomStreamToken, StreamKeyType, StreamToken from synapse.util import Clock from tests import unittest @@ -1282,7 +1282,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): def test_sync_list(self) -> None: """ - Test that room IDs show up in the Sliding Sync lists + Test that room IDs show up in the Sliding Sync `lists` """ alice_user_id = self.register_user("alice", "correcthorse") alice_access_token = self.login(alice_user_id, "correcthorse") @@ -1387,7 +1387,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): def test_filter_list(self) -> None: """ - Test that filters apply to lists + Test that filters apply to `lists` """ user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") @@ -1462,7 +1462,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): def test_sort_list(self) -> None: """ - Test 
that the lists are sorted by `stream_ordering` + Test that the `lists` are sorted by `stream_ordering` """ user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") @@ -1516,3 +1516,135 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): ], channel.json_body["lists"]["foo-list"], ) + + def test_rooms_limited_initial_sync(self) -> None: + """ + Test that we mark `rooms` as `limited=True` when we saturate the `timeline_limit` + on initial sync. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.send(room_id1, "activity1", tok=user2_tok) + self.helper.send(room_id1, "activity2", tok=user2_tok) + event_response3 = self.helper.send(room_id1, "activity3", tok=user2_tok) + event_pos3 = self.get_success( + self.store.get_position_for_event(event_response3["event_id"]) + ) + event_response4 = self.helper.send(room_id1, "activity4", tok=user2_tok) + event_pos4 = self.get_success( + self.store.get_position_for_event(event_response4["event_id"]) + ) + event_response5 = self.helper.send(room_id1, "activity5", tok=user2_tok) + user1_join_response = self.helper.join(room_id1, user1_id, tok=user1_tok) + + # Make the Sliding Sync request + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 3, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # We expect to saturate the `timeline_limit` (there are more than 3 messages in the room) + self.assertEqual( + channel.json_body["rooms"][room_id1]["limited"], + True, + channel.json_body["rooms"][room_id1], + ) + # Check to make sure the latest events are returned + self.assertEqual( + [ + event["event_id"] + for event in 
channel.json_body["rooms"][room_id1]["timeline"] + ], + [ + event_response4["event_id"], + event_response5["event_id"], + user1_join_response["event_id"], + ], + channel.json_body["rooms"][room_id1]["timeline"], + ) + + # Check to make sure the `prev_batch` points at the right place + prev_batch_token = self.get_success( + StreamToken.from_string( + self.store, channel.json_body["rooms"][room_id1]["prev_batch"] + ) + ) + prev_batch_room_stream_token_serialized = self.get_success( + prev_batch_token.room_key.to_string(self.store) + ) + # If we use the `prev_batch` token to look backwards, we should see `event3` + # next so make sure the token encompasses it + self.assertEqual( + event_pos3.persisted_after(prev_batch_token.room_key), + False, + f"`prev_batch` token {prev_batch_room_stream_token_serialized} should be >= event_pos3={self.get_success(event_pos3.to_room_stream_token().to_string(self.store))}", + ) + # If we use the `prev_batch` token to look backwards, we shouldn't see `event4` + # anymore since it was just returned in this response. + self.assertEqual( + event_pos4.persisted_after(prev_batch_token.room_key), + True, + f"`prev_batch` token {prev_batch_room_stream_token_serialized} should be < event_pos4={self.get_success(event_pos4.to_room_stream_token().to_string(self.store))}", + ) + + def test_not_limited_initial_sync(self) -> None: + """ + Test that we mark `rooms` as `limited=False` when there are no more events to + paginate to. 
+ """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.send(room_id1, "activity1", tok=user2_tok) + self.helper.send(room_id1, "activity2", tok=user2_tok) + self.helper.send(room_id1, "activity3", tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + + # Make the Sliding Sync request + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 100, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # The timeline should be `limited=False` because we have all of the events (no + # more to paginate to) + self.assertEqual( + channel.json_body["rooms"][room_id1]["limited"], + False, + channel.json_body["rooms"][room_id1], + ) + # We're just looking to make sure we got all of the events before hitting the `timeline_limit` + self.assertEqual( + len(channel.json_body["rooms"][room_id1]["timeline"]), + 9, + channel.json_body["rooms"][room_id1]["timeline"], + ) From 9791209a3d5c82ad9975acea06aaacb55de2326a Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 18 Jun 2024 18:10:17 -0500 Subject: [PATCH 11/20] Add more tests --- synapse/handlers/sliding_sync.py | 33 ++-- synapse/rest/client/sync.py | 10 +- synapse/types/__init__.py | 3 + tests/rest/client/test_sync.py | 274 ++++++++++++++++++++++++++++++- 4 files changed, 296 insertions(+), 24 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 90991031aa..c1b0b2153a 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -769,26 +769,29 @@ class SlidingSyncHandler: and rooms_for_user_membership_at_to_token.membership == Membership.JOIN ): newly_joined = ( - 
rooms_for_user_membership_at_to_token.event_pos.stream - > from_token.room_key.get_stream_pos_for_instance( - rooms_for_user_membership_at_to_token.event_pos.instance_name + rooms_for_user_membership_at_to_token.event_pos.persisted_after( + from_token.room_key ) ) + # We should return historical messages (before token range) in the + # following cases because we want clients to be able to show a basic + # screen of information: + # - Initial sync (because no `from_token` to limit us anyway) + # - When users `newly_joined` + # - TODO: For an incremental sync where we haven't sent it down this + # connection before + should_limit_timeline_to_token_range = ( + from_token is not None and not newly_joined + ) + timeline_events, new_room_key = await self.store.paginate_room_events( room_id=room_id, # We're going to paginate backwards from the `to_token` from_key=to_token.room_key, - # We should return historical messages (before token range) in the - # following cases because we want clients to be able to show a basic - # screen of information: - # - Initial sync (because no `from_token` to limit us anyway) - # - When users `newly_joined` - # - TODO: For an incremental sync where we haven't sent it down this - # connection before to_key=( from_token.room_key - if from_token is not None and not newly_joined + if should_limit_timeline_to_token_range else None ), direction=Direction.BACKWARDS, @@ -832,7 +835,7 @@ class SlidingSyncHandler: # old events in the timeline) num_live = 0 if from_token is not None: - for timeline_event in timeline_events: + for timeline_event in reversed(timeline_events): # This fields should be present for all persisted events assert timeline_event.internal_metadata.stream_ordering is not None assert timeline_event.internal_metadata.instance_name is not None @@ -843,6 +846,12 @@ class SlidingSyncHandler: ) if persisted_position.persisted_after(from_token.room_key): num_live += 1 + else: + # Since we're iterating over the timeline events in + # 
reverse-chronological order, we can break once we hit an event + # that's not live. In the future, we could potentially optimize + # this more with a binary search (bisect). + break prev_batch_token = prev_batch_token.copy_and_replace( StreamKeyType.ROOM, new_room_key diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index db44773824..434eaa4789 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -785,7 +785,7 @@ class SlidingSyncRestServlet(RestServlet): Response JSON:: { - "next_pos": "s58_224_0_13_10_1_1_16_0_1", + "pos": "s58_224_0_13_10_1_1_16_0_1", "lists": { "foo-list": { "count": 1337, @@ -824,7 +824,8 @@ class SlidingSyncRestServlet(RestServlet): "joined_count": 41, "invited_count": 1, "notification_count": 1, - "highlight_count": 0 + "highlight_count": 0, + "num_live": 2" }, // rooms from list "!foo:bar": { @@ -849,7 +850,8 @@ class SlidingSyncRestServlet(RestServlet): "joined_count": 4, "invited_count": 0, "notification_count": 54, - "highlight_count": 3 + "highlight_count": 3, + "num_live": 1, }, // ... 
99 more items }, @@ -927,7 +929,7 @@ class SlidingSyncRestServlet(RestServlet): ) -> JsonDict: response: JsonDict = defaultdict(dict) - response["next_pos"] = await sliding_sync_result.next_pos.to_string(self.store) + response["pos"] = await sliding_sync_result.next_pos.to_string(self.store) serialized_lists = self.encode_lists(sliding_sync_result.lists) if serialized_lists: response["lists"] = serialized_lists diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index 151658df53..b52236d602 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -1078,6 +1078,9 @@ class PersistedPosition: stream: int def persisted_after(self, token: AbstractMultiWriterStreamToken) -> bool: + """ + Checks whether this position happened after the token + """ return token.get_stream_pos_for_instance(self.instance_name) < self.stream diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index 5b611cd096..d538716e5a 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -19,6 +19,7 @@ # # import json +import logging from typing import List from parameterized import parameterized, parameterized_class @@ -35,7 +36,7 @@ from synapse.api.constants import ( ) from synapse.rest.client import devices, knock, login, read_marker, receipts, room, sync from synapse.server import HomeServer -from synapse.types import JsonDict, RoomStreamToken, StreamKeyType, StreamToken +from synapse.types import JsonDict, RoomStreamToken, StreamKeyType, StreamToken, UserID from synapse.util import Clock from tests import unittest @@ -44,6 +45,8 @@ from tests.federation.transport.test_knocking import ( ) from tests.server import TimedOutException +logger = logging.getLogger(__name__) + class FilterTestCase(unittest.HomeserverTestCase): user_id = "@apple:test" @@ -1379,11 +1382,9 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): channel.await_result(timeout_ms=200) self.assertEqual(channel.code, 200, channel.json_body) - # 
We expect the `next_pos` in the result to be the same as what we requested + # We expect the next `pos` in the result to be the same as what we requested # with because we weren't able to find anything new yet. - self.assertEqual( - channel.json_body["next_pos"], future_position_token_serialized - ) + self.assertEqual(channel.json_body["pos"], future_position_token_serialized) def test_filter_list(self) -> None: """ @@ -1602,7 +1603,15 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): f"`prev_batch` token {prev_batch_room_stream_token_serialized} should be < event_pos4={self.get_success(event_pos4.to_room_stream_token().to_string(self.store))}", ) - def test_not_limited_initial_sync(self) -> None: + # With no `from_token` (initial sync), it's all historical since there is no + # "current" range + self.assertEqual( + channel.json_body["rooms"][room_id1]["num_live"], + 0, + channel.json_body["rooms"][room_id1], + ) + + def test_rooms_not_limited_initial_sync(self) -> None: """ Test that we mark `rooms` as `limited=False` when there are no more events to paginate to. 
@@ -1619,6 +1628,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): self.helper.join(room_id1, user1_id, tok=user1_tok) # Make the Sliding Sync request + timeline_limit = 100 channel = self.make_request( "POST", self.sync_endpoint, @@ -1627,7 +1637,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): "foo-list": { "ranges": [[0, 1]], "required_state": [], - "timeline_limit": 100, + "timeline_limit": timeline_limit, } } }, @@ -1642,9 +1652,257 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): False, channel.json_body["rooms"][room_id1], ) + expected_number_of_events = 9 # We're just looking to make sure we got all of the events before hitting the `timeline_limit` self.assertEqual( len(channel.json_body["rooms"][room_id1]["timeline"]), - 9, + expected_number_of_events, channel.json_body["rooms"][room_id1]["timeline"], ) + self.assertLessEqual(expected_number_of_events, timeline_limit) + + # With no `from_token` (initial sync), it's all historical since there is no + # "live" token range. + self.assertEqual( + channel.json_body["rooms"][room_id1]["num_live"], + 0, + channel.json_body["rooms"][room_id1], + ) + + def test_rooms_incremental_sync(self) -> None: + """ + Test that `rooms` data during an incremental sync after an initial sync. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + self.helper.send(room_id1, "activity before initial sync1", tok=user2_tok) + + # Make an initial Sliding Sync request to grab a token. This is also a sanity + # check that we can go from initial to incremental sync. 
+ sync_params = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 3, + } + } + } + channel = self.make_request( + "POST", + self.sync_endpoint, + sync_params, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + next_pos = channel.json_body["pos"] + + # Send some events but don't send enough to saturate the `timeline_limit`. + # We want to later test that we only get the new events since the `next_pos` + event_response2 = self.helper.send(room_id1, "activity after2", tok=user2_tok) + event_response3 = self.helper.send(room_id1, "activity after3", tok=user2_tok) + + # Make an incremental Sliding Sync request (what we're trying to test) + channel = self.make_request( + "POST", + self.sync_endpoint + f"?pos={next_pos}", + sync_params, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # We only expect to see the new events since the last sync which isn't enough to + # fill up the `timeline_limit`. + self.assertEqual( + channel.json_body["rooms"][room_id1]["limited"], + False, + f'Our `timeline_limit` was {sync_params["lists"]["foo-list"]["timeline_limit"]} ' + + f'and {len(channel.json_body["rooms"][room_id1]["timeline"])} events were returned in the timeline. 
' + + str(channel.json_body["rooms"][room_id1]), + ) + # Check to make sure the latest events are returned + self.assertEqual( + [ + event["event_id"] + for event in channel.json_body["rooms"][room_id1]["timeline"] + ], + [ + event_response2["event_id"], + event_response3["event_id"], + ], + channel.json_body["rooms"][room_id1]["timeline"], + ) + + # All events are "live" + self.assertEqual( + channel.json_body["rooms"][room_id1]["num_live"], + 2, + channel.json_body["rooms"][room_id1], + ) + + def test_rooms_newly_joined_incremental_sync(self) -> None: + """ + Test that when we make an incremental sync with a `newly_joined` `rooms`, we are + able to see some historical events before the `from_token`. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.send(room_id1, "activity before token1", tok=user2_tok) + event_response2 = self.helper.send( + room_id1, "activity before token2", tok=user2_tok + ) + + from_token = self.event_sources.get_current_token() + + # Join the room after the `from_token` which will make us consider this room as + # `newly_joined`. + user1_join_response = self.helper.join(room_id1, user1_id, tok=user1_tok) + + # Send some events but don't send enough to saturate the `timeline_limit`. + # We want to later test that we only get the new events since the `next_pos` + event_response3 = self.helper.send( + room_id1, "activity after token3", tok=user2_tok + ) + event_response4 = self.helper.send( + room_id1, "activity after token4", tok=user2_tok + ) + + # The `timeline_limit` is set to 4 so we can at least see one historical event + # before the `from_token`. We should see historical events because this is a + # `newly_joined` room. 
+ timeline_limit = 4 + # Make an incremental Sliding Sync request (what we're trying to test) + channel = self.make_request( + "POST", + self.sync_endpoint + + f"?pos={self.get_success( + from_token.to_string(self.store) + )}", + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": timeline_limit, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # We should see the new events and the rest should be filled with historical + # events which will make us `limited=True` since there are more to paginate to. + self.assertEqual( + channel.json_body["rooms"][room_id1]["limited"], + True, + f"Our `timeline_limit` was {timeline_limit} " + + f'and {len(channel.json_body["rooms"][room_id1]["timeline"])} events were returned in the timeline. ' + + str(channel.json_body["rooms"][room_id1]), + ) + # Check to make sure that the "live" and historical events are returned + self.assertEqual( + [ + event["event_id"] + for event in channel.json_body["rooms"][room_id1]["timeline"] + ], + [ + event_response2["event_id"], + user1_join_response["event_id"], + event_response3["event_id"], + event_response4["event_id"], + ], + channel.json_body["rooms"][room_id1]["timeline"], + ) + + # Only events after the `from_token` are "live" (join, event3, event4) + self.assertEqual( + channel.json_body["rooms"][room_id1]["num_live"], + 3, + channel.json_body["rooms"][room_id1], + ) + + def test_rooms_invite_sync(self) -> None: + """ + Test that `rooms` we are invited to have some stripped `invite_state` and that + we can't see any timeline events because we haven't joined the room yet. 
+ """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user1 = UserID.from_string(user1_id) + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + user2 = UserID.from_string(user2_id) + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.send(room_id1, "activity before1", tok=user2_tok) + self.helper.send(room_id1, "activity before2", tok=user2_tok) + self.helper.invite(room_id1, src=user2_id, targ=user1_id, tok=user2_tok) + self.helper.send(room_id1, "activity after3", tok=user2_tok) + self.helper.send(room_id1, "activity after4", tok=user2_tok) + + # Make the Sliding Sync request + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 3, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # Should not see anything (except maybe the invite event) because we haven't + # joined yet (`filter_events_for_client(...)` is doing the work here) + self.assertEqual( + channel.json_body["rooms"][room_id1]["timeline"], + [], + channel.json_body["rooms"][room_id1]["timeline"], + ) + # We should have some stripped state so the potential joiner can identify the + # room (we don't care about the order). 
+ self.assertCountEqual( + channel.json_body["rooms"][room_id1]["invite_state"], + [ + { + "content": {"creator": user2_id, "room_version": "10"}, + "sender": user2_id, + "state_key": "", + "type": "m.room.create", + }, + { + "content": {"join_rule": "public"}, + "sender": user2_id, + "state_key": "", + "type": "m.room.join_rules", + }, + { + "content": {"displayname": user2.localpart, "membership": "join"}, + "sender": user2_id, + "state_key": user2_id, + "type": "m.room.member", + }, + { + "content": {"displayname": user1.localpart, "membership": "invite"}, + "sender": user2_id, + "state_key": user1_id, + "type": "m.room.member", + }, + ], + channel.json_body["rooms"][room_id1]["invite_state"], + ) From 70ecd4d8d3646ddb1fb55b37cdf9a07612a59d2f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 18 Jun 2024 19:38:35 -0500 Subject: [PATCH 12/20] Fix lint --- synapse/handlers/sliding_sync.py | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index c1b0b2153a..7a6ef1a2d9 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -774,24 +774,22 @@ class SlidingSyncHandler: ) ) - # We should return historical messages (before token range) in the - # following cases because we want clients to be able to show a basic - # screen of information: - # - Initial sync (because no `from_token` to limit us anyway) - # - When users `newly_joined` - # - TODO: For an incremental sync where we haven't sent it down this - # connection before - should_limit_timeline_to_token_range = ( - from_token is not None and not newly_joined - ) - timeline_events, new_room_key = await self.store.paginate_room_events( room_id=room_id, # We're going to paginate backwards from the `to_token` from_key=to_token.room_key, to_key=( + # Determine whether we should limit the timeline to the token range. 
+ # + # We should return historical messages (before token range) in the + # following cases because we want clients to be able to show a basic + # screen of information: + # - Initial sync (because no `from_token` to limit us anyway) + # - When users `newly_joined` + # - TODO: For an incremental sync where we haven't sent it down this + # connection before from_token.room_key - if should_limit_timeline_to_token_range + if from_token is not None and not newly_joined else None ), direction=Direction.BACKWARDS, From 71eabe5e63fc2d637785866c6e1f471fe67d0966 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 18 Jun 2024 19:41:41 -0500 Subject: [PATCH 13/20] Make room name optional --- synapse/handlers/sliding_sync.py | 3 +-- synapse/rest/client/sync.py | 4 +++- synapse/types/handlers/__init__.py | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 7a6ef1a2d9..f2b29ce1d1 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -879,8 +879,7 @@ class SlidingSyncHandler: return SlidingSyncResult.RoomResult( # TODO: Dummy value - # TODO: Make this optional because a computed name doesn't make sense for translated cases - name="TODO", + name=None, # TODO: Dummy value avatar=None, # TODO: Dummy value diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index 434eaa4789..da28c2b3a5 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -990,7 +990,6 @@ class SlidingSyncRestServlet(RestServlet): ) serialized_rooms[room_id] = { - "name": room_result.name, "required_state": serialized_required_state, "timeline": serialized_timeline, "prev_batch": await room_result.prev_batch.to_string(self.store), @@ -1002,6 +1001,9 @@ class SlidingSyncRestServlet(RestServlet): "num_live": room_result.num_live, } + if room_result.name: + serialized_rooms[room_id]["name"] = room_result.name + if room_result.avatar: 
serialized_rooms[room_id]["avatar"] = room_result.avatar diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py index 04b0ab972b..1b544456a6 100644 --- a/synapse/types/handlers/__init__.py +++ b/synapse/types/handlers/__init__.py @@ -186,7 +186,7 @@ class SlidingSyncResult: (with potentially other old events in the timeline). """ - name: str + name: Optional[str] avatar: Optional[str] heroes: Optional[List[EventBase]] initial: bool From 39b4f10533fded08647c198c80e6b185bc8558e0 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 18 Jun 2024 19:55:12 -0500 Subject: [PATCH 14/20] Update comments --- synapse/handlers/sliding_sync.py | 14 +++++++++----- tests/rest/client/test_sync.py | 2 +- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index f2b29ce1d1..cb5274d495 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -756,10 +756,10 @@ class SlidingSyncHandler: # Assemble the list of timeline events timeline_events: List[EventBase] = [] limited = False - # We want to use `to_token` (vs `from_token`) because we look backwards from the - # `to_token` up to the `timeline_limit` and we might not reach `from_token` - # before we hit the limit. We will update the room stream position once we've - # fetched the events. + # We want to start off using the `to_token` (vs `from_token`) because we look + # backwards from the `to_token` up to the `timeline_limit` and we might not + # reach the `from_token` before we hit the limit. We will update the room stream + # position once we've fetched the events. prev_batch_token = to_token if room_sync_config.timeline_limit > 0: newly_joined = False @@ -803,6 +803,7 @@ class SlidingSyncHandler: # most recent). 
timeline_events.reverse() + # Make sure we don't expose any events that the client shouldn't see timeline_events = await filter_events_for_client( self.storage_controllers, user.to_string(), @@ -851,11 +852,14 @@ class SlidingSyncHandler: # this more with a binary search (bisect). break + # Update the `prev_batch_token` to point to the position that allows us to + # keep paginating backwards from the oldest event we return in the timeline. prev_batch_token = prev_batch_token.copy_and_replace( StreamKeyType.ROOM, new_room_key ) - # Figure out any stripped state events for invite/knocks + # Figure out any stripped state events for invite/knocks. This allows the + # potential joiner to identify the room. stripped_state: List[JsonDict] = [] if rooms_for_user_membership_at_to_token.membership in { Membership.INVITE, diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index d538716e5a..838ff6e2b4 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -1874,7 +1874,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): [], channel.json_body["rooms"][room_id1]["timeline"], ) - # We should have some stripped state so the potential joiner can identify the + # We should have some `stripped_state` so the potential joiner can identify the # room (we don't care about the order). 
self.assertCountEqual( channel.json_body["rooms"][room_id1]["invite_state"], From 9883b0f63f87cf34b50e28390a0fa29d8e014443 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 18 Jun 2024 21:00:26 -0500 Subject: [PATCH 15/20] Add bundled aggregations --- synapse/handlers/sliding_sync.py | 16 +++++++++++++++- synapse/rest/client/sync.py | 5 ++--- synapse/types/handlers/__init__.py | 10 ++++++++-- 3 files changed, 25 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index cb5274d495..e418a6e074 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -114,6 +114,7 @@ class SlidingSyncHandler: self.auth_blocking = hs.get_auth_blocking() self.notifier = hs.get_notifier() self.event_sources = hs.get_event_sources() + self.relations_handler = hs.get_relations_handler() self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync async def wait_for_sync_for_user( @@ -881,6 +882,18 @@ class SlidingSyncHandler: stripped_state.append(strip_event(invite_or_knock_event)) + # TODO: Handle timeline gaps (`get_timeline_gaps()`) + + # If the timeline is `limited=True`, the client does not have all events + # necessary to calculate aggregations themselves. 
+ bundled_aggregations = None + if limited: + bundled_aggregations = ( + await self.relations_handler.get_bundled_aggregations( + timeline_events, user.to_string() + ) + ) + return SlidingSyncResult.RoomResult( # TODO: Dummy value name=None, @@ -895,7 +908,8 @@ class SlidingSyncHandler: initial=True, # TODO: Dummy value required_state=[], - timeline=timeline_events, + timeline_events=timeline_events, + bundled_aggregations=bundled_aggregations, # TODO: Dummy value is_dm=False, stripped_state=stripped_state, diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index da28c2b3a5..4333ee8c2b 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -976,11 +976,10 @@ class SlidingSyncRestServlet(RestServlet): serialized_rooms = {} for room_id, room_result in rooms.items(): serialized_timeline = await self.event_serializer.serialize_events( - room_result.timeline, + room_result.timeline_events, time_now, config=serialize_options, - # TODO - # bundle_aggregations=room.timeline.bundled_aggregations, + bundle_aggregations=room_result.bundled_aggregations, ) serialized_required_state = await self.event_serializer.serialize_events( diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py index 1b544456a6..1ba5ea55c1 100644 --- a/synapse/types/handlers/__init__.py +++ b/synapse/types/handlers/__init__.py @@ -31,6 +31,7 @@ else: from pydantic import Extra from synapse.events import EventBase +from synapse.handlers.relations import BundledAggregations from synapse.types import JsonDict, JsonMapping, StreamToken, UserID from synapse.types.rest.client import SlidingSyncBody @@ -159,7 +160,11 @@ class SlidingSyncResult: entirely and NOT send "initial":false as this is wasteful on bandwidth. The absence of this flag means 'false'. required_state: The current state of the room - timeline: Latest events in the room. The last event is the most recent + timeline: Latest events in the room. 
The last event is the most recent. + bundled_aggregations: A mapping of event ID to the bundled aggregations for + the timeline events above. This allows clients to show accurate reaction + counts (or edits, threads), even if some of the reaction events were skipped + over in a gappy sync. is_dm: Flag to specify whether the room is a direct-message room (most likely between two people). stripped_state: Stripped state events (for rooms where the usre is @@ -191,7 +196,8 @@ class SlidingSyncResult: heroes: Optional[List[EventBase]] initial: bool required_state: List[EventBase] - timeline: List[EventBase] + timeline_events: List[EventBase] + bundled_aggregations: Optional[Dict[str, BundledAggregations]] is_dm: bool stripped_state: Optional[List[JsonDict]] prev_batch: StreamToken From 1c06153a0d3c24039a70b0c770947874bc05c246 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 18 Jun 2024 21:22:40 -0500 Subject: [PATCH 16/20] Determine limited before filtering --- synapse/handlers/sliding_sync.py | 27 ++++++++++++++++----------- tests/rest/client/test_sync.py | 8 ++++++++ 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index e418a6e074..fe369949c5 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -804,17 +804,9 @@ class SlidingSyncHandler: # most recent). timeline_events.reverse() - # Make sure we don't expose any events that the client shouldn't see - timeline_events = await filter_events_for_client( - self.storage_controllers, - user.to_string(), - timeline_events, - is_peeking=rooms_for_user_membership_at_to_token.membership - != Membership.JOIN, - filter_send_to_client=True, - ) - - # Determine our `limited` status + # Determine our `limited` status based on the timeline. We do this before + # filtering the events so we can accurately determine if there is more to + # paginate even if we filter out some/all events. 
if len(timeline_events) > room_sync_config.timeline_limit: limited = True # Get rid of that extra "+ 1" event because we only used it to determine @@ -825,6 +817,19 @@ class SlidingSyncHandler: stream=timeline_events[0].internal_metadata.stream_ordering - 1 ) + # TODO: Does `newly_joined` affect `limited`? It does in sync v2 but I fail + # to understand why. + + # Make sure we don't expose any events that the client shouldn't see + timeline_events = await filter_events_for_client( + self.storage_controllers, + user.to_string(), + timeline_events, + is_peeking=rooms_for_user_membership_at_to_token.membership + != Membership.JOIN, + filter_send_to_client=True, + ) + # Determine how many "live" events we have (events within the given token range). # # This is mostly useful to determine whether a given @mention event should diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index 838ff6e2b4..df85c94bd5 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -1874,6 +1874,13 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): [], channel.json_body["rooms"][room_id1]["timeline"], ) + # Even though we don't get any timeline events because they are filtered out, + # there is still more to paginate + self.assertEqual( + channel.json_body["rooms"][room_id1]["limited"], + True, + channel.json_body["rooms"][room_id1], + ) # We should have some `stripped_state` so the potential joiner can identify the # room (we don't care about the order). 
self.assertCountEqual( @@ -1906,3 +1913,4 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): ], channel.json_body["rooms"][room_id1]["invite_state"], ) + From c81f3006a5e768e0e3f099dd7e001a7f1768b2c6 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 19 Jun 2024 12:54:39 -0500 Subject: [PATCH 17/20] Add better support for leave/ban --- synapse/handlers/sliding_sync.py | 48 ++-- synapse/storage/databases/main/stream.py | 20 ++ tests/rest/client/test_sync.py | 350 ++++++++++++++++++++++- tests/rest/client/utils.py | 4 +- 4 files changed, 399 insertions(+), 23 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index fe369949c5..0d2f4dbfff 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -775,24 +775,36 @@ class SlidingSyncHandler: ) ) + # We're going to paginate backwards from the `to_token` + from_bound = to_token.room_key + # People shouldn't see past their leave/ban event + if rooms_for_user_membership_at_to_token.membership in ( + Membership.LEAVE, + Membership.BAN, + ): + from_bound = ( + rooms_for_user_membership_at_to_token.event_pos.to_room_stream_token() + ) + + # Determine whether we should limit the timeline to the token range. + # + # We should return historical messages (before token range) in the + # following cases because we want clients to be able to show a basic + # screen of information: + # - Initial sync (because no `from_token` to limit us anyway) + # - When users `newly_joined` + # - TODO: For an incremental sync where we haven't sent it down this + # connection before + to_bound = ( + from_token.room_key + if from_token is not None and not newly_joined + else None + ) + timeline_events, new_room_key = await self.store.paginate_room_events( room_id=room_id, - # We're going to paginate backwards from the `to_token` - from_key=to_token.room_key, - to_key=( - # Determine whether we should limit the timeline to the token range. 
- # - # We should return historical messages (before token range) in the - # following cases because we want clients to be able to show a basic - # screen of information: - # - Initial sync (because no `from_token` to limit us anyway) - # - When users `newly_joined` - # - TODO: For an incremental sync where we haven't sent it down this - # connection before - from_token.room_key - if from_token is not None and not newly_joined - else None - ), + from_key=from_bound, + to_key=to_bound, direction=Direction.BACKWARDS, # We add one so we can determine if there are enough events to saturate # the limit or not (see `limited`) @@ -867,10 +879,10 @@ class SlidingSyncHandler: # Figure out any stripped state events for invite/knocks. This allows the # potential joiner to identify the room. stripped_state: List[JsonDict] = [] - if rooms_for_user_membership_at_to_token.membership in { + if rooms_for_user_membership_at_to_token.membership in ( Membership.INVITE, Membership.KNOCK, - }: + ): invite_or_knock_event = await self.store.get_event( rooms_for_user_membership_at_to_token.event_id ) diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index ff0d723684..c21e69ecda 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -1551,6 +1551,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): ) -> Tuple[List[EventBase], RoomStreamToken]: """Returns list of events before or after a given token. + When Direction.FORWARDS: from_key < x <= to_key + When Direction.BACKWARDS: from_key >= x > to_key + Args: room_id from_key: The token used to stream from @@ -1567,6 +1570,23 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): and `to_key`). """ + # We can bail early if we're looking forwards, and our `to_key` is already + # before our `from_key`. 
+ if ( + direction == Direction.FORWARDS + and to_key is not None + and to_key.is_before_or_eq(from_key) + ): + return [], from_key + # Or vice-versa, if we're looking backwards and our `from_key` is already before + # our `to_key`. + elif ( + direction == Direction.BACKWARDS + and to_key is not None + and from_key.is_before_or_eq(to_key) + ): + return [], from_key + rows, token = await self.db_pool.runInteraction( "paginate_room_events", self._paginate_room_events_txn, diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index df85c94bd5..32542a64e8 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -31,6 +31,7 @@ from synapse.api.constants import ( AccountDataTypes, EventContentFields, EventTypes, + HistoryVisibility, ReceiptTypes, RelationTypes, ) @@ -1831,10 +1832,11 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): channel.json_body["rooms"][room_id1], ) - def test_rooms_invite_sync(self) -> None: + def test_rooms_invite_shared_history_initial_sync(self) -> None: """ Test that `rooms` we are invited to have some stripped `invite_state` and that - we can't see any timeline events because we haven't joined the room yet. + we can't see any timeline events because the history visiblity is `shared` and + we haven't joined the room yet. """ user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") @@ -1844,6 +1846,16 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): user2 = UserID.from_string(user2_id) room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + # Ensure we're testing with a room with `shared` history visibility which means + # history visible until you actually join the room. 
+ history_visibility_response = self.helper.get_state( + room_id1, EventTypes.RoomHistoryVisibility, tok=user2_tok + ) + self.assertEqual( + history_visibility_response.get("history_visibility"), + HistoryVisibility.SHARED, + ) + self.helper.send(room_id1, "activity before1", tok=user2_tok) self.helper.send(room_id1, "activity before2", tok=user2_tok) self.helper.invite(room_id1, src=user2_id, targ=user1_id, tok=user2_tok) @@ -1868,12 +1880,21 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): self.assertEqual(channel.code, 200, channel.json_body) # Should not see anything (except maybe the invite event) because we haven't - # joined yet (`filter_events_for_client(...)` is doing the work here) + # joined yet (history visibility is `shared`) (`filter_events_for_client(...)` + # is doing the work here) self.assertEqual( channel.json_body["rooms"][room_id1]["timeline"], [], channel.json_body["rooms"][room_id1]["timeline"], ) + # No "live" events in a initial sync (no `from_token` to define the "live" + # range) and no events returned in the timeline anyway so nothing could be + # "live". + self.assertEqual( + channel.json_body["rooms"][room_id1]["num_live"], + 0, + channel.json_body["rooms"][room_id1], + ) # Even though we don't get any timeline events because they are filtered out, # there is still more to paginate self.assertEqual( @@ -1914,3 +1935,326 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): channel.json_body["rooms"][room_id1]["invite_state"], ) + + def test_rooms_invite_world_readable_history_initial_sync(self) -> None: + """ + Test that `rooms` we are invited to have some stripped `invite_state` and that + we can't see any timeline events because the history visiblity is `shared` and + we haven't joined the room yet. 
+ """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user1 = UserID.from_string(user1_id) + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + user2 = UserID.from_string(user2_id) + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, + extra_content={ + "preset": "public_chat", + "initial_state": [ + { + "content": {"history_visibility": HistoryVisibility.WORLD_READABLE}, + "state_key": "", + "type": EventTypes.RoomHistoryVisibility, + } + ], + },) + # Ensure we're testing with a room with `world_readable` history visibility + # which means events are visible to anyone even without membership. + history_visibility_response = self.helper.get_state( + room_id1, EventTypes.RoomHistoryVisibility, tok=user2_tok + ) + self.assertEqual( + history_visibility_response.get("history_visibility"), + HistoryVisibility.WORLD_READABLE, + ) + + self.helper.send(room_id1, "activity before1", tok=user2_tok) + event_response2 = self.helper.send(room_id1, "activity before2", tok=user2_tok) + use1_invite_response = self.helper.invite(room_id1, src=user2_id, targ=user1_id, tok=user2_tok) + event_response3 = self.helper.send(room_id1, "activity after3", tok=user2_tok) + event_response4 = self.helper.send(room_id1, "activity after4", tok=user2_tok) + + # Make the Sliding Sync request + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + # Large enough to see the latest events and before the invite + "timeline_limit": 4, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # Should see the last 4 events in the room + self.assertEqual( + [ + event["event_id"] + for event in channel.json_body["rooms"][room_id1]["timeline"] + ], + [ + event_response2["event_id"], + use1_invite_response["event_id"], + event_response3["event_id"], + 
event_response4["event_id"], + ], + channel.json_body["rooms"][room_id1]["timeline"], + ) + # No "live" events in a initial sync (no `from_token` to define the "live" + # range) + self.assertEqual( + channel.json_body["rooms"][room_id1]["num_live"], + 0, + channel.json_body["rooms"][room_id1], + ) + # There is still more to paginate + self.assertEqual( + channel.json_body["rooms"][room_id1]["limited"], + True, + channel.json_body["rooms"][room_id1], + ) + # We should have some `stripped_state` so the potential joiner can identify the + # room (we don't care about the order). + self.assertCountEqual( + channel.json_body["rooms"][room_id1]["invite_state"], + [ + { + "content": {"creator": user2_id, "room_version": "10"}, + "sender": user2_id, + "state_key": "", + "type": "m.room.create", + }, + { + "content": {"join_rule": "public"}, + "sender": user2_id, + "state_key": "", + "type": "m.room.join_rules", + }, + { + "content": {"displayname": user2.localpart, "membership": "join"}, + "sender": user2_id, + "state_key": user2_id, + "type": "m.room.member", + }, + { + "content": {"displayname": user1.localpart, "membership": "invite"}, + "sender": user2_id, + "state_key": user1_id, + "type": "m.room.member", + }, + ], + channel.json_body["rooms"][room_id1]["invite_state"], + ) + + def test_rooms_ban_initial_sync(self) -> None: + """ + Test that `rooms` we are banned from in an intial sync only allows us to see + timeline events up to the ban event. 
+ """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.send(room_id1, "activity before1", tok=user2_tok) + self.helper.send(room_id1, "activity before2", tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + + event_response3 = self.helper.send(room_id1, "activity after3", tok=user2_tok) + event_response4 = self.helper.send(room_id1, "activity after4", tok=user2_tok) + user1_ban_response = self.helper.ban( + room_id1, src=user2_id, targ=user1_id, tok=user2_tok + ) + + self.helper.send(room_id1, "activity after5", tok=user2_tok) + self.helper.send(room_id1, "activity after6", tok=user2_tok) + + # Make the Sliding Sync request + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 3, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # We should see events before the ban but not after + self.assertEqual( + [ + event["event_id"] + for event in channel.json_body["rooms"][room_id1]["timeline"] + ], + [ + event_response3["event_id"], + event_response4["event_id"], + user1_ban_response["event_id"], + ], + channel.json_body["rooms"][room_id1]["timeline"], + ) + # No "live" events in a initial sync (no `from_token` to define the "live" + # range) + self.assertEqual( + channel.json_body["rooms"][room_id1]["num_live"], + 0, + channel.json_body["rooms"][room_id1], + ) + # There are more events to paginate to + self.assertEqual( + channel.json_body["rooms"][room_id1]["limited"], + True, + channel.json_body["rooms"][room_id1], + ) + + def test_rooms_ban_incremental_sync1(self) -> None: + """ + Test that `rooms` we are banned from during the next incremental sync only + allows us to see 
timeline events up to the ban event. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.send(room_id1, "activity before1", tok=user2_tok) + self.helper.send(room_id1, "activity before2", tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + + from_token = self.event_sources.get_current_token() + + event_response3 = self.helper.send(room_id1, "activity after3", tok=user2_tok) + event_response4 = self.helper.send(room_id1, "activity after4", tok=user2_tok) + # The ban is within the token range (between the `from_token` and the sliding + # sync request) + user1_ban_response = self.helper.ban( + room_id1, src=user2_id, targ=user1_id, tok=user2_tok + ) + + self.helper.send(room_id1, "activity after5", tok=user2_tok) + self.helper.send(room_id1, "activity after6", tok=user2_tok) + + # Make the Sliding Sync request + channel = self.make_request( + "POST", + self.sync_endpoint + + f"?pos={self.get_success( + from_token.to_string(self.store) + )}", + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 4, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # We should see events before the ban but not after + self.assertEqual( + [ + event["event_id"] + for event in channel.json_body["rooms"][room_id1]["timeline"] + ], + [ + event_response3["event_id"], + event_response4["event_id"], + user1_ban_response["event_id"], + ], + channel.json_body["rooms"][room_id1]["timeline"], + ) + # All live events in the incremental sync + self.assertEqual( + channel.json_body["rooms"][room_id1]["num_live"], + 3, + channel.json_body["rooms"][room_id1], + ) + # There aren't anymore events to paginate to in this range + self.assertEqual( + 
channel.json_body["rooms"][room_id1]["limited"], + False, + channel.json_body["rooms"][room_id1], + ) + + def test_rooms_ban_incremental_sync2(self) -> None: + """ + Test that `rooms` we are banned from before the incremental sync doesn't return + any events in the timeline. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.send(room_id1, "activity before1", tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + + self.helper.send(room_id1, "activity after2", tok=user2_tok) + # The ban is before we get our `from_token` + self.helper.ban(room_id1, src=user2_id, targ=user1_id, tok=user2_tok) + + self.helper.send(room_id1, "activity after3", tok=user2_tok) + + from_token = self.event_sources.get_current_token() + + self.helper.send(room_id1, "activity after4", tok=user2_tok) + + # Make the Sliding Sync request + channel = self.make_request( + "POST", + self.sync_endpoint + + f"?pos={self.get_success( + from_token.to_string(self.store) + )}", + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 4, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # Nothing to see for this banned user in the room in the token range + self.assertEqual( + channel.json_body["rooms"][room_id1]["timeline"], + [], + channel.json_body["rooms"][room_id1]["timeline"], + ) + # No events returned in the timeline so nothing is "live" + self.assertEqual( + channel.json_body["rooms"][room_id1]["num_live"], + 0, + channel.json_body["rooms"][room_id1], + ) + # There aren't anymore events to paginate to in this range + self.assertEqual( + channel.json_body["rooms"][room_id1]["limited"], + False, + channel.json_body["rooms"][room_id1], + ) diff --git 
a/tests/rest/client/utils.py b/tests/rest/client/utils.py index f0ba40a1f1..e43140720d 100644 --- a/tests/rest/client/utils.py +++ b/tests/rest/client/utils.py @@ -261,9 +261,9 @@ class RestHelper: targ: str, expect_code: int = HTTPStatus.OK, tok: Optional[str] = None, - ) -> None: + ) -> JsonDict: """A convenience helper: `change_membership` with `membership` preset to "ban".""" - self.change_membership( + return self.change_membership( room=room, src=src, targ=targ, From d801db0d96ef53e1eaa42c7540f744a56de90b59 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 19 Jun 2024 13:24:01 -0500 Subject: [PATCH 18/20] Fix lints --- tests/rest/client/test_sync.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index 32542a64e8..6db6f855ba 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -1935,7 +1935,6 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): channel.json_body["rooms"][room_id1]["invite_state"], ) - def test_rooms_invite_world_readable_history_initial_sync(self) -> None: """ Test that `rooms` we are invited to have some stripped `invite_state` and that @@ -1949,17 +1948,22 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): user2_tok = self.login(user2_id, "pass") user2 = UserID.from_string(user2_id) - room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, + room_id1 = self.helper.create_room_as( + user2_id, + tok=user2_tok, extra_content={ "preset": "public_chat", "initial_state": [ { - "content": {"history_visibility": HistoryVisibility.WORLD_READABLE}, + "content": { + "history_visibility": HistoryVisibility.WORLD_READABLE + }, "state_key": "", "type": EventTypes.RoomHistoryVisibility, } ], - },) + }, + ) # Ensure we're testing with a room with `world_readable` history visibility # which means events are visible to anyone even without membership. 
history_visibility_response = self.helper.get_state( @@ -1972,7 +1976,9 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): self.helper.send(room_id1, "activity before1", tok=user2_tok) event_response2 = self.helper.send(room_id1, "activity before2", tok=user2_tok) - use1_invite_response = self.helper.invite(room_id1, src=user2_id, targ=user1_id, tok=user2_tok) + use1_invite_response = self.helper.invite( + room_id1, src=user2_id, targ=user1_id, tok=user2_tok + ) event_response3 = self.helper.send(room_id1, "activity after3", tok=user2_tok) event_response4 = self.helper.send(room_id1, "activity after4", tok=user2_tok) From 884b44801253c6b97ae07f958744c8443649153e Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 19 Jun 2024 13:50:28 -0500 Subject: [PATCH 19/20] Update some wording --- synapse/handlers/sliding_sync.py | 6 +++--- tests/rest/client/test_sync.py | 12 ++++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 0d2f4dbfff..3e49054e43 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -264,7 +264,7 @@ class SlidingSyncHandler: ) ) - # Update the relevant room map + # Take the superset of the `RoomSyncConfig` for each room for room_id in sliced_room_ids: if relevant_room_map.get(room_id) is not None: # Take the highest timeline limit @@ -739,7 +739,7 @@ class SlidingSyncHandler: to_token: StreamToken, ) -> SlidingSyncResult.RoomResult: """ - Fetch room data for a room. + Fetch room data for the sync response. We fetch data according to the token range (> `from_token` and <= `to_token`). @@ -760,7 +760,7 @@ class SlidingSyncHandler: # We want to start off using the `to_token` (vs `from_token`) because we look # backwards from the `to_token` up to the `timeline_limit` and we might not # reach the `from_token` before we hit the limit. We will update the room stream - # position once we've fetched the events. 
+ # position once we've fetched the events to point to the earliest event fetched. prev_batch_token = to_token if room_sync_config.timeline_limit > 0: newly_joined = False diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index 3213059a78..a55804c96c 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -1607,7 +1607,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): ) # With no `from_token` (initial sync), it's all historical since there is no - # "current" range + # "live" range self.assertEqual( channel.json_body["rooms"][room_id1]["num_live"], 0, @@ -1674,7 +1674,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): def test_rooms_incremental_sync(self) -> None: """ - Test that `rooms` data during an incremental sync after an initial sync. + Test `rooms` data during an incremental sync after an initial sync. """ user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") @@ -1889,7 +1889,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): [], channel.json_body["rooms"][room_id1]["timeline"], ) - # No "live" events in a initial sync (no `from_token` to define the "live" + # No "live" events in an initial sync (no `from_token` to define the "live" # range) and no events returned in the timeline anyway so nothing could be # "live". 
self.assertEqual( @@ -2016,7 +2016,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): ], channel.json_body["rooms"][room_id1]["timeline"], ) - # No "live" events in a initial sync (no `from_token` to define the "live" + # No "live" events in an initial sync (no `from_token` to define the "live" # range) self.assertEqual( channel.json_body["rooms"][room_id1]["num_live"], @@ -2116,7 +2116,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): ], channel.json_body["rooms"][room_id1]["timeline"], ) - # No "live" events in a initial sync (no `from_token` to define the "live" + # No "live" events in an initial sync (no `from_token` to define the "live" # range) self.assertEqual( channel.json_body["rooms"][room_id1]["num_live"], @@ -2206,7 +2206,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): def test_rooms_ban_incremental_sync2(self) -> None: """ - Test that `rooms` we are banned from before the incremental sync doesn't return + Test that `rooms` we are banned from before the incremental sync don't return any events in the timeline. """ user1_id = self.register_user("user1", "pass") From 0eb029472e5410b780156f12db13434b003f42ae Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 20 Jun 2024 14:34:10 -0500 Subject: [PATCH 20/20] Remove unused `IncludeOldRooms` class --- synapse/types/rest/client/__init__.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/synapse/types/rest/client/__init__.py b/synapse/types/rest/client/__init__.py index 25fbd772f6..5d453769b5 100644 --- a/synapse/types/rest/client/__init__.py +++ b/synapse/types/rest/client/__init__.py @@ -154,10 +154,6 @@ class SlidingSyncBody(RequestBodyModel): (Max 1000 messages) """ - class IncludeOldRooms(RequestBodyModel): - timeline_limit: StrictInt - required_state: List[Tuple[StrictStr, StrictStr]] - required_state: List[Tuple[StrictStr, StrictStr]] # mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884 if TYPE_CHECKING: