Create membership_snapshot_token with instance_map

This commit is contained in:
Eric Eastwood 2024-06-03 19:23:31 -05:00
parent 8dca8f548c
commit 9c6ec25aca

View file

@ -3,6 +3,7 @@ from enum import Enum
from typing import TYPE_CHECKING, AbstractSet, Dict, Final, List, Optional, Tuple
import attr
from immutabledict import immutabledict
from synapse._pydantic_compat import HAS_PYDANTIC_V2
@ -314,11 +315,6 @@ class SlidingSyncHandler:
"""
user_id = user.to_string()
# For a sync without from_token, all rooms except leave
# For incremental syncs with a from_token, we only need rooms that have changes
# (some event occured).
# First grab a current snapshot rooms for the user
room_for_user_list = await self.store.get_rooms_for_local_user_where_membership_is(
user_id=user_id,
@ -344,14 +340,40 @@ class SlidingSyncHandler:
if room_for_user.membership in MEMBERSHIP_TO_DISPLAY_IN_SYNC
}
# Find the stream_ordering of the latest room membership event which will mark
# the spot we queried up to.
# Get the `RoomStreamToken` that represents the spot we queried up to when we got
# our membership snapshot from `get_rooms_for_local_user_where_membership_is()`.
#
# First we need to get the max stream_ordering of each event persister instance
# that we queried events from.
instance_to_max_stream_ordering_map = {}
for room_for_user in room_for_user_list:
instance_name = room_for_user.event_pos.instance_name
stream_ordering = room_for_user.event_pos.stream
current_instance_max_stream_ordering = (
instance_to_max_stream_ordering_map.get(instance_name)
)
if (
current_instance_max_stream_ordering is None
or stream_ordering > current_instance_max_stream_ordering
):
instance_to_max_stream_ordering_map[instance_name] = stream_ordering
# Then assemble the `RoomStreamToken`
membership_snapshot_token = RoomStreamToken(
stream=min(
stream_ordering
for stream_ordering in instance_to_max_stream_ordering_map.values()
),
instance_map=immutabledict(instance_to_max_stream_ordering_map),
)
#
# TODO: With the new `RoomsForUser.event_pos` info, make a instance
# map to stream ordering and construct the new room key from that map,
# `RoomStreamToken(stream=<min in that map>, instance_map=...)`
max_stream_ordering_from_room_list = max(
room_for_user.stream_ordering for room_for_user in room_for_user_list
room_for_user.event_pos.stream for room_for_user in room_for_user_list
)
# If our `to_token` is already the same or ahead of the latest room membership
@ -414,7 +436,7 @@ class SlidingSyncHandler:
# `to_token` so we can still do the 2) fixups.
from_key=from_token.room_key if from_token else to_token.room_key,
# Fetch up to our membership snapshot
to_key=RoomStreamToken(stream=max_stream_ordering_from_room_list),
to_key=membership_snapshot_token,
excluded_rooms=self.rooms_to_exclude_globally,
)