Start of gathering room list to display in sync

This commit is contained in:
Eric Eastwood 2024-05-20 18:18:11 -05:00
parent bfa8c63e57
commit 07d84ab66c
5 changed files with 148 additions and 35 deletions

View file

@ -1,12 +1,5 @@
import itertools
import logging import logging
from enum import Enum from typing import TYPE_CHECKING, AbstractSet, Dict, List, Optional
from typing import (
TYPE_CHECKING,
Dict,
List,
Optional,
)
import attr import attr
@ -17,14 +10,10 @@ if TYPE_CHECKING or HAS_PYDANTIC_V2:
else: else:
from pydantic import Extra from pydantic import Extra
from synapse.api.constants import Membership
from synapse.events import EventBase from synapse.events import EventBase
from synapse.rest.client.models import SlidingSyncBody from synapse.rest.client.models import SlidingSyncBody
from synapse.types import ( from synapse.types import JsonMapping, Requester, RoomStreamToken, StreamToken, UserID
JsonMapping,
Requester,
StreamToken,
UserID,
)
if TYPE_CHECKING: if TYPE_CHECKING:
from synapse.server import HomeServer from synapse.server import HomeServer
@ -137,6 +126,7 @@ class SlidingSyncHandler:
self.auth_blocking = hs.get_auth_blocking() self.auth_blocking = hs.get_auth_blocking()
self.notifier = hs.get_notifier() self.notifier = hs.get_notifier()
self.event_sources = hs.get_event_sources() self.event_sources = hs.get_event_sources()
self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
async def wait_for_sync_for_user( async def wait_for_sync_for_user(
self, self,
@ -154,15 +144,16 @@ class SlidingSyncHandler:
# auth_blocking will occur) # auth_blocking will occur)
await self.auth_blocking.check_auth_blocking(requester=requester) await self.auth_blocking.check_auth_blocking(requester=requester)
# TODO: If the To-Device extension is enabled and we have a since token, delete # TODO: If the To-Device extension is enabled and we have a `from_token`, delete
# any to-device messages before that token (since we now know that the device # any to-device messages before that token (since we now know that the device
# has received them). (see sync v2 for how to do this) # has received them). (see sync v2 for how to do this)
if timeout == 0 or from_token is None: if timeout == 0 or from_token is None:
now_token = self.event_sources.get_current_token()
return await self.current_sync_for_user( return await self.current_sync_for_user(
sync_config, sync_config,
from_token=from_token, from_token=from_token,
to_token=self.event_sources.get_current_token(), to_token=now_token,
) )
else: else:
# Otherwise, we wait for something to happen and report it to the user. # Otherwise, we wait for something to happen and report it to the user.
@ -182,15 +173,138 @@ class SlidingSyncHandler:
from_token=from_token, from_token=from_token,
) )
pass
async def current_sync_for_user( async def current_sync_for_user(
self, self,
sync_config: SlidingSyncConfig, sync_config: SlidingSyncConfig,
from_token: Optional[StreamToken] = None, from_token: Optional[StreamToken] = None,
to_token: Optional[StreamToken] = None, to_token: StreamToken = None,
): ):
user_id = sync_config.user.to_string() user_id = sync_config.user.to_string()
# TODO: Should we exclude app services here? There could be an argument to allow app_service = self.store.get_app_service_by_user_id(user_id)
# them since the appservice doesn't have to make a massive initial sync. if app_service:
# (related to https://github.com/matrix-org/matrix-doc/issues/1144) # We no longer support AS users using /sync directly.
# See https://github.com/matrix-org/matrix-doc/issues/1144
raise NotImplementedError()
room_id_list = await self.get_current_room_ids_for_user(
sync_config.user,
from_token=from_token,
to_token=to_token,
)
logger.info("Sliding sync rooms for user %s: %s", user_id, room_id_list)
# TODO: sync_config.room_subscriptions
# TODO: Calculate Membership changes between the last sync and the current sync.
async def get_current_room_ids_for_user(
self,
user: UserID,
from_token: Optional[StreamToken] = None,
to_token: StreamToken = None,
) -> AbstractSet[str]:
"""
Fetch room IDs that the user has not left since the given `from_token`
or newly_left rooms since the `from_token` and <= `to_token`.
"""
user_id = user.to_string()
room_for_user_list = await self.store.get_rooms_for_local_user_where_membership_is(
user_id=user_id,
# List everything except `Membership.LEAVE`
membership_list=(
Membership.INVITE,
Membership.JOIN,
Membership.KNOCK,
Membership.BAN,
),
excluded_rooms=self.rooms_to_exclude_globally,
)
max_stream_ordering_from_room_list = max(
room_for_user.stream_ordering for room_for_user in room_for_user_list
)
sync_room_id_set = {
room_for_user.room_id for room_for_user in room_for_user_list
}
# We assume the `from_token` is before the `to_token`
assert from_token.room_key.stream < to_token.room_key.stream
# We assume the `from_token`/`to_token` is before the `max_stream_ordering_from_room_list`
assert from_token.room_key.stream < max_stream_ordering_from_room_list
assert to_token.room_key.stream < max_stream_ordering_from_room_list
# Since we fetched the users room list at some point in time after the to/from
# tokens, we need to revert some membership changes to match the point in time
# of the `to_token`.
#
# - 1) Add back newly left rooms (> `from_token` and <= `to_token`)
# - 2a) Remove rooms that the user joined after the `to_token`
# - 2b) Add back rooms that the user left after the `to_token`
membership_change_events = await self.store.get_membership_changes_for_user(
user_id,
from_key=from_token.room_key,
to_key=RoomStreamToken(stream=max_stream_ordering_from_room_list),
excluded_rooms=self.rooms_to_exclude_globally,
)
# Assemble a list of the last membership events in some given ranges. Someone
# could have left and joined multiple times during the given range but we only
# care about end-result.
last_membership_change_by_room_id_in_from_to_range: Dict[str, EventBase] = {}
last_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {}
for event in membership_change_events:
assert event.internal_metadata.stream_ordering
if (
event.internal_metadata.stream_ordering > from_token.room_key.stream
and event.internal_metadata.stream_ordering <= to_token.room_key.stream
):
last_membership_change_by_room_id_in_from_to_range[event.room_id] = (
event
)
elif (
event.internal_metadata.stream_ordering > to_token.room_key.stream
and event.internal_metadata.stream_ordering
<= max_stream_ordering_from_room_list
):
last_membership_change_by_room_id_after_to_token[event.room_id] = event
else:
raise AssertionError(
"Membership event with stream_ordering=%s should fall in the given ranges above"
+ " (%d > x <= %d) or (%d > x <= %d).",
event.internal_metadata.stream_ordering,
from_token.room_key.stream,
to_token.room_key.stream,
to_token.room_key.stream,
max_stream_ordering_from_room_list,
)
# 1)
for event in last_membership_change_by_room_id_in_from_to_range.values():
# 1) Add back newly left rooms (> `from_token` and <= `to_token`). We
# include newly_left rooms because the last event that the user should see
# is their own leave event
if event.membership == Membership.LEAVE:
sync_room_id_set.add(event.room_id)
# 2)
for event in last_membership_change_by_room_id_after_to_token.values():
# 2a) Add back rooms that the user left after the `to_token`
if event.membership == Membership.LEAVE:
sync_room_id_set.add(event.room_id)
# 2b) Remove rooms that the user joined after the `to_token`
elif event.membership != Membership.LEAVE and (
# Make sure the user wasn't joined before the `to_token` at some point in time
last_membership_change_by_room_id_in_from_to_range.get(event.room_id)
is None
# Or at-least the last membership change in the from/to range was a leave event
or last_membership_change_by_room_id_in_from_to_range.get(
event.room_id
).membership
== Membership.LEAVE
):
sync_room_id_set.discard(event.room_id)
return sync_room_id_set

