Add more tests

This commit is contained in:
Eric Eastwood 2024-06-18 18:10:17 -05:00
parent 81d36f36c1
commit 9791209a3d
4 changed files with 296 additions and 24 deletions

View file

@ -769,26 +769,29 @@ class SlidingSyncHandler:
and rooms_for_user_membership_at_to_token.membership == Membership.JOIN and rooms_for_user_membership_at_to_token.membership == Membership.JOIN
): ):
newly_joined = ( newly_joined = (
rooms_for_user_membership_at_to_token.event_pos.stream rooms_for_user_membership_at_to_token.event_pos.persisted_after(
> from_token.room_key.get_stream_pos_for_instance( from_token.room_key
rooms_for_user_membership_at_to_token.event_pos.instance_name
) )
) )
# We should return historical messages (before token range) in the
# following cases because we want clients to be able to show a basic
# screen of information:
# - Initial sync (because no `from_token` to limit us anyway)
# - When users `newly_joined`
# - TODO: For an incremental sync where we haven't sent it down this
# connection before
should_limit_timeline_to_token_range = (
from_token is not None and not newly_joined
)
timeline_events, new_room_key = await self.store.paginate_room_events( timeline_events, new_room_key = await self.store.paginate_room_events(
room_id=room_id, room_id=room_id,
# We're going to paginate backwards from the `to_token` # We're going to paginate backwards from the `to_token`
from_key=to_token.room_key, from_key=to_token.room_key,
# We should return historical messages (before token range) in the
# following cases because we want clients to be able to show a basic
# screen of information:
# - Initial sync (because no `from_token` to limit us anyway)
# - When users `newly_joined`
# - TODO: For an incremental sync where we haven't sent it down this
# connection before
to_key=( to_key=(
from_token.room_key from_token.room_key
if from_token is not None and not newly_joined if should_limit_timeline_to_token_range
else None else None
), ),
direction=Direction.BACKWARDS, direction=Direction.BACKWARDS,
@ -832,7 +835,7 @@ class SlidingSyncHandler:
# old events in the timeline) # old events in the timeline)
num_live = 0 num_live = 0
if from_token is not None: if from_token is not None:
for timeline_event in timeline_events: for timeline_event in reversed(timeline_events):
# This fields should be present for all persisted events # This fields should be present for all persisted events
assert timeline_event.internal_metadata.stream_ordering is not None assert timeline_event.internal_metadata.stream_ordering is not None
assert timeline_event.internal_metadata.instance_name is not None assert timeline_event.internal_metadata.instance_name is not None
@ -843,6 +846,12 @@ class SlidingSyncHandler:
) )
if persisted_position.persisted_after(from_token.room_key): if persisted_position.persisted_after(from_token.room_key):
num_live += 1 num_live += 1
else:
# Since we're iterating over the timeline events in
# reverse-chronological order, we can break once we hit an event
# that's not live. In the future, we could potentially optimize
# this more with a binary search (bisect).
break
prev_batch_token = prev_batch_token.copy_and_replace( prev_batch_token = prev_batch_token.copy_and_replace(
StreamKeyType.ROOM, new_room_key StreamKeyType.ROOM, new_room_key

View file

@ -785,7 +785,7 @@ class SlidingSyncRestServlet(RestServlet):
Response JSON:: Response JSON::
{ {
"next_pos": "s58_224_0_13_10_1_1_16_0_1", "pos": "s58_224_0_13_10_1_1_16_0_1",
"lists": { "lists": {
"foo-list": { "foo-list": {
"count": 1337, "count": 1337,
@ -824,7 +824,8 @@ class SlidingSyncRestServlet(RestServlet):
"joined_count": 41, "joined_count": 41,
"invited_count": 1, "invited_count": 1,
"notification_count": 1, "notification_count": 1,
"highlight_count": 0 "highlight_count": 0,
"num_live": 2"
}, },
// rooms from list // rooms from list
"!foo:bar": { "!foo:bar": {
@ -849,7 +850,8 @@ class SlidingSyncRestServlet(RestServlet):
"joined_count": 4, "joined_count": 4,
"invited_count": 0, "invited_count": 0,
"notification_count": 54, "notification_count": 54,
"highlight_count": 3 "highlight_count": 3,
"num_live": 1,
}, },
// ... 99 more items // ... 99 more items
}, },
@ -927,7 +929,7 @@ class SlidingSyncRestServlet(RestServlet):
) -> JsonDict: ) -> JsonDict:
response: JsonDict = defaultdict(dict) response: JsonDict = defaultdict(dict)
response["next_pos"] = await sliding_sync_result.next_pos.to_string(self.store) response["pos"] = await sliding_sync_result.next_pos.to_string(self.store)
serialized_lists = self.encode_lists(sliding_sync_result.lists) serialized_lists = self.encode_lists(sliding_sync_result.lists)
if serialized_lists: if serialized_lists:
response["lists"] = serialized_lists response["lists"] = serialized_lists

View file

@ -1078,6 +1078,9 @@ class PersistedPosition:
stream: int stream: int
def persisted_after(self, token: AbstractMultiWriterStreamToken) -> bool: def persisted_after(self, token: AbstractMultiWriterStreamToken) -> bool:
"""
Checks whether this position happened after the token
"""
return token.get_stream_pos_for_instance(self.instance_name) < self.stream return token.get_stream_pos_for_instance(self.instance_name) < self.stream

View file

@ -19,6 +19,7 @@
# #
# #
import json import json
import logging
from typing import List from typing import List
from parameterized import parameterized, parameterized_class from parameterized import parameterized, parameterized_class
@ -35,7 +36,7 @@ from synapse.api.constants import (
) )
from synapse.rest.client import devices, knock, login, read_marker, receipts, room, sync from synapse.rest.client import devices, knock, login, read_marker, receipts, room, sync
from synapse.server import HomeServer from synapse.server import HomeServer
from synapse.types import JsonDict, RoomStreamToken, StreamKeyType, StreamToken from synapse.types import JsonDict, RoomStreamToken, StreamKeyType, StreamToken, UserID
from synapse.util import Clock from synapse.util import Clock
from tests import unittest from tests import unittest
@ -44,6 +45,8 @@ from tests.federation.transport.test_knocking import (
) )
from tests.server import TimedOutException from tests.server import TimedOutException
logger = logging.getLogger(__name__)
class FilterTestCase(unittest.HomeserverTestCase): class FilterTestCase(unittest.HomeserverTestCase):
user_id = "@apple:test" user_id = "@apple:test"
@ -1379,11 +1382,9 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
channel.await_result(timeout_ms=200) channel.await_result(timeout_ms=200)
self.assertEqual(channel.code, 200, channel.json_body) self.assertEqual(channel.code, 200, channel.json_body)
# We expect the `next_pos` in the result to be the same as what we requested # We expect the next `pos` in the result to be the same as what we requested
# with because we weren't able to find anything new yet. # with because we weren't able to find anything new yet.
self.assertEqual( self.assertEqual(channel.json_body["pos"], future_position_token_serialized)
channel.json_body["next_pos"], future_position_token_serialized
)
def test_filter_list(self) -> None: def test_filter_list(self) -> None:
""" """
@ -1602,7 +1603,15 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
f"`prev_batch` token {prev_batch_room_stream_token_serialized} should be < event_pos4={self.get_success(event_pos4.to_room_stream_token().to_string(self.store))}", f"`prev_batch` token {prev_batch_room_stream_token_serialized} should be < event_pos4={self.get_success(event_pos4.to_room_stream_token().to_string(self.store))}",
) )
def test_not_limited_initial_sync(self) -> None: # With no `from_token` (initial sync), it's all historical since there is no
# "current" range
self.assertEqual(
channel.json_body["rooms"][room_id1]["num_live"],
0,
channel.json_body["rooms"][room_id1],
)
def test_rooms_not_limited_initial_sync(self) -> None:
""" """
Test that we mark `rooms` as `limited=False` when there are no more events to Test that we mark `rooms` as `limited=False` when there are no more events to
paginate to. paginate to.
@ -1619,6 +1628,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
self.helper.join(room_id1, user1_id, tok=user1_tok) self.helper.join(room_id1, user1_id, tok=user1_tok)
# Make the Sliding Sync request # Make the Sliding Sync request
timeline_limit = 100
channel = self.make_request( channel = self.make_request(
"POST", "POST",
self.sync_endpoint, self.sync_endpoint,
@ -1627,7 +1637,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
"foo-list": { "foo-list": {
"ranges": [[0, 1]], "ranges": [[0, 1]],
"required_state": [], "required_state": [],
"timeline_limit": 100, "timeline_limit": timeline_limit,
} }
} }
}, },
@ -1642,9 +1652,257 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
False, False,
channel.json_body["rooms"][room_id1], channel.json_body["rooms"][room_id1],
) )
expected_number_of_events = 9
# We're just looking to make sure we got all of the events before hitting the `timeline_limit` # We're just looking to make sure we got all of the events before hitting the `timeline_limit`
self.assertEqual( self.assertEqual(
len(channel.json_body["rooms"][room_id1]["timeline"]), len(channel.json_body["rooms"][room_id1]["timeline"]),
9, expected_number_of_events,
channel.json_body["rooms"][room_id1]["timeline"], channel.json_body["rooms"][room_id1]["timeline"],
) )
self.assertLessEqual(expected_number_of_events, timeline_limit)
# With no `from_token` (initial sync), it's all historical since there is no
# "live" token range.
self.assertEqual(
channel.json_body["rooms"][room_id1]["num_live"],
0,
channel.json_body["rooms"][room_id1],
)
def test_rooms_incremental_sync(self) -> None:
"""
Test that `rooms` data during an incremental sync after an initial sync.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.join(room_id1, user1_id, tok=user1_tok)
self.helper.send(room_id1, "activity before initial sync1", tok=user2_tok)
# Make an initial Sliding Sync request to grab a token. This is also a sanity
# check that we can go from initial to incremental sync.
sync_params = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [],
"timeline_limit": 3,
}
}
}
channel = self.make_request(
"POST",
self.sync_endpoint,
sync_params,
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
next_pos = channel.json_body["pos"]
# Send some events but don't send enough to saturate the `timeline_limit`.
# We want to later test that we only get the new events since the `next_pos`
event_response2 = self.helper.send(room_id1, "activity after2", tok=user2_tok)
event_response3 = self.helper.send(room_id1, "activity after3", tok=user2_tok)
# Make an incremental Sliding Sync request (what we're trying to test)
channel = self.make_request(
"POST",
self.sync_endpoint + f"?pos={next_pos}",
sync_params,
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
# We only expect to see the new events since the last sync which isn't enough to
# fill up the `timeline_limit`.
self.assertEqual(
channel.json_body["rooms"][room_id1]["limited"],
False,
f'Our `timeline_limit` was {sync_params["lists"]["foo-list"]["timeline_limit"]} '
+ f'and {len(channel.json_body["rooms"][room_id1]["timeline"])} events were returned in the timeline. '
+ str(channel.json_body["rooms"][room_id1]),
)
# Check to make sure the latest events are returned
self.assertEqual(
[
event["event_id"]
for event in channel.json_body["rooms"][room_id1]["timeline"]
],
[
event_response2["event_id"],
event_response3["event_id"],
],
channel.json_body["rooms"][room_id1]["timeline"],
)
# All events are "live"
self.assertEqual(
channel.json_body["rooms"][room_id1]["num_live"],
2,
channel.json_body["rooms"][room_id1],
)
def test_rooms_newly_joined_incremental_sync(self) -> None:
"""
Test that when we make an incremental sync with a `newly_joined` `rooms`, we are
able to see some historical events before the `from_token`.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.send(room_id1, "activity before token1", tok=user2_tok)
event_response2 = self.helper.send(
room_id1, "activity before token2", tok=user2_tok
)
from_token = self.event_sources.get_current_token()
# Join the room after the `from_token` which will make us consider this room as
# `newly_joined`.
user1_join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
# Send some events but don't send enough to saturate the `timeline_limit`.
# We want to later test that we only get the new events since the `next_pos`
event_response3 = self.helper.send(
room_id1, "activity after token3", tok=user2_tok
)
event_response4 = self.helper.send(
room_id1, "activity after token4", tok=user2_tok
)
# The `timeline_limit` is set to 4 so we can at least see one historical event
# before the `from_token`. We should see historical events because this is a
# `newly_joined` room.
timeline_limit = 4
# Make an incremental Sliding Sync request (what we're trying to test)
channel = self.make_request(
"POST",
self.sync_endpoint
+ f"?pos={self.get_success(
from_token.to_string(self.store)
)}",
{
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [],
"timeline_limit": timeline_limit,
}
}
},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
# We should see the new events and the rest should be filled with historical
# events which will make us `limited=True` since there are more to paginate to.
self.assertEqual(
channel.json_body["rooms"][room_id1]["limited"],
True,
f"Our `timeline_limit` was {timeline_limit} "
+ f'and {len(channel.json_body["rooms"][room_id1]["timeline"])} events were returned in the timeline. '
+ str(channel.json_body["rooms"][room_id1]),
)
# Check to make sure that the "live" and historical events are returned
self.assertEqual(
[
event["event_id"]
for event in channel.json_body["rooms"][room_id1]["timeline"]
],
[
event_response2["event_id"],
user1_join_response["event_id"],
event_response3["event_id"],
event_response4["event_id"],
],
channel.json_body["rooms"][room_id1]["timeline"],
)
# Only events after the `from_token` are "live" (join, event3, event4)
self.assertEqual(
channel.json_body["rooms"][room_id1]["num_live"],
3,
channel.json_body["rooms"][room_id1],
)
def test_rooms_invite_sync(self) -> None:
"""
Test that `rooms` we are invited to have some stripped `invite_state` and that
we can't see any timeline events because we haven't joined the room yet.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user1 = UserID.from_string(user1_id)
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
user2 = UserID.from_string(user2_id)
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.send(room_id1, "activity before1", tok=user2_tok)
self.helper.send(room_id1, "activity before2", tok=user2_tok)
self.helper.invite(room_id1, src=user2_id, targ=user1_id, tok=user2_tok)
self.helper.send(room_id1, "activity after3", tok=user2_tok)
self.helper.send(room_id1, "activity after4", tok=user2_tok)
# Make the Sliding Sync request
channel = self.make_request(
"POST",
self.sync_endpoint,
{
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [],
"timeline_limit": 3,
}
}
},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
# Should not see anything (except maybe the invite event) because we haven't
# joined yet (`filter_events_for_client(...)` is doing the work here)
self.assertEqual(
channel.json_body["rooms"][room_id1]["timeline"],
[],
channel.json_body["rooms"][room_id1]["timeline"],
)
# We should have some stripped state so the potential joiner can identify the
# room (we don't care about the order).
self.assertCountEqual(
channel.json_body["rooms"][room_id1]["invite_state"],
[
{
"content": {"creator": user2_id, "room_version": "10"},
"sender": user2_id,
"state_key": "",
"type": "m.room.create",
},
{
"content": {"join_rule": "public"},
"sender": user2_id,
"state_key": "",
"type": "m.room.join_rules",
},
{
"content": {"displayname": user2.localpart, "membership": "join"},
"sender": user2_id,
"state_key": user2_id,
"type": "m.room.member",
},
{
"content": {"displayname": user1.localpart, "membership": "invite"},
"sender": user2_id,
"state_key": user1_id,
"type": "m.room.member",
},
],
channel.json_body["rooms"][room_id1]["invite_state"],
)