Add get_current_state_delta_membership_changes_for_user(...) (using current_state_delta_stream)

(still need to add newly_left rooms back)
This commit is contained in:
Eric Eastwood 2024-06-25 20:16:50 -05:00
parent 83d6f76606
commit fbd92e1c9d
3 changed files with 426 additions and 145 deletions

View file

@ -27,6 +27,7 @@ from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membe
from synapse.events import EventBase
from synapse.events.utils import strip_event
from synapse.handlers.relations import BundledAggregations
from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
from synapse.storage.roommember import RoomsForUser
from synapse.types import (
JsonDict,
@ -369,6 +370,9 @@ class SlidingSyncHandler:
# Our working list of rooms that can show up in the sync response
sync_room_id_set = {
# Note: The `room_for_user` we're assigning here will need to be fixed up
# (below) because they are potentially from the current snapshot time
# instead from the time of the `to_token`.
room_for_user.room_id: room_for_user
for room_for_user in room_for_user_list
if filter_membership_for_sync(
@ -404,33 +408,10 @@ class SlidingSyncHandler:
instance_map=immutabledict(instance_to_max_stream_ordering_map),
)
# Since we fetched the users room list at some point in time after the from/to
# tokens, we need to revert/rewind some membership changes to match the point in
# time of the `to_token`. In particular, we need to make these fixups:
#
# - 1a) Remove rooms that the user joined after the `to_token`
# - 1b) Add back rooms that the user left after the `to_token`
# - 2) Add back newly_left rooms (> `from_token` and <= `to_token`)
#
# Below, we're doing two separate lookups for membership changes. We could
# request everything for both fixups in one range, [`from_token.room_key`,
# `membership_snapshot_token`), but we want to avoid raw `stream_ordering`
# comparison without `instance_name` (which is flawed). We could refactor
# `event.internal_metadata` to include `instance_name` but it might turn out a
# little difficult and a bigger, broader Synapse change than we want to make.
# 1) -----------------------------------------------------
# 1) Fetch membership changes that fall in the range from `to_token` up to
# `membership_snapshot_token`
#
# If our `to_token` is already the same or ahead of the latest room membership
# for the user, we don't need to do any "2)" fix-ups and can just straight-up
# use the room list from the snapshot as a base (nothing has changed)
membership_change_events_after_to_token = []
current_state_delta_membership_changes_after_to_token = []
if not membership_snapshot_token.is_before_or_eq(to_token.room_key):
membership_change_events_after_to_token = (
await self.store.get_membership_changes_for_user(
current_state_delta_membership_changes_after_to_token = (
await self.store.get_current_state_delta_membership_changes_for_user(
user_id,
from_key=to_token.room_key,
to_key=membership_snapshot_token,
@ -438,138 +419,224 @@ class SlidingSyncHandler:
)
)
# 1) Assemble a list of the last membership events in some given ranges. Someone
# could have left and joined multiple times during the given range but we only
# care about end-result so we grab the last one.
last_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {}
# We also need the first membership event after the `to_token` so we can step
# We need the first membership event after the `to_token` so we can step
# backward to the previous membership that would apply to the from/to range.
first_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {}
for event in membership_change_events_after_to_token:
last_membership_change_by_room_id_after_to_token[event.room_id] = event
first_membership_change_by_room_id_after_to_token: Dict[
str, CurrentStateDeltaMembership
] = {}
for membership_change in current_state_delta_membership_changes_after_to_token:
# Only set if we haven't already set it
first_membership_change_by_room_id_after_to_token.setdefault(
event.room_id, event
membership_change.room_id, membership_change
)
# 1) Fixup
# Since we fetched a snapshot of the users room list at some point in time after
# the from/to tokens, we need to revert/rewind some membership changes to match
# the point in time of the `to_token`.
prev_event_ids_in_from_to_range = []
for (
last_membership_change_after_to_token
) in last_membership_change_by_room_id_after_to_token.values():
room_id = last_membership_change_after_to_token.room_id
# We want to find the first membership change after the `to_token` then step
# backward to know the membership in the from/to range.
first_membership_change_after_to_token = (
first_membership_change_by_room_id_after_to_token.get(room_id)
)
assert first_membership_change_after_to_token is not None, (
"If there was a `last_membership_change_after_to_token` that we're iterating over, "
+ "then there should be corresponding a first change. For example, even if there "
+ "is only one event after the `to_token`, the first and last event will be same event. "
+ "This is probably a mistake in assembling the `last_membership_change_by_room_id_after_to_token`"
+ "/`first_membership_change_by_room_id_after_to_token` dicts above."
)
# TODO: Instead of reading from `unsigned`, refactor this to use the
# `current_state_delta_stream` table in the future. Probably a new
# `get_membership_changes_for_user()` function that uses
# `current_state_delta_stream` with a join to `room_memberships`. This would
# help in state reset scenarios since `prev_content` is looking at the
# current branch vs the current room state. This is all just data given to
# the client so no real harm to data integrity, but we'd like to be nice to
# the client. Since the `current_state_delta_stream` table is new, it
# doesn't have all events in it. Since this is Sliding Sync, if we ever need
# to, we can signal the client to throw all of their state away by sending
# "operation: RESET".
prev_content = first_membership_change_after_to_token.unsigned.get(
"prev_content", {}
)
prev_membership = prev_content.get("membership", None)
prev_sender = first_membership_change_after_to_token.unsigned.get(
"prev_sender", None
room_id,
first_membership_change_after_to_token,
) in first_membership_change_by_room_id_after_to_token.items():
# One of these should exist to be a valid row in `current_state_delta_stream`
assert (
first_membership_change_after_to_token.event_id is not None
or first_membership_change_after_to_token.prev_event_id is not None
)
# Check if the previous membership (membership that applies to the from/to
# range) should be included in our `sync_room_id_set`
should_prev_membership_be_included = (
prev_membership is not None
and prev_sender is not None
and filter_membership_for_sync(
membership=prev_membership,
user_id=user_id,
sender=prev_sender,
# If the membership change was added after the `to_token`, we need to remove
# it
if first_membership_change_after_to_token.prev_event_id is None:
sync_room_id_set.pop(room_id, None)
# From the first membership event after the `to_token`, we need to step
# backward to the previous membership that would apply to the from/to range.
else:
prev_event_ids_in_from_to_range.append(
first_membership_change_after_to_token.prev_event_id
)
)
# Check if the last membership (membership that applies to our snapshot) was
# already included in our `sync_room_id_set`
was_last_membership_already_included = filter_membership_for_sync(
membership=last_membership_change_after_to_token.membership,
# Fetch the previous membership events that apply to the from/to range and fixup
# our working list.
prev_events_in_from_to_range = await self.store.get_events(
prev_event_ids_in_from_to_range
)
for prev_event_in_from_to_range in prev_events_in_from_to_range.values():
# Update if the membership should be included
if filter_membership_for_sync(
membership=prev_event_in_from_to_range.membership,
user_id=user_id,
sender=last_membership_change_after_to_token.sender,
)
# 1a) Add back rooms that the user left after the `to_token`
#
# For example, if the last membership event after the `to_token` is a leave
# event, then the room was excluded from `sync_room_id_set` when we first
# crafted it above. We should add these rooms back as long as the user also
# was part of the room before the `to_token`.
if (
not was_last_membership_already_included
and should_prev_membership_be_included
sender=prev_event_in_from_to_range.sender,
):
sync_room_id_set[room_id] = convert_event_to_rooms_for_user(
last_membership_change_after_to_token
sync_room_id_set[prev_event_in_from_to_range.room_id] = (
convert_event_to_rooms_for_user(prev_event_in_from_to_range)
)
# 1b) Remove rooms that the user joined (hasn't left) after the `to_token`
#
# For example, if the last membership event after the `to_token` is a "join"
# event, then the room was included `sync_room_id_set` when we first crafted
# it above. We should remove these rooms as long as the user also wasn't
# part of the room before the `to_token`.
elif (
was_last_membership_already_included
and not should_prev_membership_be_included
):
del sync_room_id_set[room_id]
# Otherwise, remove it
else:
sync_room_id_set.pop(prev_event_in_from_to_range.room_id, None)
# 2) -----------------------------------------------------
# We fix-up newly_left rooms after the first fixup because it may have removed
# some left rooms that we can figure out are newly_left in the following code
# TODO: Add back newly_left rooms
# 2) Fetch membership changes that fall in the range from `from_token` up to `to_token`
membership_change_events_in_from_to_range = []
if from_token:
membership_change_events_in_from_to_range = (
await self.store.get_membership_changes_for_user(
user_id,
from_key=from_token.room_key,
to_key=to_token.room_key,
excluded_rooms=self.rooms_to_exclude_globally,
)
)
# Since we fetched the users room list at some point in time after the from/to
# tokens, we need to revert/rewind some membership changes to match the point in
# time of the `to_token`. In particular, we need to make these fixups:
#
# - 1a) Remove rooms that the user joined after the `to_token`
# - 1b) Add back rooms that the user left after the `to_token`
# - 2) Add back newly_left rooms (> `from_token` and <= `to_token`)
# 2) Assemble a list of the last membership events in some given ranges. Someone
# could have left and joined multiple times during the given range but we only
# care about end-result so we grab the last one.
last_membership_change_by_room_id_in_from_to_range: Dict[str, EventBase] = {}
for event in membership_change_events_in_from_to_range:
last_membership_change_by_room_id_in_from_to_range[event.room_id] = event
# # 1) -----------------------------------------------------
# 2) Fixup
for (
last_membership_change_in_from_to_range
) in last_membership_change_by_room_id_in_from_to_range.values():
room_id = last_membership_change_in_from_to_range.room_id
# # 1) Fetch membership changes that fall in the range from `to_token` up to
# # `membership_snapshot_token`
# #
# # If our `to_token` is already the same or ahead of the latest room membership
# # for the user, we don't need to do any "2)" fix-ups and can just straight-up
# # use the room list from the snapshot as a base (nothing has changed)
# membership_change_events_after_to_token = []
# if not membership_snapshot_token.is_before_or_eq(to_token.room_key):
# membership_change_events_after_to_token = (
# await self.store.get_membership_changes_for_user(
# user_id,
# from_key=to_token.room_key,
# to_key=membership_snapshot_token,
# excluded_rooms=self.rooms_to_exclude_globally,
# )
# )
# 2) Add back newly_left rooms (> `from_token` and <= `to_token`). We
# include newly_left rooms because the last event that the user should see
# is their own leave event
if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
sync_room_id_set[room_id] = convert_event_to_rooms_for_user(
last_membership_change_in_from_to_range
)
# # 1) Assemble a list of the last membership events in some given ranges. Someone
# # could have left and joined multiple times during the given range but we only
# # care about end-result so we grab the last one.
# last_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {}
# # We also need the first membership event after the `to_token` so we can step
# # backward to the previous membership that would apply to the from/to range.
# first_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {}
# for event in membership_change_events_after_to_token:
# last_membership_change_by_room_id_after_to_token[event.room_id] = event
# # Only set if we haven't already set it
# first_membership_change_by_room_id_after_to_token.setdefault(
# event.room_id, event
# )
# # 1) Fixup
# for (
# last_membership_change_after_to_token
# ) in last_membership_change_by_room_id_after_to_token.values():
# room_id = last_membership_change_after_to_token.room_id
# # We want to find the first membership change after the `to_token` then step
# # backward to know the membership in the from/to range.
# first_membership_change_after_to_token = (
# first_membership_change_by_room_id_after_to_token.get(room_id)
# )
# assert first_membership_change_after_to_token is not None, (
# "If there was a `last_membership_change_after_to_token` that we're iterating over, "
# + "then there should be corresponding a first change. For example, even if there "
# + "is only one event after the `to_token`, the first and last event will be same event. "
# + "This is probably a mistake in assembling the `last_membership_change_by_room_id_after_to_token`"
# + "/`first_membership_change_by_room_id_after_to_token` dicts above."
# )
# # TODO: Instead of reading from `unsigned`, refactor this to use the
# # `current_state_delta_stream` table in the future. Probably a new
# # `get_membership_changes_for_user()` function that uses
# # `current_state_delta_stream` with a join to `room_memberships`. This would
# # help in state reset scenarios since `prev_content` is looking at the
# # current branch vs the current room state. This is all just data given to
# # the client so no real harm to data integrity, but we'd like to be nice to
# # the client. Since the `current_state_delta_stream` table is new, it
# # doesn't have all events in it. Since this is Sliding Sync, if we ever need
# # to, we can signal the client to throw all of their state away by sending
# # "operation: RESET".
# prev_content = first_membership_change_after_to_token.unsigned.get(
# "prev_content", {}
# )
# prev_membership = prev_content.get("membership", None)
# prev_sender = first_membership_change_after_to_token.unsigned.get(
# "prev_sender", None
# )
# # Check if the previous membership (membership that applies to the from/to
# # range) should be included in our `sync_room_id_set`
# should_prev_membership_be_included = (
# prev_membership is not None
# and prev_sender is not None
# and filter_membership_for_sync(
# membership=prev_membership,
# user_id=user_id,
# sender=prev_sender,
# )
# )
# # Check if the last membership (membership that applies to our snapshot) was
# # already included in our `sync_room_id_set`
# was_last_membership_already_included = filter_membership_for_sync(
# membership=last_membership_change_after_to_token.membership,
# user_id=user_id,
# sender=last_membership_change_after_to_token.sender,
# )
# # 1a) Add back rooms that the user left after the `to_token`
# #
# # For example, if the last membership event after the `to_token` is a leave
# # event, then the room was excluded from `sync_room_id_set` when we first
# # crafted it above. We should add these rooms back as long as the user also
# # was part of the room before the `to_token`.
# if (
# not was_last_membership_already_included
# and should_prev_membership_be_included
# ):
# # TODO: Assign the correct membership event at the `to_token` here
# # (currently we're setting it as the last event after the `to_token`)
# sync_room_id_set[room_id] = convert_event_to_rooms_for_user(
# last_membership_change_after_to_token
# )
# # 1b) Remove rooms that the user joined (hasn't left) after the `to_token`
# #
# # For example, if the last membership event after the `to_token` is a "join"
# # event, then the room was included `sync_room_id_set` when we first crafted
# # it above. We should remove these rooms as long as the user also wasn't
# # part of the room before the `to_token`.
# elif (
# was_last_membership_already_included
# and not should_prev_membership_be_included
# ):
# del sync_room_id_set[room_id]
# # 2) -----------------------------------------------------
# # We fix-up newly_left rooms after the first fixup because it may have removed
# # some left rooms that we can figure out are newly_left in the following code
# # 2) Fetch membership changes that fall in the range from `from_token` up to `to_token`
# membership_change_events_in_from_to_range = []
# if from_token:
# membership_change_events_in_from_to_range = (
# await self.store.get_membership_changes_for_user(
# user_id,
# from_key=from_token.room_key,
# to_key=to_token.room_key,
# excluded_rooms=self.rooms_to_exclude_globally,
# )
# )
# # 2) Assemble a list of the last membership events in some given ranges. Someone
# # could have left and joined multiple times during the given range but we only
# # care about end-result so we grab the last one.
# last_membership_change_by_room_id_in_from_to_range: Dict[str, EventBase] = {}
# for event in membership_change_events_in_from_to_range:
# last_membership_change_by_room_id_in_from_to_range[event.room_id] = event
# # 2) Fixup
# for (
# last_membership_change_in_from_to_range
# ) in last_membership_change_by_room_id_in_from_to_range.values():
# room_id = last_membership_change_in_from_to_range.room_id
# # 2) Add back newly_left rooms (> `from_token` and <= `to_token`). We
# # include newly_left rooms because the last event that the user should see
# # is their own leave event
# if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
# sync_room_id_set[room_id] = convert_event_to_rooms_for_user(
# last_membership_change_in_from_to_range
# )
return sync_room_id_set

View file

@ -44,6 +44,7 @@ what sort order was used:
import logging
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Collection,
Dict,
@ -62,7 +63,7 @@ from typing_extensions import Literal
from twisted.internet import defer
from synapse.api.constants import Direction
from synapse.api.constants import Direction, EventTypes
from synapse.api.filtering import Filter
from synapse.events import EventBase
from synapse.logging.context import make_deferred_yieldable, run_in_background
@ -111,6 +112,24 @@ class _EventsAround:
end: RoomStreamToken
@attr.s(slots=True, frozen=True, auto_attribs=True)
class CurrentStateDeltaMembership:
"""
Attributes:
event_id: The "current" membership event ID in this room. May be `None` if the
server is no longer in the room or a state reset happened.
prev_event_id: The previous membership event in this room that was replaced by
the "current" one. May be `None` if there was no previous membership event.
room_id: The room ID of the membership event.
"""
event_id: Optional[str]
prev_event_id: Optional[str]
room_id: str
# Could be useful but we're not using it yet.
# event_pos: PersistedEventPosition
def generate_pagination_where_clause(
direction: Direction,
column_names: Tuple[str, str],
@ -390,6 +409,42 @@ def _filter_results(
return True
def _filter_results_by_stream(
lower_token: Optional[RoomStreamToken],
upper_token: Optional[RoomStreamToken],
instance_name: str,
stream_ordering: int,
) -> bool:
"""
Note: This function only works with "live" tokens with `stream_ordering` only.
Returns True if the event persisted by the given instance at the given
topological/stream_ordering falls between the two tokens (taking a None
token to mean unbounded).
Used to filter results from fetching events in the DB against the given
tokens. This is necessary to handle the case where the tokens include
position maps, which we handle by fetching more than necessary from the DB
and then filtering (rather than attempting to construct a complicated SQL
query).
"""
if lower_token:
assert lower_token.topological is None
# If these are live tokens we compare the stream ordering against the
# writers stream position.
if stream_ordering <= lower_token.get_stream_pos_for_instance(instance_name):
return False
if upper_token:
assert upper_token.topological is None
if upper_token.get_stream_pos_for_instance(instance_name) < stream_ordering:
return False
return True
def filter_to_clause(event_filter: Optional[Filter]) -> Tuple[str, List[str]]:
# NB: This may create SQL clauses that don't optimise well (and we don't
# have indices on all possible clauses). E.g. it may create
@ -731,6 +786,94 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
return ret, key
async def get_current_state_delta_membership_changes_for_user(
self,
user_id: str,
from_key: RoomStreamToken,
to_key: RoomStreamToken,
excluded_rooms: Optional[List[str]] = None,
) -> List[CurrentStateDeltaMembership]:
"""
TODO
Note: This function only works with "live" tokens with `stream_ordering` only.
All such events whose stream ordering `s` lies in the range `from_key < s <=
to_key` are returned. Events are sorted by `stream_ordering` ascending.
"""
# Start by ruling out cases where a DB query is not necessary.
if from_key == to_key:
return []
if from_key:
has_changed = self._membership_stream_cache.has_entity_changed(
user_id, int(from_key.stream)
)
if not has_changed:
return []
def f(txn: LoggingTransaction) -> List[CurrentStateDeltaMembership]:
# To handle tokens with a non-empty instance_map we fetch more
# results than necessary and then filter down
min_from_id = from_key.stream
max_to_id = to_key.get_max_stream_pos()
args: List[Any] = [EventTypes.Member, user_id, min_from_id, max_to_id]
# TODO: It would be good to assert that the `to_token` is >=
# the first row in `current_state_delta_stream` for the rooms we're
# interested in. Otherwise, we will end up with empty results and not know
# it.
# Note: There is no index for `(type, state_key)` in
# `current_state_delta_stream`. We also can't just add an index for
# `event_id` and join the `room_memberships` table by `event_id` because it
# may be `null` in `current_state_delta_stream` so nothing will match (it's
# `null` when the server is no longer in the room or a state reset happened
# and it was unset).
sql = """
SELECT s.event_id, s.prev_event_id, s.room_id, s.instance_name, s.stream_id
FROM current_state_delta_stream AS s
WHERE s.type = ? AND s.state_key = ?
AND s.stream_id > ? AND s.stream_id <= ?
ORDER BY s.stream_id ASC
"""
txn.execute(sql, args)
return [
CurrentStateDeltaMembership(
event_id=event_id,
prev_event_id=prev_event_id,
room_id=room_id,
# event_pos=PersistedEventPosition(
# instance_name=instance_name,
# stream=stream_ordering,
# ),
)
for event_id, prev_event_id, room_id, instance_name, stream_ordering in txn
if _filter_results_by_stream(
from_key,
to_key,
instance_name,
stream_ordering,
)
]
current_state_delta_membership_changes = await self.db_pool.runInteraction(
"get_current_state_delta_membership_changes_for_user", f
)
rooms_to_exclude: AbstractSet[str] = set()
if excluded_rooms is not None:
rooms_to_exclude = set(excluded_rooms)
return [
membership_change
for membership_change in current_state_delta_membership_changes
if membership_change.room_id not in rooms_to_exclude
]
@cancellable
async def get_membership_changes_for_user(
self,
@ -766,10 +909,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
ignore_room_clause = ""
if excluded_rooms is not None and len(excluded_rooms) > 0:
ignore_room_clause = "AND e.room_id NOT IN (%s)" % ",".join(
"?" for _ in excluded_rooms
ignore_room_clause, ignore_room_args = make_in_list_sql_clause(
txn.database_engine, "e.room_id", excluded_rooms, negative=True
)
args = args + excluded_rooms
args += ignore_room_args
sql = """
SELECT m.event_id, instance_name, topological_ordering, stream_ordering

View file

@ -1029,7 +1029,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
),
)
def test_display_name_changes(
def test_display_name_changes_in_token_range(
self,
) -> None:
"""
@ -1102,6 +1102,77 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
),
)
def test_display_name_changes_before_and_after_token_range(
self,
) -> None:
"""
Test that we point to the correct membership event even though there are no
membership events in the from/range but there are `displayname`/`avatar_url`
changes before/after the token range.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
# We create the room with user2 so the room isn't left with no members when we
# leave and can still re-join.
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
# Update the displayname before the token range
displayname_change_before_token_range_response = self.helper.send_state(
room_id1,
event_type=EventTypes.Member,
state_key=user1_id,
body={
"membership": Membership.JOIN,
"displayname": "displayname during token range",
},
tok=user1_tok,
)
after_room1_token = self.event_sources.get_current_token()
# Update the displayname after the token range
displayname_change_after_token_range_response = self.helper.send_state(
room_id1,
event_type=EventTypes.Member,
state_key=user1_id,
body={
"membership": Membership.JOIN,
"displayname": "displayname after token range",
},
tok=user1_tok,
)
room_id_results = self.get_success(
self.sliding_sync_handler.get_sync_room_ids_for_user(
UserID.from_string(user1_id),
from_token=after_room1_token,
to_token=after_room1_token,
)
)
# Room should show up because we were joined before the from/to range
self.assertEqual(room_id_results.keys(), {room_id1})
# It should be pointing to the latest membership event in the from/to range
self.assertEqual(
room_id_results[room_id1].event_id,
displayname_change_before_token_range_response["event_id"],
"Corresponding map to disambiguate the opaque event IDs: "
+ str(
{
"join_response": join_response["event_id"],
"displayname_change_before_token_range_response": displayname_change_before_token_range_response[
"event_id"
],
"displayname_change_after_token_range_response": displayname_change_after_token_range_response[
"event_id"
],
}
),
)
def test_display_name_changes_leave_after_token_range(
self,
) -> None: