WIP: TODO comments after pairing with Erik

This commit is contained in:
Eric Eastwood 2024-06-03 14:28:45 -05:00
parent 49998e053e
commit 09609cb0db
3 changed files with 59 additions and 6 deletions

View file

@ -22,7 +22,8 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
# Everything except `Membership.LEAVE`
# Everything except `Membership.LEAVE` because we want everything that's *still*
# relevant to the user.
MEMBERSHIP_TO_DISPLAY_IN_SYNC = (
Membership.INVITE,
Membership.JOIN,
@ -305,13 +306,19 @@ class SlidingSyncHandler:
from_token: Optional[StreamToken] = None,
) -> AbstractSet[str]:
"""
Fetch room IDs that should be listed for this user in the sync response.
Fetch room IDs that should be listed for this user in the sync response (the
full room list that will be sliced, filtered, sorted).
We're looking for rooms that the user has not left (`invite`, `knock`, `join`,
and `ban`) or newly_left rooms that are > `from_token` and <= `to_token`.
"""
user_id = user.to_string()
# For a sync without from_token, all rooms except leave
# For incremental syncs with a from_token, we only need rooms that have changes
# (some event occured).
# First grab a current snapshot rooms for the user
room_for_user_list = await self.store.get_rooms_for_local_user_where_membership_is(
user_id=user_id,
@ -333,11 +340,16 @@ class SlidingSyncHandler:
sync_room_id_set = {
room_for_user.room_id
for room_for_user in room_for_user_list
# TODO: Include kicks (leave where sender is not the user itself)
if room_for_user.membership in MEMBERSHIP_TO_DISPLAY_IN_SYNC
}
# Find the stream_ordering of the latest room membership event which will mark
# the spot we queried up to.
#
# TODO: With the new `GetRoomsForUserWithStreamOrdering` info, make a instance
# map to stream ordering and construct the new room key from that map,
# `RoomStreamToken(stream=<min in that map>, instance_map=...)`
max_stream_ordering_from_room_list = max(
room_for_user.stream_ordering for room_for_user in room_for_user_list
)
@ -348,11 +360,31 @@ class SlidingSyncHandler:
if max_stream_ordering_from_room_list <= to_token.room_key.stream:
return sync_room_id_set
# ~~Of the membership events we pulled out, there still might be events that fail
# that conditional~~
#
# ~~We can get past the conditional above even though we might have fetched events~~
#
# Each event has an stream ID and instance. We can ask
#
# Multiple event_persisters
#
# For every event (GetRoomsForUserWithStreamOrdering) compare with
# `persisted_after` or add a new function to MultiWriterStreamToken to do the
# same thing.
# When you compare tokens, it could be any of these scenarios
# - Token A <= Token B (every stream pos is lower than the other token)
# - Token A >= Token B
# - It's indeterminate (intertwined, v_1_2, v2_1, both before/after each other)
# We assume the `from_token` is before or at-least equal to the `to_token`
assert (
from_token is None or from_token.room_key.stream <= to_token.room_key.stream
), f"{from_token.room_key.stream if from_token else None} <= {to_token.room_key.stream}"
# We need to `wait_for_stream_token`, when they provide a token
# We assume the `from_token`/`to_token` is before the `max_stream_ordering_from_room_list`
assert (
from_token is None
@ -369,10 +401,17 @@ class SlidingSyncHandler:
# - 1) Add back newly_left rooms (> `from_token` and <= `to_token`)
# - 2a) Remove rooms that the user joined after the `to_token`
# - 2b) Add back rooms that the user left after the `to_token`
#
# TODO: Split this into two separate lookups (from_token.room_key ->
# to_token.room_key) and (to_token.room_key -> RoomStreamToken(...)) to avoid
# needing to do raw stream comparison below since we don't have access to the
# `instance_name` that persisted that event. We could refactor
# `event.internal_metadata` to include it but it might turn out a little
# difficult and a bigger, broader Synapse change than we want to make.
membership_change_events = await self.store.get_membership_changes_for_user(
user_id,
# Start from the `from_token` if given, otherwise from the `to_token` so we
# can still do the 2) fixups.
# Start from the `from_token` if given for the 1) fixups, otherwise from the
# `to_token` so we can still do the 2) fixups.
from_key=from_token.room_key if from_token else to_token.room_key,
# Fetch up to our membership snapshot
to_key=RoomStreamToken(stream=max_stream_ordering_from_room_list),
@ -390,6 +429,7 @@ class SlidingSyncHandler:
for event in membership_change_events:
assert event.internal_metadata.stream_ordering
# TODO: Compare with instance_name/stream_ordering
if (
(
from_token is None
@ -457,6 +497,17 @@ class SlidingSyncHandler:
+ "This is probably a mistake in assembling the `last_membership_change_by_room_id_after_to_token`"
+ "/`first_membership_change_by_room_id_after_to_token` dicts above."
)
# TODO: Instead of reading from `unsigned`, refactor this to use the
# `current_state_delta_stream` table in the future. Probably a new
# `get_membership_changes_for_user()` function that uses
# `current_state_delta_stream` with a join to `room_memberships`. This would
# help in state reset scenarios since `prev_content` is looking at the
# current branch vs the current room state. This is all just data given to
# the client so no real harm to data integrity, but we'd like to be nice to
# the client. Since the `current_state_delta_stream` table is new, it
# doesn't have all events in it. Since this is Sliding Sync, if we ever need
# to, we can signal the client to throw all of their state away by sending
# "operation: RESET".
prev_content = first_membership_change_after_to_token.unsigned.get(
"prev_content", {}
)

View file

@ -1997,7 +1997,9 @@ class SyncHandler:
if since_token:
membership_change_events = await self.store.get_membership_changes_for_user(
user_id,
since_token.room_key,
# TODO: We should make this change,
# https://github.com/element-hq/synapse/pull/17187#discussion_r1617871321
token_before_rooms.room_key,
now_token.room_key,
self.rooms_to_exclude_globally,
)

View file

@ -35,7 +35,7 @@ class RoomsForUser:
sender: str
membership: str
event_id: str
stream_ordering: int
event_pos: PersistedEventPosition
room_version_id: str