WIP: Add back newly_left

This commit is contained in:
Eric Eastwood 2024-06-25 21:07:19 -05:00
parent fbd92e1c9d
commit 6c791a88b3
2 changed files with 101 additions and 30 deletions

View file

@ -375,11 +375,6 @@ class SlidingSyncHandler:
# instead from the time of the `to_token`. # instead from the time of the `to_token`.
room_for_user.room_id: room_for_user room_for_user.room_id: room_for_user
for room_for_user in room_for_user_list for room_for_user in room_for_user_list
if filter_membership_for_sync(
membership=room_for_user.membership,
user_id=user_id,
sender=room_for_user.sender,
)
} }
# Get the `RoomStreamToken` that represents the spot we queried up to when we got # Get the `RoomStreamToken` that represents the spot we queried up to when we got
@ -408,6 +403,23 @@ class SlidingSyncHandler:
instance_map=immutabledict(instance_to_max_stream_ordering_map), instance_map=immutabledict(instance_to_max_stream_ordering_map),
) )
# Since we fetched the users room list at some point in time after the from/to
# tokens, we need to revert/rewind some membership changes to match the point in
# time of the `to_token`. In particular, we need to make these fixups:
#
# - 1a) Remove rooms that the user joined after the `to_token`
# - 1b) Add back rooms that the user left after the `to_token`
# - 1c) Update room membership events to the point in time of the `to_token`
# - 2) Add back newly_left rooms (> `from_token` and <= `to_token`)
# 1) -----------------------------------------------------
# 1) Fetch membership changes that fall in the range from `to_token` up to
# `membership_snapshot_token`
#
# If our `to_token` is already the same or ahead of the latest room membership
# for the user, we don't need to do any "2)" fix-ups and can just straight-up
# use the room list from the snapshot as a base (nothing has changed)
current_state_delta_membership_changes_after_to_token = [] current_state_delta_membership_changes_after_to_token = []
if not membership_snapshot_token.is_before_or_eq(to_token.room_key): if not membership_snapshot_token.is_before_or_eq(to_token.room_key):
current_state_delta_membership_changes_after_to_token = ( current_state_delta_membership_changes_after_to_token = (
@ -419,8 +431,9 @@ class SlidingSyncHandler:
) )
) )
# We need the first membership event after the `to_token` so we can step # 1) Assemble a list of the first membership event after the `to_token` so we can
# backward to the previous membership that would apply to the from/to range. # step backward to the previous membership that would apply to the from/to
# range.
first_membership_change_by_room_id_after_to_token: Dict[ first_membership_change_by_room_id_after_to_token: Dict[
str, CurrentStateDeltaMembership str, CurrentStateDeltaMembership
] = {} ] = {}
@ -430,6 +443,8 @@ class SlidingSyncHandler:
membership_change.room_id, membership_change membership_change.room_id, membership_change
) )
# 1) Fixup part 1
#
# Since we fetched a snapshot of the users room list at some point in time after # Since we fetched a snapshot of the users room list at some point in time after
# the from/to tokens, we need to revert/rewind some membership changes to match # the from/to tokens, we need to revert/rewind some membership changes to match
# the point in time of the `to_token`. # the point in time of the `to_token`.
@ -444,37 +459,81 @@ class SlidingSyncHandler:
or first_membership_change_after_to_token.prev_event_id is not None or first_membership_change_after_to_token.prev_event_id is not None
) )
# If the membership change was added after the `to_token`, we need to remove # 1a) Remove rooms that the user joined after the `to_token`
# it
if first_membership_change_after_to_token.prev_event_id is None: if first_membership_change_after_to_token.prev_event_id is None:
sync_room_id_set.pop(room_id, None) sync_room_id_set.pop(room_id, None)
# From the first membership event after the `to_token`, we need to step # 1b) 1c) From the first membership event after the `to_token`, step backward to the
# backward to the previous membership that would apply to the from/to range. # previous membership that would apply to the from/to range.
else: else:
prev_event_ids_in_from_to_range.append( prev_event_ids_in_from_to_range.append(
first_membership_change_after_to_token.prev_event_id first_membership_change_after_to_token.prev_event_id
) )
# Fetch the previous membership events that apply to the from/to range and fixup # 1) Fixup part 2
# our working list. #
# 1b) 1c) Fetch the previous membership events that apply to the from/to range
# and fixup our working list.
prev_events_in_from_to_range = await self.store.get_events( prev_events_in_from_to_range = await self.store.get_events(
prev_event_ids_in_from_to_range prev_event_ids_in_from_to_range
) )
for prev_event_in_from_to_range in prev_events_in_from_to_range.values(): for prev_event_in_from_to_range in prev_events_in_from_to_range.values():
# Update if the membership should be included # 1b) 1c) Update the membership with what we found
if filter_membership_for_sync( sync_room_id_set[prev_event_in_from_to_range.room_id] = (
membership=prev_event_in_from_to_range.membership, convert_event_to_rooms_for_user(prev_event_in_from_to_range)
user_id=user_id, )
sender=prev_event_in_from_to_range.sender,
):
sync_room_id_set[prev_event_in_from_to_range.room_id] = (
convert_event_to_rooms_for_user(prev_event_in_from_to_range)
)
# Otherwise, remove it
else:
sync_room_id_set.pop(prev_event_in_from_to_range.room_id, None)
# TODO: Add back newly_left rooms filtered_sync_room_id_set = {
room_id: room_for_user
for room_id, room_for_user in sync_room_id_set.items()
if filter_membership_for_sync(
membership=room_for_user.membership,
user_id=user_id,
sender=room_for_user.sender,
)
}
# 2) -----------------------------------------------------
# We fix-up newly_left rooms after the first fixup because it may have removed
# some left rooms that we can figure out are newly_left in the following code
# 2) Fetch membership changes that fall in the range from `from_token` up to `to_token`
current_state_delta_membership_changes_in_from_to_range = []
if from_token:
current_state_delta_membership_changes_in_from_to_range = (
await self.store.get_current_state_delta_membership_changes_for_user(
user_id,
from_key=from_token.room_key,
to_key=to_token.room_key,
excluded_rooms=self.rooms_to_exclude_globally,
)
)
# 2) Assemble a list of the last membership events in some given ranges. Someone
# could have left and joined multiple times during the given range but we only
# care about end-result so we grab the last one.
last_membership_change_by_room_id_in_from_to_range: Dict[
str, CurrentStateDeltaMembership
] = {}
for (
membership_change
) in current_state_delta_membership_changes_in_from_to_range:
last_membership_change_by_room_id_in_from_to_range[
membership_change.room_id
] = membership_change
# 2) Fixup
for (
last_membership_change_in_from_to_range
) in last_membership_change_by_room_id_in_from_to_range.values():
room_id = last_membership_change_in_from_to_range.room_id
# 2) Add back newly_left rooms (> `from_token` and <= `to_token`). We
# include newly_left rooms because the last event that the user should see
# is their own leave event
if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
filtered_sync_room_id_set[room_id] = convert_event_to_rooms_for_user(
last_membership_change_in_from_to_range
)
# Since we fetched the users room list at some point in time after the from/to # Since we fetched the users room list at some point in time after the from/to
# tokens, we need to revert/rewind some membership changes to match the point in # tokens, we need to revert/rewind some membership changes to match the point in
@ -638,7 +697,7 @@ class SlidingSyncHandler:
# last_membership_change_in_from_to_range # last_membership_change_in_from_to_range
# ) # )
return sync_room_id_set return filtered_sync_room_id_set
async def filter_rooms( async def filter_rooms(
self, self,

View file

@ -63,7 +63,7 @@ from typing_extensions import Literal
from twisted.internet import defer from twisted.internet import defer
from synapse.api.constants import Direction, EventTypes from synapse.api.constants import Direction, EventTypes, Membership
from synapse.api.filtering import Filter from synapse.api.filtering import Filter
from synapse.events import EventBase from synapse.events import EventBase
from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.logging.context import make_deferred_yieldable, run_in_background
@ -126,6 +126,7 @@ class CurrentStateDeltaMembership:
event_id: Optional[str] event_id: Optional[str]
prev_event_id: Optional[str] prev_event_id: Optional[str]
room_id: str room_id: str
membership: str
# Could be useful but we're not using it yet. # Could be useful but we're not using it yet.
# event_pos: PersistedEventPosition # event_pos: PersistedEventPosition
@ -832,7 +833,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
# `null` when the server is no longer in the room or a state reset happened # `null` when the server is no longer in the room or a state reset happened
# and it was unset). # and it was unset).
sql = """ sql = """
SELECT s.event_id, s.prev_event_id, s.room_id, s.instance_name, s.stream_id SELECT
s.event_id,
s.prev_event_id,
s.room_id,
s.instance_name,
s.stream_id,
m.membership
FROM current_state_delta_stream AS s FROM current_state_delta_stream AS s
WHERE s.type = ? AND s.state_key = ? WHERE s.type = ? AND s.state_key = ?
AND s.stream_id > ? AND s.stream_id <= ? AND s.stream_id > ? AND s.stream_id <= ?
@ -846,12 +853,17 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
event_id=event_id, event_id=event_id,
prev_event_id=prev_event_id, prev_event_id=prev_event_id,
room_id=room_id, room_id=room_id,
# We can assume that the membership is `LEAVE` as a default. This
# will happen when `current_state_delta_stream.event_id` is null
# because it was unset due to a state reset or the server is no
# longer in the room (everyone on our local server left).
membership=membership if membership else Membership.LEAVE,
# event_pos=PersistedEventPosition( # event_pos=PersistedEventPosition(
# instance_name=instance_name, # instance_name=instance_name,
# stream=stream_ordering, # stream=stream_ordering,
# ), # ),
) )
for event_id, prev_event_id, room_id, instance_name, stream_ordering in txn for event_id, prev_event_id, room_id, instance_name, stream_ordering, membership in txn
if _filter_results_by_stream( if _filter_results_by_stream(
from_key, from_key,
to_key, to_key,