Non lazy loading sync not blocking during fast join

Signed-off-by: Mathieu Velten <mathieuv@matrix.org>
This commit is contained in:
Mathieu Velten 2023-01-12 18:35:00 +01:00
parent 4db3331bb9
commit 3bad02fbfe
8 changed files with 108 additions and 15 deletions

1
changelog.d/14831.misc Normal file
View file

@ -0,0 +1 @@
Non-lazy-loading syncs no longer block during a fast join.

View file

@ -1817,11 +1817,34 @@ class SyncHandler:
) )
sync_result_builder.now_token = now_token sync_result_builder.now_token = now_token
# Retrieve the rooms that were un-partial-stated in the meantime; this is only
# needed for a non-lazy-loading-members sync.
un_partial_stated_rooms = set()
if not sync_result_builder.sync_config.filter_collection.lazy_load_members():
un_partial_state_rooms_since = 0
if sync_result_builder.since_token is not None:
un_partial_state_rooms_since = int(
sync_result_builder.since_token.un_partial_state_rooms_key
)
un_partial_state_rooms_now = int(
sync_result_builder.now_token.un_partial_state_rooms_key
)
if un_partial_state_rooms_since != un_partial_state_rooms_now:
un_partial_stated_rooms = (
await self.store.get_un_partial_stated_rooms_between(
un_partial_state_rooms_since,
un_partial_state_rooms_now,
)
)
# 2. We check up front if anything has changed, if it hasn't then there is # 2. We check up front if anything has changed, if it hasn't then there is
# no point in going further. # no point in going further.
if not sync_result_builder.full_state: if not sync_result_builder.full_state:
if since_token and not ephemeral_by_room and not account_data_by_room: if since_token and not ephemeral_by_room and not account_data_by_room:
have_changed = await self._have_rooms_changed(sync_result_builder) have_changed = await self._have_rooms_changed(
sync_result_builder, un_partial_stated_rooms
)
log_kv({"rooms_have_changed": have_changed}) log_kv({"rooms_have_changed": have_changed})
if not have_changed: if not have_changed:
tags_by_room = await self.store.get_updated_tags( tags_by_room = await self.store.get_updated_tags(
@ -1835,7 +1858,7 @@ class SyncHandler:
ignored_users = await self.store.ignored_users(user_id) ignored_users = await self.store.ignored_users(user_id)
if since_token: if since_token:
room_changes = await self._get_rooms_changed( room_changes = await self._get_rooms_changed(
sync_result_builder, ignored_users sync_result_builder, ignored_users, un_partial_stated_rooms
) )
tags_by_room = await self.store.get_updated_tags( tags_by_room = await self.store.get_updated_tags(
user_id, since_token.account_data_key user_id, since_token.account_data_key
@ -1888,7 +1911,9 @@ class SyncHandler:
) )
async def _have_rooms_changed( async def _have_rooms_changed(
self, sync_result_builder: "SyncResultBuilder" self,
sync_result_builder: "SyncResultBuilder",
un_partial_stated_rooms: Set[str],
) -> bool: ) -> bool:
"""Returns whether there may be any new events that should be sent down """Returns whether there may be any new events that should be sent down
the sync. Returns True if there are. the sync. Returns True if there are.
@ -1905,6 +1930,11 @@ class SyncHandler:
stream_id = since_token.room_key.stream stream_id = since_token.room_key.stream
for room_id in sync_result_builder.joined_room_ids: for room_id in sync_result_builder.joined_room_ids:
# If a room has been un-partial-stated in the meantime,
# treat it as having changed and deal with it accordingly
# in _get_rooms_changed.
if room_id in un_partial_stated_rooms:
return True
if self.store.has_room_changed_since(room_id, stream_id): if self.store.has_room_changed_since(room_id, stream_id):
return True return True
return False return False
@ -1913,6 +1943,7 @@ class SyncHandler:
self, self,
sync_result_builder: "SyncResultBuilder", sync_result_builder: "SyncResultBuilder",
ignored_users: FrozenSet[str], ignored_users: FrozenSet[str],
un_partial_stated_rooms: Set[str],
) -> _RoomChanges: ) -> _RoomChanges:
"""Determine the changes in rooms to report to the user. """Determine the changes in rooms to report to the user.
@ -2116,7 +2147,24 @@ class SyncHandler:
room_entry = room_to_events.get(room_id, None) room_entry = room_to_events.get(room_id, None)
newly_joined = room_id in newly_joined_rooms newly_joined = room_id in newly_joined_rooms
if room_entry:
# In case of a non lazy-loading-members sync we want to include
# rooms that got un partial stated in the meantime, and we need
# to include the full state of them.
if (
not sync_config.filter_collection.lazy_load_members()
and room_id in un_partial_stated_rooms
):
entry = RoomSyncResultBuilder(
room_id=room_id,
rtype="joined",
events=None,
newly_joined=True,
full_state=True,
since_token=None,
upto_token=now_token,
)
elif room_entry:
events, start_key = room_entry events, start_key = room_entry
prev_batch_token = now_token.copy_and_replace( prev_batch_token = now_token.copy_and_replace(
@ -2186,6 +2234,13 @@ class SyncHandler:
knocked = [] knocked = []
for event in room_list: for event in room_list:
# Do not include rooms for which we don't have the full state yet
# in the case of a non-lazy-loading-members sync.
if (
not sync_config.filter_collection.lazy_load_members()
) and await self.store.is_partial_state_room(event.room_id):
continue
if event.room_version_id not in KNOWN_ROOM_VERSIONS: if event.room_version_id not in KNOWN_ROOM_VERSIONS:
continue continue

View file

@ -292,6 +292,7 @@ class RelationsWorkerStore(SQLBaseStore):
to_device_key=0, to_device_key=0,
device_list_key=0, device_list_key=0,
groups_key=0, groups_key=0,
un_partial_state_rooms_key=0,
) )
return events[:limit], next_token return events[:limit], next_token

