This commit is contained in:
Eric Eastwood 2024-06-25 22:20:27 -05:00
parent 6c791a88b3
commit 27d74b023e
2 changed files with 68 additions and 80 deletions

View file

@ -443,22 +443,16 @@ class SlidingSyncHandler:
membership_change.room_id, membership_change
)
# 1) Fixup part 1
# 1) Fixup
#
# Since we fetched a snapshot of the users room list at some point in time after
# the from/to tokens, we need to revert/rewind some membership changes to match
# the point in time of the `to_token`.
prev_event_ids_in_from_to_range = []
prev_event_ids_in_from_to_range: List[str] = []
for (
room_id,
first_membership_change_after_to_token,
) in first_membership_change_by_room_id_after_to_token.items():
# One of these should exist to be a valid row in `current_state_delta_stream`
assert (
first_membership_change_after_to_token.event_id is not None
or first_membership_change_after_to_token.prev_event_id is not None
)
# 1a) Remove rooms that the user joined after the `to_token`
if first_membership_change_after_to_token.prev_event_id is None:
sync_room_id_set.pop(room_id, None)
@ -469,7 +463,7 @@ class SlidingSyncHandler:
first_membership_change_after_to_token.prev_event_id
)
# 1) Fixup part 2
# 1) Fixup (more)
#
# 1b) 1c) Fetch the previous membership events that apply to the from/to range
# and fixup our working list.
@ -522,18 +516,33 @@ class SlidingSyncHandler:
] = membership_change
# 2) Fixup
last_membership_event_ids_to_include_in_from_to_range: List[str] = []
for (
last_membership_change_in_from_to_range
) in last_membership_change_by_room_id_in_from_to_range.values():
room_id = last_membership_change_in_from_to_range.room_id
sync_room_id_set[room_id]
# 2) Add back newly_left rooms (> `from_token` and <= `to_token`). We
# include newly_left rooms because the last event that the user should see
# is their own leave event
if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
filtered_sync_room_id_set[room_id] = convert_event_to_rooms_for_user(
last_membership_change_in_from_to_range
)
# Save the look-up if we already have the `leave` event
if sync_room_id_set[room_id].event_id == last_membership_change_in_from_to_range.prev_event_id::
filtered_sync_room_id_set[room_id] = sync_room_id_set[room_id]
else:
last_membership_event_ids_to_include_in_from_to_range.append(last_membership_change_in_from_to_range.event_id)
# TODO
# last_membership_events_to_include_in_from_to_range = await self.store.get_events(
# last_membership_event_ids_to_include_in_from_to_range
# )
# for prev_event_in_from_to_range in prev_events_in_from_to_range.values():
# # 1b) 1c) Update the membership with what we found
# sync_room_id_set[prev_event_in_from_to_range.room_id] = (
# convert_event_to_rooms_for_user(prev_event_in_from_to_range)
# )
# Since we fetched the users room list at some point in time after the from/to
# tokens, we need to revert/rewind some membership changes to match the point in

View file