View file

@ -1938,7 +1938,7 @@ class SyncHandler:
""" """
user_id = sync_config.user.to_string() user_id = sync_config.user.to_string()
# Note: we get the users room list *before* we get the current token, this # Note: we get the users room list *before* we get the `now_token`, this
# avoids checking back in history if rooms are joined after the token is fetched. # avoids checking back in history if rooms are joined after the token is fetched.
token_before_rooms = self.event_sources.get_current_token() token_before_rooms = self.event_sources.get_current_token()
mutable_joined_room_ids = set(await self.store.get_rooms_for_user(user_id)) mutable_joined_room_ids = set(await self.store.get_rooms_for_user(user_id))
@ -1950,10 +1950,10 @@ class SyncHandler:
now_token = self.event_sources.get_current_token() now_token = self.event_sources.get_current_token()
log_kv({"now_token": now_token}) log_kv({"now_token": now_token})
# Since we fetched the users room list before the token, there's a small window # Since we fetched the users room list before calculating the `now_token` (see
# during which membership events may have been persisted, so we fetch these now # above), there's a small window during which membership events may have been
# and modify the joined room list for any changes between the get_rooms_for_user # persisted, so we fetch these now and modify the joined room list for any
# call and the get_current_token call. # changes between the get_rooms_for_user call and the get_current_token call.
membership_change_events = [] membership_change_events = []
if since_token: if since_token:
membership_change_events = await self.store.get_membership_changes_for_user( membership_change_events = await self.store.get_membership_changes_for_user(
@ -1963,16 +1963,17 @@ class SyncHandler:
self.rooms_to_exclude_globally, self.rooms_to_exclude_globally,
) )
mem_last_change_by_room_id: Dict[str, EventBase] = {} last_membership_change_by_room_id: Dict[str, EventBase] = {}
for event in membership_change_events: for event in membership_change_events:
mem_last_change_by_room_id[event.room_id] = event last_membership_change_by_room_id[event.room_id] = event
# For the latest membership event in each room found, add/remove the room ID # For the latest membership event in each room found, add/remove the room ID
# from the joined room list accordingly. In this case we only care if the # from the joined room list accordingly. In this case we only care if the
# latest change is JOIN. # latest change is JOIN.
for room_id, event in mem_last_change_by_room_id.items(): for room_id, event in last_membership_change_by_room_id.items():
assert event.internal_metadata.stream_ordering assert event.internal_metadata.stream_ordering
# Skip any events that TODO
if ( if (
event.internal_metadata.stream_ordering event.internal_metadata.stream_ordering
< token_before_rooms.room_key.stream < token_before_rooms.room_key.stream

View file

@ -18,7 +18,7 @@
# [This file includes modifications made by New Vector Limited] # [This file includes modifications made by New Vector Limited]
# #
# #
from typing import TYPE_CHECKING, Dict, Optional, List, Optional, Tuple, Union from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union
from synapse._pydantic_compat import HAS_PYDANTIC_V2 from synapse._pydantic_compat import HAS_PYDANTIC_V2

View file

@ -23,13 +23,11 @@ import logging
import re import re
from collections import defaultdict from collections import defaultdict
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
from typing_extensions import Annotated
from synapse.api.constants import AccountDataTypes, EduTypes, Membership, PresenceState from synapse.api.constants import AccountDataTypes, EduTypes, Membership, PresenceState
from synapse.api.errors import Codes, StoreError, SynapseError from synapse.api.errors import Codes, StoreError, SynapseError
from synapse.api.filtering import FilterCollection from synapse.api.filtering import FilterCollection
from synapse.api.presence import UserPresenceState from synapse.api.presence import UserPresenceState
from synapse.rest.client.models import SlidingSyncBody
from synapse.events.utils import ( from synapse.events.utils import (
SerializeEventConfig, SerializeEventConfig,
format_event_for_client_v2_without_room_id, format_event_for_client_v2_without_room_id,
@ -56,7 +54,7 @@ from synapse.http.servlet import (
) )
from synapse.http.site import SynapseRequest from synapse.http.site import SynapseRequest
from synapse.logging.opentracing import trace_with_opname from synapse.logging.opentracing import trace_with_opname
from synapse.rest.models import RequestBodyModel from synapse.rest.client.models import SlidingSyncBody
from synapse.types import JsonDict, Requester, StreamToken from synapse.types import JsonDict, Requester, StreamToken
from synapse.util import json_decoder from synapse.util import json_decoder
from synapse.util.caches.lrucache import LruCache from synapse.util.caches.lrucache import LruCache

View file

@ -109,9 +109,9 @@ from synapse.handlers.room_summary import RoomSummaryHandler
from synapse.handlers.search import SearchHandler from synapse.handlers.search import SearchHandler
from synapse.handlers.send_email import SendEmailHandler from synapse.handlers.send_email import SendEmailHandler
from synapse.handlers.set_password import SetPasswordHandler from synapse.handlers.set_password import SetPasswordHandler
from synapse.handlers.sliding_sync import SlidingSyncHandler
from synapse.handlers.sso import SsoHandler from synapse.handlers.sso import SsoHandler
from synapse.handlers.stats import StatsHandler from synapse.handlers.stats import StatsHandler
from synapse.handlers.sliding_sync import SlidingSyncHandler
from synapse.handlers.sync import SyncHandler from synapse.handlers.sync import SyncHandler
from synapse.handlers.typing import FollowerTypingHandler, TypingWriterHandler from synapse.handlers.typing import FollowerTypingHandler, TypingWriterHandler
from synapse.handlers.user_directory import UserDirectoryHandler from synapse.handlers.user_directory import UserDirectoryHandler