From 5389374ef8a9222bd821c88862ff654e42ef83a4 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 9 Sep 2024 04:36:22 -0500 Subject: [PATCH] Sliding Sync: Speed up incremental sync by avoiding extra work (#17665) Speed up incremental sync by avoiding extra work. We first look at the state delta changes and only fetch and calculate further derived things if they have changed. --- changelog.d/17665.misc | 1 + synapse/handlers/sliding_sync/__init__.py | 151 ++++++-- synapse/rest/client/sync.py | 8 +- synapse/types/handlers/sliding_sync.py | 10 +- .../client/sliding_sync/test_rooms_meta.py | 349 +++++++++++++++++- 5 files changed, 472 insertions(+), 47 deletions(-) create mode 100644 changelog.d/17665.misc diff --git a/changelog.d/17665.misc b/changelog.d/17665.misc new file mode 100644 index 0000000000..28921087a6 --- /dev/null +++ b/changelog.d/17665.misc @@ -0,0 +1 @@ +Speed up incremental Sliding Sync requests by avoiding extra work. diff --git a/synapse/handlers/sliding_sync/__init__.py b/synapse/handlers/sliding_sync/__init__.py index 7f084cb916..444cc32f36 100644 --- a/synapse/handlers/sliding_sync/__init__.py +++ b/synapse/handlers/sliding_sync/__init__.py @@ -44,6 +44,7 @@ from synapse.storage.roommember import ( ) from synapse.types import ( JsonDict, + MutableStateMap, PersistedEventPosition, Requester, RoomStreamToken, @@ -753,26 +754,78 @@ class SlidingSyncHandler: # indicate to the client that a state reset happened. Perhaps we should indicate # this by setting `initial: True` and empty `required_state`. - # Check whether the room has a name set - name_state_ids = await self.get_current_state_ids_at( - room_id=room_id, - room_membership_for_user_at_to_token=room_membership_for_user_at_to_token, - state_filter=StateFilter.from_types([(EventTypes.Name, "")]), - to_token=to_token, - ) - name_event_id = name_state_ids.get((EventTypes.Name, "")) - - room_membership_summary: Mapping[str, MemberSummary] - empty_membership_summary = MemberSummary([], 0) - if room_membership_for_user_at_to_token.membership in ( - Membership.LEAVE, - Membership.BAN, - ): - # TODO: Figure out how to get the membership summary for left/banned rooms - room_membership_summary = {} + # Get the changes to current state in the token range from the + # `current_state_delta_stream` table. + # + # For incremental syncs, we can do this first to determine if something relevant + # has changed and strategically avoid fetching other costly things. + room_state_delta_id_map: MutableStateMap[str] = {} + name_event_id: Optional[str] = None + membership_changed = False + name_changed = False + avatar_changed = False + if initial: + # Check whether the room has a name set + name_state_ids = await self.get_current_state_ids_at( + room_id=room_id, + room_membership_for_user_at_to_token=room_membership_for_user_at_to_token, + state_filter=StateFilter.from_types([(EventTypes.Name, "")]), + to_token=to_token, + ) + name_event_id = name_state_ids.get((EventTypes.Name, "")) else: - room_membership_summary = await self.store.get_room_summary(room_id) - # TODO: Reverse/rewind back to the `to_token` + assert from_bound is not None + + # TODO: Limit the number of state events we're about to send down + # the room, if its too many we should change this to an + # `initial=True`? + deltas = await self.store.get_current_state_deltas_for_room( + room_id=room_id, + from_token=from_bound, + to_token=to_token.room_key, + ) + for delta in deltas: + # TODO: Handle state resets where event_id is None + if delta.event_id is not None: + room_state_delta_id_map[(delta.event_type, delta.state_key)] = ( + delta.event_id + ) + + if delta.event_type == EventTypes.Member: + membership_changed = True + elif delta.event_type == EventTypes.Name and delta.state_key == "": + name_changed = True + elif ( + delta.event_type == EventTypes.RoomAvatar and delta.state_key == "" + ): + avatar_changed = True + + room_membership_summary: Optional[Mapping[str, MemberSummary]] = None + empty_membership_summary = MemberSummary([], 0) + # We need the room summary for: + # - Always for initial syncs (or the first time we send down the room) + # - When the room has no name, we need `heroes` + # - When the membership has changed so we need to give updated `heroes` and + # `joined_count`/`invited_count`. + # + # Ideally, instead of just looking at `name_changed`, we'd check if the room + # name is not set but this is a good enough approximation that saves us from + # having to pull out the full event. This just means, we're generating the + # summary whenever the room name changes instead of only when it changes to + # `None`. + if initial or name_changed or membership_changed: + # We can't trace the function directly because it's cached and the `@cached` + # decorator doesn't mix with `@trace` yet. + with start_active_span("get_room_summary"): + if room_membership_for_user_at_to_token.membership in ( + Membership.LEAVE, + Membership.BAN, + ): + # TODO: Figure out how to get the membership summary for left/banned rooms + room_membership_summary = {} + else: + room_membership_summary = await self.store.get_room_summary(room_id) + # TODO: Reverse/rewind back to the `to_token` # `heroes` are required if the room name is not set. # @@ -786,7 +839,12 @@ class SlidingSyncHandler: # TODO: Should we also check for `EventTypes.CanonicalAlias` # (`m.room.canonical_alias`) as a fallback for the room name? see # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1671260153 - if name_event_id is None: + # + # We need to fetch the `heroes` if the room name is not set. But we only need to + # get them on initial syncs (or the first time we send down the room) or if the + # membership has changed which may change the heroes. + if name_event_id is None and (initial or (not initial and membership_changed)): + assert room_membership_summary is not None hero_user_ids = extract_heroes_from_room_summary( room_membership_summary, me=user.to_string() ) @@ -904,9 +962,15 @@ class SlidingSyncHandler: # We need this base set of info for the response so let's just fetch it along # with the `required_state` for the room - meta_room_state = [(EventTypes.Name, ""), (EventTypes.RoomAvatar, "")] + [ + hero_room_state = [ (EventTypes.Member, hero_user_id) for hero_user_id in hero_user_ids ] + meta_room_state = list(hero_room_state) + if initial or name_changed: + meta_room_state.append((EventTypes.Name, "")) + if initial or avatar_changed: + meta_room_state.append((EventTypes.RoomAvatar, "")) + state_filter = StateFilter.all() if required_state_filter != StateFilter.all(): state_filter = StateFilter( @@ -929,21 +993,22 @@ class SlidingSyncHandler: else: assert from_bound is not None - # TODO: Limit the number of state events we're about to send down - # the room, if its too many we should change this to an - # `initial=True`? - deltas = await self.store.get_current_state_deltas_for_room( - room_id=room_id, - from_token=from_bound, - to_token=to_token.room_key, - ) - # TODO: Filter room state before fetching events - # TODO: Handle state resets where event_id is None events = await self.store.get_events( - [d.event_id for d in deltas if d.event_id] + state_filter.filter_state(room_state_delta_id_map).values() ) room_state = {(s.type, s.state_key): s for s in events.values()} + # If the membership changed and we have to get heroes, get the remaining + # heroes from the state + if hero_user_ids: + hero_membership_state = await self.get_current_state_at( + room_id=room_id, + room_membership_for_user_at_to_token=room_membership_for_user_at_to_token, + state_filter=StateFilter.from_types(hero_room_state), + to_token=to_token, + ) + room_state.update(hero_membership_state) + required_room_state: StateMap[EventBase] = {} if required_state_filter != StateFilter.none(): required_room_state = required_state_filter.filter_state(room_state) @@ -1050,6 +1115,20 @@ class SlidingSyncHandler: set_tag(SynapseTags.RESULT_PREFIX + "initial", initial) + joined_count: Optional[int] = None + if initial or membership_changed: + assert room_membership_summary is not None + joined_count = room_membership_summary.get( + Membership.JOIN, empty_membership_summary + ).count + + invited_count: Optional[int] = None + if initial or membership_changed: + assert room_membership_summary is not None + invited_count = room_membership_summary.get( + Membership.INVITE, empty_membership_summary + ).count + return SlidingSyncResult.RoomResult( name=room_name, avatar=room_avatar, @@ -1065,12 +1144,8 @@ class SlidingSyncHandler: unstable_expanded_timeline=unstable_expanded_timeline, num_live=num_live, bump_stamp=bump_stamp, - joined_count=room_membership_summary.get( - Membership.JOIN, empty_membership_summary - ).count, - invited_count=room_membership_summary.get( - Membership.INVITE, empty_membership_summary - ).count, + joined_count=joined_count, + invited_count=invited_count, # TODO: These are just dummy values. We could potentially just remove these # since notifications can only really be done correctly on the client anyway # (encrypted rooms). diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index cc9fbfe546..9e2bf98189 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -1011,12 +1011,16 @@ class SlidingSyncRestServlet(RestServlet): for room_id, room_result in rooms.items(): serialized_rooms[room_id] = { "bump_stamp": room_result.bump_stamp, - "joined_count": room_result.joined_count, - "invited_count": room_result.invited_count, "notification_count": room_result.notification_count, "highlight_count": room_result.highlight_count, } + if room_result.joined_count is not None: + serialized_rooms[room_id]["joined_count"] = room_result.joined_count + + if room_result.invited_count is not None: + serialized_rooms[room_id]["invited_count"] = room_result.invited_count + if room_result.name: serialized_rooms[room_id]["name"] = room_result.name diff --git a/synapse/types/handlers/sliding_sync.py b/synapse/types/handlers/sliding_sync.py index 9d934dd563..e1b2af7a03 100644 --- a/synapse/types/handlers/sliding_sync.py +++ b/synapse/types/handlers/sliding_sync.py @@ -197,8 +197,8 @@ class SlidingSyncResult: # Only optional because it won't be included for invite/knock rooms with `stripped_state` num_live: Optional[int] bump_stamp: int - joined_count: int - invited_count: int + joined_count: Optional[int] + invited_count: Optional[int] notification_count: int highlight_count: int @@ -207,6 +207,12 @@ class SlidingSyncResult: # If this is the first time the client is seeing the room, we should not filter it out # under any circumstance. self.initial + # We need to let the client know if any of the info has changed + or self.name is not None + or self.avatar is not None + or bool(self.heroes) + or self.joined_count is not None + or self.invited_count is not None # We need to let the client know if there are any new events or bool(self.required_state) or bool(self.timeline_events) diff --git a/tests/rest/client/sliding_sync/test_rooms_meta.py b/tests/rest/client/sliding_sync/test_rooms_meta.py index aac2e60586..6d2742e25f 100644 --- a/tests/rest/client/sliding_sync/test_rooms_meta.py +++ b/tests/rest/client/sliding_sync/test_rooms_meta.py @@ -13,7 +13,7 @@ # import logging -from parameterized import parameterized_class +from parameterized import parameterized, parameterized_class from twisted.test.proto_helpers import MemoryReactor @@ -67,10 +67,11 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase): super().prepare(reactor, clock, hs) - def test_rooms_meta_when_joined(self) -> None: + def test_rooms_meta_when_joined_initial(self) -> None: """ - Test that the `rooms` `name` and `avatar` are included in the response and - reflect the current state of the room when the user is joined to the room. + Test that the `rooms` `name` and `avatar` are included in the initial sync + response and reflect the current state of the room when the user is joined to + the room. """ user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") @@ -107,6 +108,7 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase): response_body, _ = self.do_sync(sync_body, tok=user1_tok) # Reflect the current state of the room + self.assertEqual(response_body["rooms"][room_id1]["initial"], True) self.assertEqual( response_body["rooms"][room_id1]["name"], "my super room", @@ -129,6 +131,178 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase): response_body["rooms"][room_id1].get("is_dm"), ) + def test_rooms_meta_when_joined_incremental_no_change(self) -> None: + """ + Test that the `rooms` `name` and `avatar` aren't included in an incremental sync + response if they haven't changed. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as( + user2_id, + tok=user2_tok, + extra_content={ + "name": "my super room", + }, + ) + # Set the room avatar URL + self.helper.send_state( + room_id1, + EventTypes.RoomAvatar, + {"url": "mxc://DUMMY_MEDIA_ID"}, + tok=user2_tok, + ) + + self.helper.join(room_id1, user1_id, tok=user1_tok) + + # Make the Sliding Sync request + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + # This needs to be set to one so the `RoomResult` isn't empty and + # the room comes down incremental sync when we send a new message. + "timeline_limit": 1, + } + } + } + response_body, from_token = self.do_sync(sync_body, tok=user1_tok) + + # Send a message to make the room come down sync + self.helper.send(room_id1, "message in room1", tok=user2_tok) + + # Incremental sync + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + # We should only see changed meta info (nothing changed so we shouldn't see any + # of these fields) + self.assertNotIn( + "initial", + response_body["rooms"][room_id1], + ) + self.assertNotIn( + "name", + response_body["rooms"][room_id1], + ) + self.assertNotIn( + "avatar", + response_body["rooms"][room_id1], + ) + self.assertNotIn( + "joined_count", + response_body["rooms"][room_id1], + ) + self.assertNotIn( + "invited_count", + response_body["rooms"][room_id1], + ) + self.assertIsNone( + response_body["rooms"][room_id1].get("is_dm"), + ) + + @parameterized.expand( + [ + ("in_required_state", True), + ("not_in_required_state", False), + ] + ) + def test_rooms_meta_when_joined_incremental_with_state_change( + self, test_description: str, include_changed_state_in_required_state: bool + ) -> None: + """ + Test that the `rooms` `name` and `avatar` are included in an incremental sync + response if they changed. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as( + user2_id, + tok=user2_tok, + extra_content={ + "name": "my super room", + }, + ) + # Set the room avatar URL + self.helper.send_state( + room_id1, + EventTypes.RoomAvatar, + {"url": "mxc://DUMMY_MEDIA_ID"}, + tok=user2_tok, + ) + + self.helper.join(room_id1, user1_id, tok=user1_tok) + + # Make the Sliding Sync request + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": ( + [[EventTypes.Name, ""], [EventTypes.RoomAvatar, ""]] + # Conditionally include the changed state in the + # `required_state` to make sure whether we request it or not, + # the new room name still flows down to the client. + if include_changed_state_in_required_state + else [] + ), + "timeline_limit": 0, + } + } + } + response_body, from_token = self.do_sync(sync_body, tok=user1_tok) + + # Update the room name + self.helper.send_state( + room_id1, + EventTypes.Name, + {EventContentFields.ROOM_NAME: "my super duper room"}, + tok=user2_tok, + ) + # Update the room avatar URL + self.helper.send_state( + room_id1, + EventTypes.RoomAvatar, + {"url": "mxc://DUMMY_MEDIA_ID_UPDATED"}, + tok=user2_tok, + ) + + # Incremental sync + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + # We should only see changed meta info (the room name and avatar) + self.assertNotIn( + "initial", + response_body["rooms"][room_id1], + ) + self.assertEqual( + response_body["rooms"][room_id1]["name"], + "my super duper room", + response_body["rooms"][room_id1], + ) + self.assertEqual( + response_body["rooms"][room_id1]["avatar"], + "mxc://DUMMY_MEDIA_ID_UPDATED", + response_body["rooms"][room_id1], + ) + self.assertNotIn( + "joined_count", + response_body["rooms"][room_id1], + ) + self.assertNotIn( + "invited_count", + response_body["rooms"][room_id1], + ) + self.assertIsNone( + response_body["rooms"][room_id1].get("is_dm"), + ) + def test_rooms_meta_when_invited(self) -> None: """ Test that the `rooms` `name` and `avatar` are included in the response and @@ -186,6 +360,7 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase): # This should still reflect the current state of the room even when the user is # invited. + self.assertEqual(response_body["rooms"][room_id1]["initial"], True) self.assertEqual( response_body["rooms"][room_id1]["name"], "my super duper room", @@ -264,6 +439,7 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase): response_body, _ = self.do_sync(sync_body, tok=user1_tok) # Reflect the state of the room at the time of leaving + self.assertEqual(response_body["rooms"][room_id1]["initial"], True) self.assertEqual( response_body["rooms"][room_id1]["name"], "my super room", @@ -338,6 +514,7 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase): # Room1 has a name so we shouldn't see any `heroes` which the client would use # the calculate the room name themselves. + self.assertEqual(response_body["rooms"][room_id1]["initial"], True) self.assertEqual( response_body["rooms"][room_id1]["name"], "my super room", @@ -354,6 +531,7 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase): ) # Room2 doesn't have a name so we should see `heroes` populated + self.assertEqual(response_body["rooms"][room_id2]["initial"], True) self.assertIsNone(response_body["rooms"][room_id2].get("name")) self.assertCountEqual( [ @@ -425,6 +603,7 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase): response_body, _ = self.do_sync(sync_body, tok=user1_tok) # Room2 doesn't have a name so we should see `heroes` populated + self.assertEqual(response_body["rooms"][room_id1]["initial"], True) self.assertIsNone(response_body["rooms"][room_id1].get("name")) self.assertCountEqual( [ @@ -497,7 +676,8 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase): } response_body, _ = self.do_sync(sync_body, tok=user1_tok) - # Room2 doesn't have a name so we should see `heroes` populated + # Room doesn't have a name so we should see `heroes` populated + self.assertEqual(response_body["rooms"][room_id1]["initial"], True) self.assertIsNone(response_body["rooms"][room_id1].get("name")) self.assertCountEqual( [ @@ -527,6 +707,165 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase): 0, ) + def test_rooms_meta_heroes_incremental_sync_no_change(self) -> None: + """ + Test that the `rooms` `heroes` aren't included in an incremental sync + response if they haven't changed. + + (when the room doesn't have a room name set) + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + user3_id = self.register_user("user3", "pass") + _user3_tok = self.login(user3_id, "pass") + + room_id = self.helper.create_room_as( + user2_id, + tok=user2_tok, + extra_content={ + # No room name set so that `heroes` is populated + # + # "name": "my super room2", + }, + ) + self.helper.join(room_id, user1_id, tok=user1_tok) + # User3 is invited + self.helper.invite(room_id, src=user2_id, targ=user3_id, tok=user2_tok) + + # Make the Sliding Sync request + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + # This needs to be set to one so the `RoomResult` isn't empty and + # the room comes down incremental sync when we send a new message. + "timeline_limit": 1, + } + } + } + response_body, from_token = self.do_sync(sync_body, tok=user1_tok) + + # Send a message to make the room come down sync + self.helper.send(room_id, "message in room", tok=user2_tok) + + # Incremental sync + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + # This is an incremental sync and the second time we have seen this room so it + # isn't `initial` + self.assertNotIn( + "initial", + response_body["rooms"][room_id], + ) + # Room shouldn't have a room name because we're testing the `heroes` field which + # will only has a chance to appear if the room doesn't have a name. + self.assertNotIn( + "name", + response_body["rooms"][room_id], + ) + # No change to heroes + self.assertNotIn( + "heroes", + response_body["rooms"][room_id], + ) + # No change to member counts + self.assertNotIn( + "joined_count", + response_body["rooms"][room_id], + ) + self.assertNotIn( + "invited_count", + response_body["rooms"][room_id], + ) + # We didn't request any state so we shouldn't see any `required_state` + self.assertNotIn( + "required_state", + response_body["rooms"][room_id], + ) + + def test_rooms_meta_heroes_incremental_sync_with_membership_change(self) -> None: + """ + Test that the `rooms` `heroes` are included in an incremental sync response if + the membership has changed. + + (when the room doesn't have a room name set) + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + user3_id = self.register_user("user3", "pass") + user3_tok = self.login(user3_id, "pass") + + room_id = self.helper.create_room_as( + user2_id, + tok=user2_tok, + extra_content={ + # No room name set so that `heroes` is populated + # + # "name": "my super room2", + }, + ) + self.helper.join(room_id, user1_id, tok=user1_tok) + # User3 is invited + self.helper.invite(room_id, src=user2_id, targ=user3_id, tok=user2_tok) + + # Make the Sliding Sync request + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 0, + } + } + } + response_body, from_token = self.do_sync(sync_body, tok=user1_tok) + + # User3 joins (membership change) + self.helper.join(room_id, user3_id, tok=user3_tok) + + # Incremental sync + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + # This is an incremental sync and the second time we have seen this room so it + # isn't `initial` + self.assertNotIn( + "initial", + response_body["rooms"][room_id], + ) + # Room shouldn't have a room name because we're testing the `heroes` field which + # will only has a chance to appear if the room doesn't have a name. + self.assertNotIn( + "name", + response_body["rooms"][room_id], + ) + # Membership change so we should see heroes and membership counts + self.assertCountEqual( + [ + hero["user_id"] + for hero in response_body["rooms"][room_id].get("heroes", []) + ], + # Heroes shouldn't include the user themselves (we shouldn't see user1) + [user2_id, user3_id], + ) + self.assertEqual( + response_body["rooms"][room_id]["joined_count"], + 3, + ) + self.assertEqual( + response_body["rooms"][room_id]["invited_count"], + 0, + ) + # We didn't request any state so we shouldn't see any `required_state` + self.assertNotIn( + "required_state", + response_body["rooms"][room_id], + ) + def test_rooms_bump_stamp(self) -> None: """ Test that `bump_stamp` is present and pointing to relevant events.