@ -63,7 +63,7 @@ from typing_extensions import Literal
from twisted.internet import defer
from synapse.api.constants import Direction, EventTypes, Membership
from synapse.api.constants import Direction
from synapse.api.filtering import Filter
from synapse.events import EventBase
from synapse.logging.context import make_deferred_yieldable, run_in_background
@ -116,14 +116,13 @@ class _EventsAround:
class CurrentStateDeltaMembership:
"""
Attributes:
event_id: The "current" membership event ID in this room. May be `None` if the
server is no longer in the room or a state reset happened.
event_id: The "current" membership event ID in this room.
prev_event_id: The previous membership event in this room that was replaced by
the "current" one. May be `None` if there was no previous membership event.
room_id: The room ID of the membership event.
"""
event_id: Optional[str]
event_id: str
prev_event_id: Optional[str]
room_id: str
membership: str
@ -410,42 +409,6 @@ def _filter_results(
return True
def _filter_results_by_stream(
lower_token: Optional[RoomStreamToken],
upper_token: Optional[RoomStreamToken],
instance_name: str,
stream_ordering: int,
) -> bool:
"""
Note: This function only works with "live" tokens with `stream_ordering` only.
Returns True if the event persisted by the given instance at the given
topological/stream_ordering falls between the two tokens (taking a None
token to mean unbounded).
Used to filter results from fetching events in the DB against the given
tokens. This is necessary to handle the case where the tokens include
position maps, which we handle by fetching more than necessary from the DB
and then filtering (rather than attempting to construct a complicated SQL
query).
"""
if lower_token:
assert lower_token.topological is None
# If these are live tokens we compare the stream ordering against the
# writers stream position.
if stream_ordering <= lower_token.get_stream_pos_for_instance(instance_name):
return False
if upper_token:
assert upper_token.topological is None
if upper_token.get_stream_pos_for_instance(instance_name) < stream_ordering:
return False
return True
def filter_to_clause(event_filter: Optional[Filter]) -> Tuple[str, List[str]]:
# NB: This may create SQL clauses that don't optimise well (and we don't
# have indices on all possible clauses). E.g. it may create
@ -819,58 +782,74 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
min_from_id = from_key.stream
max_to_id = to_key.get_max_stream_pos()
args: List[Any] = [EventTypes.Member, user_id, min_from_id, max_to_id]
args: List[Any] = [user_id, min_from_id, max_to_id]
# TODO: It would be good to assert that the `to_token` is >=
# the first row in `current_state_delta_stream` for the rooms we're
# interested in. Otherwise, we will end up with empty results and not know
# it.
# Note: There is no index for `(type, state_key)` in
# `current_state_delta_stream`. We also can't just add an index for
# `event_id` and join the `room_memberships` table by `event_id` because it
# may be `null` in `current_state_delta_stream` so nothing will match (it's
# `null` when the server is no longer in the room or a state reset happened
# and it was unset).
# We have to look-up events by `stream_ordering` because
# `current_state_delta_stream.event_id` can be `null` if the server is no
# longer in the room or a state reset happened and it was unset.
# `stream_ordering` is unique across the Synapse instance so this should
# work fine.
sql = """
SELECT
s.event_id,
e.event_id,
s.prev_event_id,
s.room_id,
s.instance_name,
s.stream_id,
e.topological_ordering,
m.membership
FROM current_state_delta_stream AS s
WHERE s.type = ? AND s.state_key = ?
INNER JOIN events AS e ON e.stream_ordering = s.stream_id
INNER JOIN room_memberships AS m ON m.event_id = e.event_id
WHERE m.user_id = ?
AND s.stream_id > ? AND s.stream_id <= ?
ORDER BY s.stream_id ASC
"""
txn.execute(sql, args)
return [
CurrentStateDeltaMembership(
event_id=event_id,
prev_event_id=prev_event_id,
room_id=room_id,
# We can assume that the membership is `LEAVE` as a default. This
# will happen when `current_state_delta_stream.event_id` is null
# because it was unset due to a state reset or the server is no
# longer in the room (everyone on our local server left).
membership=membership if membership else Membership.LEAVE,
# event_pos=PersistedEventPosition(
# instance_name=instance_name,
# stream=stream_ordering,
# ),
)
for event_id, prev_event_id, room_id, instance_name, stream_ordering, membership in txn
if _filter_results_by_stream(
membership_changes: List[CurrentStateDeltaMembership] = []
for (
event_id,
prev_event_id,
room_id,
instance_name,
stream_ordering,
topological_ordering,
membership,
) in txn:
assert event_id is not None
# `prev_event_id` can be `None`
assert room_id is not None
assert instance_name is not None
assert stream_ordering is not None
assert topological_ordering is not None
assert membership is not None
if _filter_results(
from_key,
to_key,
instance_name,
topological_ordering,
stream_ordering,
)
]
):
membership_changes.append(
CurrentStateDeltaMembership(
event_id=event_id,
prev_event_id=prev_event_id,
room_id=room_id,
membership=membership,
# event_pos=PersistedEventPosition(
# instance_name=instance_name,
# stream=stream_ordering,
# ),
)
)
current_state_delta_membership_changes = await self.db_pool.runInteraction(
"get_current_state_delta_membership_changes_for_user", f