Add test for notifier.wait_for_stream_token(from_token)

This commit is contained in:
Eric Eastwood 2024-06-03 23:45:17 -05:00
parent 8bb357a35e
commit 03dd87ab3c

View file

@ -34,7 +34,7 @@ from synapse.api.constants import (
)
from synapse.rest.client import devices, knock, login, read_marker, receipts, room, sync
from synapse.server import HomeServer
from synapse.types import JsonDict
from synapse.types import JsonDict, RoomStreamToken, StreamKeyType
from synapse.util import Clock
from tests import unittest
@ -1227,6 +1227,8 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.sync_endpoint = "/_matrix/client/unstable/org.matrix.msc3575/sync"
self.store = hs.get_datastores().main
self.event_sources = hs.get_event_sources()
def test_sync_list(self) -> None:
"""
@ -1280,3 +1282,57 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
],
channel.json_body["lists"]["foo-list"],
)
def test_wait_for_sync_token(self) -> None:
"""
Test that worker will wait until it catches up to the given token
"""
alice_user_id = self.register_user("alice", "correcthorse")
alice_access_token = self.login(alice_user_id, "correcthorse")
# Create a future token that will cause us to wait. Since we never send a new
# event to reach that future stream_ordering, the worker will wait until the
# full timeout.
current_token = self.event_sources.get_current_token()
future_position_token = current_token.copy_and_replace(
StreamKeyType.ROOM,
RoomStreamToken(stream=current_token.room_key.stream + 1),
)
future_position_token_serialized = self.get_success(
future_position_token.to_string(self.store)
)
# Make the Sliding Sync request
channel = self.make_request(
"POST",
self.sync_endpoint + f"?pos={future_position_token_serialized}",
{
"lists": {
"foo-list": {
"ranges": [[0, 99]],
"sort": ["by_notification_level", "by_recency", "by_name"],
"required_state": [
["m.room.join_rules", ""],
["m.room.history_visibility", ""],
["m.space.child", "*"],
],
"timeline_limit": 1,
}
}
},
access_token=alice_access_token,
await_result=False,
)
# Block for 10 seconds to make `notifier.wait_for_stream_token(from_token)`
# timeout
with self.assertRaises(TimedOutException):
channel.await_result(timeout_ms=9900)
channel.await_result(timeout_ms=200)
self.assertEqual(channel.code, 200, channel.json_body)
# We expect the `next_pos` in the result to be the same as what we requested
# with because we weren't able to find anything new yet.
self.assertEqual(
channel.json_body["next_pos"], future_position_token_serialized
)