diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 1d07e22c91..90991031aa 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -37,7 +37,7 @@ from synapse.types import ( UserID, ) from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult -from synapse.types.state import StateFilter, StateKey +from synapse.types.state import StateFilter from synapse.visibility import filter_events_for_client if TYPE_CHECKING: @@ -764,6 +764,7 @@ class SlidingSyncHandler: if room_sync_config.timeline_limit > 0: newly_joined = False if ( + # We can only determine new-ness if we have a `from_token` to define our range from_token is not None and rooms_for_user_membership_at_to_token.membership == Membership.JOIN ): @@ -778,11 +779,11 @@ class SlidingSyncHandler: room_id=room_id, # We're going to paginate backwards from the `to_token` from_key=to_token.room_key, - # We should return historical messages (outside token range) in the + # We should return historical messages (before token range) in the # following cases because we want clients to be able to show a basic # screen of information: # - Initial sync (because no `from_token` to limit us anyway) - # - When users newly_joined + # - When users `newly_joined` # - TODO: For an incremental sync where we haven't sent it down this # connection before to_key=( @@ -832,12 +833,15 @@ class SlidingSyncHandler: num_live = 0 if from_token is not None: for timeline_event in timeline_events: - if ( - timeline_event.internal_metadata.stream_ordering - > from_token.room_key.get_stream_pos_for_instance( - timeline_event.internal_metadata.instance_name - ) - ): + # This fields should be present for all persisted events + assert timeline_event.internal_metadata.stream_ordering is not None + assert timeline_event.internal_metadata.instance_name is not None + + persisted_position = PersistedEventPosition( + instance_name=timeline_event.internal_metadata.instance_name, + stream=timeline_event.internal_metadata.stream_ordering, + ) + if persisted_position.persisted_after(from_token.room_key): num_live += 1 prev_batch_token = prev_batch_token.copy_and_replace( diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index 2b06767b8a..5b611cd096 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -35,7 +35,7 @@ from synapse.api.constants import ( ) from synapse.rest.client import devices, knock, login, read_marker, receipts, room, sync from synapse.server import HomeServer -from synapse.types import JsonDict, RoomStreamToken, StreamKeyType +from synapse.types import JsonDict, RoomStreamToken, StreamKeyType, StreamToken from synapse.util import Clock from tests import unittest @@ -1282,7 +1282,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): def test_sync_list(self) -> None: """ - Test that room IDs show up in the Sliding Sync lists + Test that room IDs show up in the Sliding Sync `lists` """ alice_user_id = self.register_user("alice", "correcthorse") alice_access_token = self.login(alice_user_id, "correcthorse") @@ -1387,7 +1387,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): def test_filter_list(self) -> None: """ - Test that filters apply to lists + Test that filters apply to `lists` """ user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") @@ -1462,7 +1462,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): def test_sort_list(self) -> None: """ - Test that the lists are sorted by `stream_ordering` + Test that the `lists` are sorted by `stream_ordering` """ user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") @@ -1516,3 +1516,135 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): ], channel.json_body["lists"]["foo-list"], ) + + def test_rooms_limited_initial_sync(self) -> None: + """ + Test that we mark `rooms` as `limited=True` when we saturate the `timeline_limit` + on initial sync. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.send(room_id1, "activity1", tok=user2_tok) + self.helper.send(room_id1, "activity2", tok=user2_tok) + event_response3 = self.helper.send(room_id1, "activity3", tok=user2_tok) + event_pos3 = self.get_success( + self.store.get_position_for_event(event_response3["event_id"]) + ) + event_response4 = self.helper.send(room_id1, "activity4", tok=user2_tok) + event_pos4 = self.get_success( + self.store.get_position_for_event(event_response4["event_id"]) + ) + event_response5 = self.helper.send(room_id1, "activity5", tok=user2_tok) + user1_join_response = self.helper.join(room_id1, user1_id, tok=user1_tok) + + # Make the Sliding Sync request + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 3, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # We expect to saturate the `timeline_limit` (there are more than 3 messages in the room) + self.assertEqual( + channel.json_body["rooms"][room_id1]["limited"], + True, + channel.json_body["rooms"][room_id1], + ) + # Check to make sure the latest events are returned + self.assertEqual( + [ + event["event_id"] + for event in channel.json_body["rooms"][room_id1]["timeline"] + ], + [ + event_response4["event_id"], + event_response5["event_id"], + user1_join_response["event_id"], + ], + channel.json_body["rooms"][room_id1]["timeline"], + ) + + # Check to make sure the `prev_batch` points at the right place + prev_batch_token = self.get_success( + StreamToken.from_string( + self.store, channel.json_body["rooms"][room_id1]["prev_batch"] + ) + ) + prev_batch_room_stream_token_serialized = self.get_success( + prev_batch_token.room_key.to_string(self.store) + ) + # If we use the `prev_batch` token to look backwards, we should see `event3` + # next so make sure the token encompasses it + self.assertEqual( + event_pos3.persisted_after(prev_batch_token.room_key), + False, + f"`prev_batch` token {prev_batch_room_stream_token_serialized} should be >= event_pos3={self.get_success(event_pos3.to_room_stream_token().to_string(self.store))}", + ) + # If we use the `prev_batch` token to look backwards, we shouldn't see `event4` + # anymore since it was just returned in this response. + self.assertEqual( + event_pos4.persisted_after(prev_batch_token.room_key), + True, + f"`prev_batch` token {prev_batch_room_stream_token_serialized} should be < event_pos4={self.get_success(event_pos4.to_room_stream_token().to_string(self.store))}", + ) + + def test_not_limited_initial_sync(self) -> None: + """ + Test that we mark `rooms` as `limited=False` when there are no more events to + paginate to. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.send(room_id1, "activity1", tok=user2_tok) + self.helper.send(room_id1, "activity2", tok=user2_tok) + self.helper.send(room_id1, "activity3", tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + + # Make the Sliding Sync request + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 100, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # The timeline should be `limited=False` because we have all of the events (no + # more to paginate to) + self.assertEqual( + channel.json_body["rooms"][room_id1]["limited"], + False, + channel.json_body["rooms"][room_id1], + ) + # We're just looking to make sure we got all of the events before hitting the `timeline_limit` + self.assertEqual( + len(channel.json_body["rooms"][room_id1]["timeline"]), + 9, + channel.json_body["rooms"][room_id1]["timeline"], + )