Just fetch full events for get_current_state_delta_membership_changes_for_user(...)

Makes downstream logic simpler and although we may look-up some events
we don't use, the lookup is all done in one go instead of fetching events
from event_ids in a couple different places.
This commit is contained in:
Eric Eastwood 2024-06-25 22:37:45 -05:00
parent 27d74b023e
commit fb8fbd489c
2 changed files with 51 additions and 206 deletions

View file

@ -448,34 +448,20 @@ class SlidingSyncHandler:
# Since we fetched a snapshot of the users room list at some point in time after
# the from/to tokens, we need to revert/rewind some membership changes to match
# the point in time of the `to_token`.
prev_event_ids_in_from_to_range: List[str] = []
for (
room_id,
first_membership_change_after_to_token,
) in first_membership_change_by_room_id_after_to_token.items():
# 1a) Remove rooms that the user joined after the `to_token`
if first_membership_change_after_to_token.prev_event_id is None:
if first_membership_change_after_to_token.prev_event is None:
sync_room_id_set.pop(room_id, None)
# 1b) 1c) From the first membership event after the `to_token`, step backward to the
# previous membership that would apply to the from/to range.
else:
prev_event_ids_in_from_to_range.append(
first_membership_change_after_to_token.prev_event_id
sync_room_id_set[room_id] = convert_event_to_rooms_for_user(
first_membership_change_after_to_token.prev_event
)
# 1) Fixup (more)
#
# 1b) 1c) Fetch the previous membership events that apply to the from/to range
# and fixup our working list.
prev_events_in_from_to_range = await self.store.get_events(
prev_event_ids_in_from_to_range
)
for prev_event_in_from_to_range in prev_events_in_from_to_range.values():
# 1b) 1c) Update the membership with what we found
sync_room_id_set[prev_event_in_from_to_range.room_id] = (
convert_event_to_rooms_for_user(prev_event_in_from_to_range)
)
filtered_sync_room_id_set = {
room_id: room_for_user
for room_id, room_for_user in sync_room_id_set.items()
@ -516,195 +502,18 @@ class SlidingSyncHandler:
] = membership_change
# 2) Fixup
last_membership_event_ids_to_include_in_from_to_range: List[str] = []
for (
last_membership_change_in_from_to_range
) in last_membership_change_by_room_id_in_from_to_range.values():
room_id = last_membership_change_in_from_to_range.room_id
sync_room_id_set[room_id]
# 2) Add back newly_left rooms (> `from_token` and <= `to_token`). We
# include newly_left rooms because the last event that the user should see
# is their own leave event
if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
# Save the look-up if we already have the `leave` event
if sync_room_id_set[room_id].event_id == last_membership_change_in_from_to_range.prev_event_id::
filtered_sync_room_id_set[room_id] = sync_room_id_set[room_id]
else:
last_membership_event_ids_to_include_in_from_to_range.append(last_membership_change_in_from_to_range.event_id)
# TODO
# last_membership_events_to_include_in_from_to_range = await self.store.get_events(
# last_membership_event_ids_to_include_in_from_to_range
# )
# for prev_event_in_from_to_range in prev_events_in_from_to_range.values():
# # 1b) 1c) Update the membership with what we found
# sync_room_id_set[prev_event_in_from_to_range.room_id] = (
# convert_event_to_rooms_for_user(prev_event_in_from_to_range)
# )
# Since we fetched the users room list at some point in time after the from/to
# tokens, we need to revert/rewind some membership changes to match the point in
# time of the `to_token`. In particular, we need to make these fixups:
#
# - 1a) Remove rooms that the user joined after the `to_token`
# - 1b) Add back rooms that the user left after the `to_token`
# - 2) Add back newly_left rooms (> `from_token` and <= `to_token`)
# # 1) -----------------------------------------------------
# # 1) Fetch membership changes that fall in the range from `to_token` up to
# # `membership_snapshot_token`
# #
# # If our `to_token` is already the same or ahead of the latest room membership
# # for the user, we don't need to do any "2)" fix-ups and can just straight-up
# # use the room list from the snapshot as a base (nothing has changed)
# membership_change_events_after_to_token = []
# if not membership_snapshot_token.is_before_or_eq(to_token.room_key):
# membership_change_events_after_to_token = (
# await self.store.get_membership_changes_for_user(
# user_id,
# from_key=to_token.room_key,
# to_key=membership_snapshot_token,
# excluded_rooms=self.rooms_to_exclude_globally,
# )
# )
# # 1) Assemble a list of the last membership events in some given ranges. Someone
# # could have left and joined multiple times during the given range but we only
# # care about end-result so we grab the last one.
# last_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {}
# # We also need the first membership event after the `to_token` so we can step
# # backward to the previous membership that would apply to the from/to range.
# first_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {}
# for event in membership_change_events_after_to_token:
# last_membership_change_by_room_id_after_to_token[event.room_id] = event
# # Only set if we haven't already set it
# first_membership_change_by_room_id_after_to_token.setdefault(
# event.room_id, event
# )
# # 1) Fixup
# for (
# last_membership_change_after_to_token
# ) in last_membership_change_by_room_id_after_to_token.values():
# room_id = last_membership_change_after_to_token.room_id
# # We want to find the first membership change after the `to_token` then step
# # backward to know the membership in the from/to range.
# first_membership_change_after_to_token = (
# first_membership_change_by_room_id_after_to_token.get(room_id)
# )
# assert first_membership_change_after_to_token is not None, (
# "If there was a `last_membership_change_after_to_token` that we're iterating over, "
# + "then there should be corresponding a first change. For example, even if there "
# + "is only one event after the `to_token`, the first and last event will be same event. "
# + "This is probably a mistake in assembling the `last_membership_change_by_room_id_after_to_token`"
# + "/`first_membership_change_by_room_id_after_to_token` dicts above."
# )
# # TODO: Instead of reading from `unsigned`, refactor this to use the
# # `current_state_delta_stream` table in the future. Probably a new
# # `get_membership_changes_for_user()` function that uses
# # `current_state_delta_stream` with a join to `room_memberships`. This would
# # help in state reset scenarios since `prev_content` is looking at the
# # current branch vs the current room state. This is all just data given to
# # the client so no real harm to data integrity, but we'd like to be nice to
# # the client. Since the `current_state_delta_stream` table is new, it
# # doesn't have all events in it. Since this is Sliding Sync, if we ever need
# # to, we can signal the client to throw all of their state away by sending
# # "operation: RESET".
# prev_content = first_membership_change_after_to_token.unsigned.get(
# "prev_content", {}
# )
# prev_membership = prev_content.get("membership", None)
# prev_sender = first_membership_change_after_to_token.unsigned.get(
# "prev_sender", None
# )
# # Check if the previous membership (membership that applies to the from/to
# # range) should be included in our `sync_room_id_set`
# should_prev_membership_be_included = (
# prev_membership is not None
# and prev_sender is not None
# and filter_membership_for_sync(
# membership=prev_membership,
# user_id=user_id,
# sender=prev_sender,
# )
# )
# # Check if the last membership (membership that applies to our snapshot) was
# # already included in our `sync_room_id_set`
# was_last_membership_already_included = filter_membership_for_sync(
# membership=last_membership_change_after_to_token.membership,
# user_id=user_id,
# sender=last_membership_change_after_to_token.sender,
# )
# # 1a) Add back rooms that the user left after the `to_token`
# #
# # For example, if the last membership event after the `to_token` is a leave
# # event, then the room was excluded from `sync_room_id_set` when we first
# # crafted it above. We should add these rooms back as long as the user also
# # was part of the room before the `to_token`.
# if (
# not was_last_membership_already_included
# and should_prev_membership_be_included
# ):
# # TODO: Assign the correct membership event at the `to_token` here
# # (currently we're setting it as the last event after the `to_token`)
# sync_room_id_set[room_id] = convert_event_to_rooms_for_user(
# last_membership_change_after_to_token
# )
# # 1b) Remove rooms that the user joined (hasn't left) after the `to_token`
# #
# # For example, if the last membership event after the `to_token` is a "join"
# # event, then the room was included `sync_room_id_set` when we first crafted
# # it above. We should remove these rooms as long as the user also wasn't
# # part of the room before the `to_token`.
# elif (
# was_last_membership_already_included
# and not should_prev_membership_be_included
# ):
# del sync_room_id_set[room_id]
# # 2) -----------------------------------------------------
# # We fix-up newly_left rooms after the first fixup because it may have removed
# # some left rooms that we can figure out are newly_left in the following code
# # 2) Fetch membership changes that fall in the range from `from_token` up to `to_token`
# membership_change_events_in_from_to_range = []
# if from_token:
# membership_change_events_in_from_to_range = (
# await self.store.get_membership_changes_for_user(
# user_id,
# from_key=from_token.room_key,
# to_key=to_token.room_key,
# excluded_rooms=self.rooms_to_exclude_globally,
# )
# )
# # 2) Assemble a list of the last membership events in some given ranges. Someone
# # could have left and joined multiple times during the given range but we only
# # care about end-result so we grab the last one.
# last_membership_change_by_room_id_in_from_to_range: Dict[str, EventBase] = {}
# for event in membership_change_events_in_from_to_range:
# last_membership_change_by_room_id_in_from_to_range[event.room_id] = event
# # 2) Fixup
# for (
# last_membership_change_in_from_to_range
# ) in last_membership_change_by_room_id_in_from_to_range.values():
# room_id = last_membership_change_in_from_to_range.room_id
# # 2) Add back newly_left rooms (> `from_token` and <= `to_token`). We
# # include newly_left rooms because the last event that the user should see
# # is their own leave event
# if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
# sync_room_id_set[room_id] = convert_event_to_rooms_for_user(
# last_membership_change_in_from_to_range
# )
filtered_sync_room_id_set[room_id] = convert_event_to_rooms_for_user(
last_membership_change_in_from_to_range.event
)
return filtered_sync_room_id_set

View file

@ -113,21 +113,37 @@ class _EventsAround:
@attr.s(slots=True, frozen=True, auto_attribs=True)
class CurrentStateDeltaMembership:
class _CurrentStateDeltaMembershipReturn:
"""
Attributes:
event_id: The "current" membership event ID in this room.
prev_event_id: The previous membership event in this room that was replaced by
the "current" one. May be `None` if there was no previous membership event.
room_id: The room ID of the membership event.
membership: The membership state of the user in the room.
"""
event_id: str
prev_event_id: Optional[str]
room_id: str
membership: str
# Could be useful but we're not using it yet.
# event_pos: PersistedEventPosition
@attr.s(slots=True, frozen=True, auto_attribs=True)
class CurrentStateDeltaMembership:
"""
Attributes:
event: The "current" membership event in this room.
prev_event: The previous membership event in this room that was replaced by
the "current" one. May be `None` if there was no previous membership event.
room_id: The room ID of the membership event.
membership: The membership state of the user in the room.
"""
event: EventBase
prev_event: Optional[EventBase]
room_id: str
membership: str
def generate_pagination_where_clause(
@ -776,7 +792,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
if not has_changed:
return []
def f(txn: LoggingTransaction) -> List[CurrentStateDeltaMembership]:
def f(txn: LoggingTransaction) -> List[_CurrentStateDeltaMembershipReturn]:
# To handle tokens with a non-empty instance_map we fetch more
# results than necessary and then filter down
min_from_id = from_key.stream
@ -813,7 +829,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
txn.execute(sql, args)
membership_changes: List[CurrentStateDeltaMembership] = []
membership_changes: List[_CurrentStateDeltaMembershipReturn] = []
for (
event_id,
prev_event_id,
@ -839,7 +855,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
stream_ordering,
):
membership_changes.append(
CurrentStateDeltaMembership(
_CurrentStateDeltaMembershipReturn(
event_id=event_id,
prev_event_id=prev_event_id,
room_id=room_id,
@ -851,17 +867,37 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
)
)
current_state_delta_membership_changes = await self.db_pool.runInteraction(
return membership_changes
membership_changes = await self.db_pool.runInteraction(
"get_current_state_delta_membership_changes_for_user", f
)
# Fetch all events in one go
event_ids = []
for m in membership_changes:
event_ids.append(m.event_id)
if m.prev_event_id is not None:
event_ids.append(m.prev_event_id)
events = await self.get_events(event_ids, get_prev_content=False)
rooms_to_exclude: AbstractSet[str] = set()
if excluded_rooms is not None:
rooms_to_exclude = set(excluded_rooms)
return [
membership_change
for membership_change in current_state_delta_membership_changes
CurrentStateDeltaMembership(
event=events[membership_change.event_id],
prev_event=(
events[membership_change.prev_event_id]
if membership_change.prev_event_id
else None
),
room_id=membership_change.room_id,
membership=membership_change.membership,
)
for membership_change in membership_changes
if membership_change.room_id not in rooms_to_exclude
]