View file

@ -26,6 +26,7 @@ from typing import (
Mapping, Mapping,
Optional, Optional,
Sequence, Sequence,
Set,
Tuple, Tuple,
Union, Union,
cast, cast,
@ -1285,10 +1286,39 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
# explanation.) # explanation.)
return self._un_partial_stated_rooms_stream_id_gen.get_current_token() return self._un_partial_stated_rooms_stream_id_gen.get_current_token()
async def get_un_partial_stated_rooms_between(
    self, last_id: int, current_id: int
) -> Set[str]:
    """Fetch the IDs of rooms that were un-partial-stated within a stream range.

    Args:
        last_id: Lower bound of the stream ID range (exclusive).
        current_id: Upper bound of the stream ID range (inclusive).

    Returns:
        The set of distinct room IDs that have a row in
        `un_partial_stated_room_stream` with a stream ID inside that range.
    """
    # Identical bounds describe an empty range, so skip the database round trip.
    if last_id == current_id:
        return set()

    def _fetch_room_ids_txn(txn: LoggingTransaction) -> Set[str]:
        query = """
            SELECT DISTINCT room_id FROM un_partial_stated_room_stream
            WHERE ? < stream_id AND stream_id <= ?
        """
        txn.execute(query, (last_id, current_id))

        # Each row has a single column: the room ID.
        room_ids: Set[str] = set()
        for row in txn:
            room_ids.add(row[0])
        return room_ids

    room_ids_in_range = await self.db_pool.runInteraction(
        "get_un_partial_stated_rooms_between",
        _fetch_room_ids_txn,
    )
    return room_ids_in_range
async def get_un_partial_stated_rooms_from_stream( async def get_un_partial_stated_rooms_from_stream(
self, instance_name: str, last_id: int, current_id: int, limit: int self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]: ) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]:
"""Get updates for caches replication stream. """Get updates for un partial stated rooms replication stream.
Args: Args:
instance_name: The writer we want to fetch updates from. Unused instance_name: The writer we want to fetch updates from. Unused

View file

@ -58,6 +58,7 @@ class EventSources:
push_rules_key = self.store.get_max_push_rules_stream_id() push_rules_key = self.store.get_max_push_rules_stream_id()
to_device_key = self.store.get_to_device_stream_token() to_device_key = self.store.get_to_device_stream_token()
device_list_key = self.store.get_device_stream_token() device_list_key = self.store.get_device_stream_token()
un_partial_state_rooms_key = self.store.get_un_partial_stated_rooms_token()
token = StreamToken( token = StreamToken(
room_key=self.sources.room.get_current_key(), room_key=self.sources.room.get_current_key(),
@ -70,6 +71,7 @@ class EventSources:
device_list_key=device_list_key, device_list_key=device_list_key,
# Groups key is unused. # Groups key is unused.
groups_key=0, groups_key=0,
un_partial_state_rooms_key=un_partial_state_rooms_key,
) )
return token return token
@ -107,5 +109,6 @@ class EventSources:
to_device_key=0, to_device_key=0,
device_list_key=0, device_list_key=0,
groups_key=0, groups_key=0,
un_partial_state_rooms_key=0,
) )
return token return token

View file

