Properly compare tokens and event positions

Avoid flawed raw stream comparison.
Eric Eastwood 2024-06-03 20:09:35 -05:00
parent 9c6ec25aca
commit 1268a5413f

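Context for the change below: with multiple event persisters, each writer hands out stream positions from its own sequence, so comparing bare `stream_ordering` integers across writers is not a reliable ordering. That is why the diff switches to token-aware comparisons such as `is_before_or_eq`. The standalone sketch below uses hypothetical `Position` and `Token` types (not Synapse's real `RoomStreamToken`) to illustrate the difference.

# Illustrative sketch only: hypothetical Position/Token types, not Synapse's RoomStreamToken.
from dataclasses import dataclass, field
from typing import Dict


@dataclass(frozen=True)
class Position:
    instance_name: str  # which event persister assigned this position
    stream: int  # that persister's own monotonic counter


@dataclass(frozen=True)
class Token:
    # Highest position we have seen from each writer.
    instance_map: Dict[str, int] = field(default_factory=dict)

    def is_position_before_or_eq(self, pos: Position) -> bool:
        # Compare against the component for the writer that persisted the
        # event, not against a single global integer.
        return pos.stream <= self.instance_map.get(pos.instance_name, 0)


# Two writers: worker2's raw numbers are lower than worker1's, but that says
# nothing about ordering across writers.
token = Token(instance_map={"worker1": 10, "worker2": 3})
event_pos = Position(instance_name="worker2", stream=5)

# Flawed raw comparison: the event looks like it is already covered by the token.
print(event_pos.stream <= max(token.instance_map.values()))  # True (wrong)

# Per-writer comparison: worker2 has only reached 3, so position 5 is after the token.
print(token.is_position_before_or_eq(event_pos))  # False (correct)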

@@ -172,7 +172,7 @@ class SlidingSyncResult:
next_pos: StreamToken
lists: Dict[str, SlidingWindowList]
rooms: List[RoomResult]
rooms: Dict[str, RoomResult]
extensions: JsonMapping
def __bool__(self) -> bool:
@@ -182,10 +182,21 @@ class SlidingSyncResult:
"""
return bool(self.lists or self.rooms or self.extensions)
@staticmethod
def empty(next_pos: StreamToken) -> "SlidingSyncResult":
"Return a new empty result"
return SlidingSyncResult(
next_pos=next_pos,
lists={},
rooms={},
extensions={},
)
class SlidingSyncHandler:
def __init__(self, hs: "HomeServer"):
self.hs_config = hs.config
self.clock = hs.get_clock()
self.store = hs.get_datastores().main
self.auth_blocking = hs.get_auth_blocking()
self.notifier = hs.get_notifier()
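A note on the new `empty()` helper above: since `__bool__` returns `bool(self.lists or self.rooms or self.extensions)`, an empty result is falsy, which lets callers short-circuit when there is nothing to send (as the handler does further down when it times out waiting for the worker to catch up). A rough usage sketch with a stand-in dataclass rather than the real `SlidingSyncResult`:

# Illustrative sketch only: a stand-in for SlidingSyncResult's truthiness, not the real class.
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class FakeSlidingSyncResult:
    next_pos: Any
    lists: Dict[str, Any]
    rooms: Dict[str, Any]
    extensions: Dict[str, Any]

    def __bool__(self) -> bool:
        # Falsy when there is nothing to tell the client about.
        return bool(self.lists or self.rooms or self.extensions)

    @staticmethod
    def empty(next_pos: Any) -> "FakeSlidingSyncResult":
        return FakeSlidingSyncResult(next_pos=next_pos, lists={}, rooms={}, extensions={})


assert not FakeSlidingSyncResult.empty(next_pos="s42")  # empty result is falsy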
@@ -197,7 +208,7 @@ class SlidingSyncHandler:
requester: Requester,
sync_config: SlidingSyncConfig,
from_token: Optional[StreamToken] = None,
timeout: int = 0,
timeout_ms: int = 0,
) -> SlidingSyncResult:
"""Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then
@@ -212,7 +223,30 @@ class SlidingSyncHandler:
# any to-device messages before that token (since we now know that the device
# has received them). (see sync v2 for how to do this)
if timeout == 0 or from_token is None:
# If we're working with a user-provided token, we need to make sure to wait
# for this worker to catch up with the token.
if from_token is not None:
# We need to make sure this worker has caught up with the token. If
# this returns false, it means we timed out waiting, and we should
# just return an empty response.
before_wait_ts = self.clock.time_msec()
if not await self.notifier.wait_for_stream_token(from_token):
logger.warning(
"Timed out waiting for worker to catch up. Returning empty response"
)
return SlidingSyncResult.empty(from_token)
# If we've spent significant time waiting to catch up, take it off
# the timeout.
after_wait_ts = self.clock.time_msec()
if after_wait_ts - before_wait_ts > 1_000:
timeout_ms -= after_wait_ts - before_wait_ts
timeout_ms = max(timeout_ms, 0)
# We're going to respond immediately if the timeout is 0 or if this is an
# initial sync (without a `from_token`) so we can avoid calling
# `notifier.wait_for_events()`.
if timeout_ms == 0 or from_token is None:
now_token = self.event_sources.get_current_token()
result = await self.current_sync_for_user(
sync_config,
@@ -232,7 +266,7 @@ class SlidingSyncHandler:
result = await self.notifier.wait_for_events(
sync_config.user.to_string(),
timeout,
timeout_ms,
current_sync_callback,
from_token=from_token,
)
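The catch-up wait above eats into the client's long-poll budget, so any significant wait is deducted from `timeout_ms` (clamped at zero) before deciding whether to respond immediately or block in `notifier.wait_for_events()`. A minimal sketch of that accounting pattern, using `time.monotonic()` and made-up `wait_for_catch_up`, `compute_now` and `wait_for_events` callables rather than Synapse's clock and notifier:

# Illustrative sketch only: made-up helpers, not Synapse's clock/notifier API.
import time


async def sync_with_budget(timeout_ms, wait_for_catch_up, compute_now, wait_for_events):
    before = time.monotonic()
    if not await wait_for_catch_up():
        return None  # analogous to returning SlidingSyncResult.empty(...)

    # Deduct a significant catch-up wait from the remaining long-poll budget.
    elapsed_ms = int((time.monotonic() - before) * 1000)
    if elapsed_ms > 1_000:
        timeout_ms = max(timeout_ms - elapsed_ms, 0)

    if timeout_ms == 0:
        # Nothing left to wait for: respond with the current state immediately.
        return await compute_now()

    # Otherwise block (up to the reduced budget) until there is something new.
    return await wait_for_events(timeout_ms)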
@@ -296,7 +330,7 @@ class SlidingSyncHandler:
next_pos=to_token,
lists=lists,
# TODO: Gather room data for rooms in lists and `sync_config.room_subscriptions`
rooms=[],
rooms={},
extensions={},
)
@@ -345,7 +379,7 @@ class SlidingSyncHandler:
#
# First we need to get the max stream_ordering of each event persister instance
# that we queried events from.
instance_to_max_stream_ordering_map = {}
instance_to_max_stream_ordering_map: Dict[str, int] = {}
for room_for_user in room_for_user_list:
instance_name = room_for_user.event_pos.instance_name
stream_ordering = room_for_user.event_pos.stream
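The loop above (it continues past this hunk) appears to reduce the queried room memberships to one number per event persister, the highest `stream_ordering` seen from that instance, which then becomes the `instance_map` of the membership snapshot token in the next hunk. A standalone sketch of that reduction, with plain `(instance_name, stream_ordering)` tuples standing in for `RoomsForUser.event_pos`:

# Illustrative sketch only: (instance_name, stream_ordering) tuples stand in
# for the RoomsForUser.event_pos values in the real code.
from typing import Dict, List, Tuple

event_positions: List[Tuple[str, int]] = [
    ("worker1", 7),
    ("worker2", 12),
    ("worker1", 9),
]

instance_to_max_stream_ordering: Dict[str, int] = {}
for instance_name, stream_ordering in event_positions:
    current_max = instance_to_max_stream_ordering.get(instance_name)
    if current_max is None or stream_ordering > current_max:
        instance_to_max_stream_ordering[instance_name] = stream_ordering

# {'worker1': 9, 'worker2': 12}: the per-writer maxima that seed the snapshot
# token's instance_map.
print(instance_to_max_stream_ordering)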
@@ -368,53 +402,24 @@ class SlidingSyncHandler:
instance_map=immutabledict(instance_to_max_stream_ordering_map),
)
#
# TODO: With the new `RoomsForUser.event_pos` info, make a instance
# map to stream ordering and construct the new room key from that map,
# `RoomStreamToken(stream=<min in that map>, instance_map=...)`
max_stream_ordering_from_room_list = max(
room_for_user.event_pos.stream for room_for_user in room_for_user_list
)
# If our `to_token` is already the same or ahead of the latest room membership
# for the user, we can just straight-up return the room list (nothing has
# changed)
if max_stream_ordering_from_room_list <= to_token.room_key.stream:
if membership_snapshot_token.is_before_or_eq(to_token.room_key):
return sync_room_id_set
# ~~Of the membership events we pulled out, there still might be events that fail
# that conditional~~
#
# ~~We can get past the conditional above even though we might have fetched events~~
#
# Each event has an stream ID and instance. We can ask
#
# Multiple event_persisters
#
# For every event (GetRoomsForUserWithStreamOrdering) compare with
# `persisted_after` or add a new function to MultiWriterStreamToken to do the
# same thing.
# When you compare tokens, it could be any of these scenarios
# - Token A <= Token B (every stream pos is lower than the other token)
# - Token A >= Token B
# - It's indeterminate (intertwined, v_1_2, v2_1, both before/after each other)
# We assume the `from_token` is before or at-least equal to the `to_token`
assert (
from_token is None or from_token.room_key.stream <= to_token.room_key.stream
), f"{from_token.room_key.stream if from_token else None} <= {to_token.room_key.stream}"
# We need to `wait_for_stream_token` when they provide a token
assert from_token is None or from_token.room_key.is_before_or_eq(
to_token.room_key
), f"{from_token.room_key if from_token else None} < {to_token.room_key}"
# We assume the `from_token`/`to_token` is before the `max_stream_ordering_from_room_list`
assert (
from_token is None
or from_token.room_key.stream < max_stream_ordering_from_room_list
), f"{from_token.room_key.stream if from_token else None} < {max_stream_ordering_from_room_list}"
assert (
to_token.room_key.stream < max_stream_ordering_from_room_list
), f"{to_token.room_key.stream} < {max_stream_ordering_from_room_list}"
assert from_token is None or from_token.room_key.is_before_or_eq(
membership_snapshot_token
), f"{from_token.room_key if from_token else None} < {membership_snapshot_token}"
assert to_token.room_key.is_before_or_eq(
membership_snapshot_token
), f"{to_token.room_key} < {membership_snapshot_token}"
# Since we fetched the user's room list at some point in time after the from/to
# tokens, we need to revert/rewind some membership changes to match the point in
@@ -424,72 +429,38 @@ class SlidingSyncHandler:
# - 2a) Remove rooms that the user joined after the `to_token`
# - 2b) Add back rooms that the user left after the `to_token`
#
# TODO: Split this into two separate lookups (from_token.room_key ->
# to_token.room_key) and (to_token.room_key -> RoomStreamToken(...)) to avoid
# needing to do raw stream comparison below since we don't have access to the
# `instance_name` that persisted that event. We could refactor
# `event.internal_metadata` to include it but it might turn out a little
# difficult and a bigger, broader Synapse change than we want to make.
membership_change_events = await self.store.get_membership_changes_for_user(
user_id,
# Start from the `from_token` if given for the 1) fixups, otherwise from the
# `to_token` so we can still do the 2) fixups.
from_key=from_token.room_key if from_token else to_token.room_key,
# Fetch up to our membership snapshot
to_key=membership_snapshot_token,
excluded_rooms=self.rooms_to_exclude_globally,
)
# We're doing two separate lookups for membership changes using
# `RoomStreamToken`s. We could request everything in one range,
# [`from_token.room_key`, `membership_snapshot_token`), but we would then have to
# bucket the events with raw stream comparison (which is flawed) since we don't
# have access to the `instance_name` that persisted each event in
# `event.internal_metadata`. We could refactor `event.internal_metadata` to
# include `instance_name` but that might turn out to be a little difficult and a
# bigger, broader Synapse change than we want to make.
# Assemble a list of the last membership events in some given ranges. Someone
# 1) -----------------------------------------------------
# 1) Fetch membership changes that fall in the range of `from_token` and `to_token`
membership_change_events_in_from_to_range = []
if from_token:
membership_change_events_in_from_to_range = (
await self.store.get_membership_changes_for_user(
user_id,
from_key=from_token.room_key,
to_key=to_token.room_key,
excluded_rooms=self.rooms_to_exclude_globally,
)
)
# 1) Assemble a list of the last membership events in some given ranges. Someone
# could have left and joined multiple times during the given range but we only
# care about the end result, so we grab the last one.
last_membership_change_by_room_id_in_from_to_range: Dict[str, EventBase] = {}
last_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {}
# We also need the first membership event after the `to_token` so we can step
# backward to the previous membership that would apply to the from/to range.
first_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {}
for event in membership_change_events:
for event in membership_change_events_in_from_to_range:
assert event.internal_metadata.stream_ordering
last_membership_change_by_room_id_in_from_to_range[event.room_id] = event
# TODO: Compare with instance_name/stream_ordering
if (
(
from_token is None
or event.internal_metadata.stream_ordering
> from_token.room_key.stream
)
and event.internal_metadata.stream_ordering <= to_token.room_key.stream
):
last_membership_change_by_room_id_in_from_to_range[event.room_id] = (
event
)
elif (
event.internal_metadata.stream_ordering > to_token.room_key.stream
and event.internal_metadata.stream_ordering
<= max_stream_ordering_from_room_list
):
last_membership_change_by_room_id_after_to_token[event.room_id] = event
# Only set if we haven't already set it
first_membership_change_by_room_id_after_to_token.setdefault(
event.room_id, event
)
else:
# We don't expect this to happen since we should only be fetching
# `membership_change_events` that fall in the given ranges above. It
# doesn't hurt anything to ignore an event we don't need but may
# indicate a bug in the logic above.
raise AssertionError(
"Membership event with stream_ordering=%s should fall in the given ranges above"
+ " (%d > x <= %d) or (%d > x <= %d). We shouldn't be fetching extra membership"
+ " events that aren't used.",
event.internal_metadata.stream_ordering,
from_token.room_key.stream if from_token else None,
to_token.room_key.stream,
to_token.room_key.stream,
max_stream_ordering_from_room_list,
)
# 1)
# 1) Fixup
for (
last_membership_change_in_from_to_range
) in last_membership_change_by_room_id_in_from_to_range.values():
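The 1) block above bounds the membership lookup by the tokens themselves and then keeps only the last membership change per room, since only the end state within the window matters, before applying the fixup. A toy version of that bucketing and fixup, with plain dicts standing in for `EventBase`:

# Illustrative sketch only: plain dicts stand in for membership change events.
from typing import Dict, List, Set

membership_changes_in_range: List[Dict[str, str]] = [
    {"room_id": "!a", "membership": "leave"},
    {"room_id": "!a", "membership": "join"},  # rejoined later in the window
    {"room_id": "!b", "membership": "leave"},  # left and stayed out
]

# Keep only the final change per room: later entries simply overwrite earlier ones.
last_change_by_room_id: Dict[str, Dict[str, str]] = {}
for event in membership_changes_in_range:
    last_change_by_room_id[event["room_id"]] = event

# Rooms the user is in as of the membership snapshot.
sync_room_id_set: Set[str] = {"!a"}

# Fixup: a room whose final change in the window is a leave was still relevant
# during the requested range, so add it back to the response set.
for room_id, event in last_change_by_room_id.items():
    if event["membership"] == "leave":
        sync_room_id_set.add(room_id)

print(sorted(sync_room_id_set))  # ['!a', '!b']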
@@ -501,7 +472,36 @@ class SlidingSyncHandler:
if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
sync_room_id_set.add(room_id)
# 2)
# 2) -----------------------------------------------------
# 2) Fetch membership changes that fall in the range from `to_token` up to
# `membership_snapshot_token`
membership_change_events_after_to_token = (
await self.store.get_membership_changes_for_user(
user_id,
from_key=to_token.room_key,
to_key=membership_snapshot_token,
excluded_rooms=self.rooms_to_exclude_globally,
)
)
# 2) Assemble a list of the last membership events in some given ranges. Someone
# could have left and joined multiple times during the given range but we only
# care about the end result, so we grab the last one.
last_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {}
# We also need the first membership event after the `to_token` so we can step
# backward to the previous membership that would apply to the from/to range.
first_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {}
for event in membership_change_events_after_to_token:
assert event.internal_metadata.stream_ordering
last_membership_change_by_room_id_after_to_token[event.room_id] = event
# Only set if we haven't already set it
first_membership_change_by_room_id_after_to_token.setdefault(
event.room_id, event
)
# 2) Fixup
for (
last_membership_change_after_to_token
) in last_membership_change_by_room_id_after_to_token.values():
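The 2) fixup whose loop starts above is cut off by the diff, but the earlier comments describe its intent: remove rooms the user joined after the `to_token` (2a) and add back rooms the user left after it (2b). The sketch below is a rough approximation of that intent only; the real handler also consults the first membership change after the `to_token` to recover the membership the user had at that point, which this toy version skips.

# Illustrative sketch only: a rough approximation of the 2a/2b fixups; the
# real handler also uses the *first* change after `to_token` to recover the
# membership the user had at `to_token`, which this toy version skips.
from typing import Dict, List, Set

changes_after_to_token: List[Dict[str, str]] = [
    {"room_id": "!c", "membership": "join"},  # joined after the `to_token`
    {"room_id": "!d", "membership": "leave"},  # left after the `to_token`
]

last_change_by_room_id: Dict[str, Dict[str, str]] = {}
for event in changes_after_to_token:
    last_change_by_room_id[event["room_id"]] = event

# Membership snapshot taken *after* the `to_token`.
sync_room_id_set: Set[str] = {"!c", "!e"}

for room_id, event in last_change_by_room_id.items():
    if event["membership"] == "join":
        # 2a) The join happened after `to_token`, so it isn't visible yet.
        sync_room_id_set.discard(room_id)
    elif event["membership"] == "leave":
        # 2b) The leave happened after `to_token`, so the room was still joined.
        sync_room_id_set.add(room_id)

print(sorted(sync_room_id_set))  # ['!d', '!e']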