@ -646,12 +646,13 @@ class StreamToken:
7. `to_device_key`: `274711` 7. `to_device_key`: `274711`
8. `device_list_key`: `265584` 8. `device_list_key`: `265584`
9. `groups_key`: `1` (note that this key is now unused) 9. `groups_key`: `1` (note that this key is now unused)
10. `un_partial_state_rooms_key`: `379`
You can see how many of these keys correspond to the various You can see how many of these keys correspond to the various
fields in a "/sync" response: fields in a "/sync" response:
```json ```json
{ {
"next_batch": "s12_4_0_1_1_1_1_4_1", "next_batch": "s12_4_0_1_1_1_1_4_1_1",
"presence": { "presence": {
"events": [] "events": []
}, },
@ -663,7 +664,7 @@ class StreamToken:
"!QrZlfIDQLNLdZHqTnt:hs1": { "!QrZlfIDQLNLdZHqTnt:hs1": {
"timeline": { "timeline": {
"events": [], "events": [],
"prev_batch": "s10_4_0_1_1_1_1_4_1", "prev_batch": "s10_4_0_1_1_1_1_4_1_1",
"limited": false "limited": false
}, },
"state": { "state": {
@ -699,6 +700,7 @@ class StreamToken:
device_list_key: int device_list_key: int
# Note that the groups key is no longer used and may have bogus values. # Note that the groups key is no longer used and may have bogus values.
groups_key: int groups_key: int
un_partial_state_rooms_key: int
_SEPARATOR = "_" _SEPARATOR = "_"
START: ClassVar["StreamToken"] START: ClassVar["StreamToken"]
@ -737,6 +739,7 @@ class StreamToken:
# serialized so that there will not be confusion in the future # serialized so that there will not be confusion in the future
# if additional tokens are added. # if additional tokens are added.
str(self.groups_key), str(self.groups_key),
str(self.un_partial_state_rooms_key),
] ]
) )
@ -769,7 +772,7 @@ class StreamToken:
return attr.evolve(self, **{key: new_value}) return attr.evolve(self, **{key: new_value})
StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0) StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0, 0)
@attr.s(slots=True, frozen=True, auto_attribs=True) @attr.s(slots=True, frozen=True, auto_attribs=True)

View file

@ -1831,7 +1831,7 @@ class RoomMessagesTestCase(unittest.HomeserverTestCase):
def test_topo_token_is_accepted(self) -> None: def test_topo_token_is_accepted(self) -> None:
"""Test Topo Token is accepted.""" """Test Topo Token is accepted."""
token = "t1-0_0_0_0_0_0_0_0_0" token = "t1-0_0_0_0_0_0_0_0_0_0"
channel = self.make_request( channel = self.make_request(
"GET", "GET",
"/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token), "/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token),
@ -1845,7 +1845,7 @@ class RoomMessagesTestCase(unittest.HomeserverTestCase):
def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None: def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None:
"""Test that stream token is accepted for forward pagination.""" """Test that stream token is accepted for forward pagination."""
token = "s0_0_0_0_0_0_0_0_0" token = "s0_0_0_0_0_0_0_0_0_0"
channel = self.make_request( channel = self.make_request(
"GET", "GET",
"/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token), "/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token),

View file

@ -1987,7 +1987,7 @@ class RoomMessageListTestCase(RoomBase):
self.room_id = self.helper.create_room_as(self.user_id) self.room_id = self.helper.create_room_as(self.user_id)
def test_topo_token_is_accepted(self) -> None: def test_topo_token_is_accepted(self) -> None:
token = "t1-0_0_0_0_0_0_0_0_0" token = "t1-0_0_0_0_0_0_0_0_0_0"
channel = self.make_request( channel = self.make_request(
"GET", "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token) "GET", "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token)
) )
@ -1998,7 +1998,7 @@ class RoomMessageListTestCase(RoomBase):
self.assertTrue("end" in channel.json_body) self.assertTrue("end" in channel.json_body)
def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None: def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None:
token = "s0_0_0_0_0_0_0_0_0" token = "s0_0_0_0_0_0_0_0_0_0"
channel = self.make_request( channel = self.make_request(
"GET", "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token) "GET", "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token)
) )
@ -2728,7 +2728,7 @@ class LabelsTestCase(unittest.HomeserverTestCase):
"""Test that we can filter by a label on a /messages request.""" """Test that we can filter by a label on a /messages request."""
self._send_labelled_messages_in_room() self._send_labelled_messages_in_room()
token = "s0_0_0_0_0_0_0_0_0" token = "s0_0_0_0_0_0_0_0_0_0"
channel = self.make_request( channel = self.make_request(
"GET", "GET",
"/rooms/%s/messages?access_token=%s&from=%s&filter=%s" "/rooms/%s/messages?access_token=%s&from=%s&filter=%s"
@ -2745,7 +2745,7 @@ class LabelsTestCase(unittest.HomeserverTestCase):
"""Test that we can filter by the absence of a label on a /messages request.""" """Test that we can filter by the absence of a label on a /messages request."""
self._send_labelled_messages_in_room() self._send_labelled_messages_in_room()
token = "s0_0_0_0_0_0_0_0_0" token = "s0_0_0_0_0_0_0_0_0_0"
channel = self.make_request( channel = self.make_request(
"GET", "GET",
"/rooms/%s/messages?access_token=%s&from=%s&filter=%s" "/rooms/%s/messages?access_token=%s&from=%s&filter=%s"
@ -2768,7 +2768,7 @@ class LabelsTestCase(unittest.HomeserverTestCase):
""" """
self._send_labelled_messages_in_room() self._send_labelled_messages_in_room()
token = "s0_0_0_0_0_0_0_0_0" token = "s0_0_0_0_0_0_0_0_0_0"
channel = self.make_request( channel = self.make_request(
"GET", "GET",
"/rooms/%s/messages?access_token=%s&from=%s&filter=%s" "/rooms/%s/messages?access_token=%s&from=%s&filter=%s"