Allow new get_sync_room_ids_for_user implementation to work with multiple event persisters

Before, the problem scenario would get caught in one of the assertions because
we expect `to_token` <= `membership_snapshot_token` or vice versa, but it's
possible the tokens are intertwined and neither is ahead of the other.
Especially since the `instance_map` in `membership_snapshot_token` is made up
from the `stream_ordering` of membership events at various stream positions
and processed on different instances (not current stream positions).

We get into trouble when stream positions are lagging between workers and our
now/`to_token` doesn't cleanly compare to `membership_snapshot_token`.

What we really want to assert is that the `to_token` <= the stream positions
at the time we asked for the room membership snapshot. Since
`get_rooms_for_local_user_where_membership_is()` doesn't return that
information, the closest we can get is to fetch the stream positions just
before we ask for the room membership snapshot and consider those good enough
to compare against.
This commit is contained in:
Eric Eastwood 2024-06-04 19:50:12 -05:00
parent 3ce08925e3
commit 2864837b65
2 changed files with 171 additions and 437 deletions

View file

@ -19,10 +19,10 @@
#
import logging
from enum import Enum
from immutabledict import immutabledict
from typing import TYPE_CHECKING, AbstractSet, Dict, Final, List, Optional, Tuple
import attr
from immutabledict import immutabledict
from synapse._pydantic_compat import HAS_PYDANTIC_V2
@ -373,299 +373,6 @@ class SlidingSyncHandler:
extensions={},
)
# async def get_sync_room_ids_for_user(
# self,
# user: UserID,
# to_token: StreamToken,
# from_token: Optional[StreamToken] = None,
# ) -> AbstractSet[str]:
# """
# Fetch room IDs that should be listed for this user in the sync response (the
# full room list that will be filtered, sorted, and sliced).
# We're looking for rooms where the user has the following state in the token
# range (> `from_token` and <= `to_token`):
# - `invite`, `join`, `knock`, `ban` membership events
# - Kicks (`leave` membership events where `sender` is different from the
# `user_id`/`state_key`)
# - `newly_left` (rooms that were left during the given token range)
# - In order for bans/kicks to not show up in sync, you need to `/forget` those
# rooms. This doesn't modify the event itself though and only adds the
# `forgotten` flag to the `room_memberships` table in Synapse. There isn't a way
# to tell when a room was forgotten at the moment so we can't factor it into the
# from/to range.
# """
# user_id = user.to_string()
# logger.info("from_token %s", from_token.room_key)
# logger.info("to_token %s", to_token.room_key)
# # First grab a current snapshot rooms for the user
# # (also handles forgotten rooms)
# room_for_user_list = await self.store.get_rooms_for_local_user_where_membership_is(
# user_id=user_id,
# # We want to fetch any kind of membership (joined and left rooms) in order
# # to get the `event_pos` of the latest room membership event for the
# # user.
# #
# # We will filter out the rooms that don't belong below (see
# # `filter_membership_for_sync`)
# membership_list=Membership.LIST,
# excluded_rooms=self.rooms_to_exclude_globally,
# )
# # If the user has never joined any rooms before, we can just return an empty list
# if not room_for_user_list:
# return set()
# # Our working list of rooms that can show up in the sync response
# sync_room_id_set = {
# room_for_user.room_id
# for room_for_user in room_for_user_list
# if filter_membership_for_sync(
# membership=room_for_user.membership,
# user_id=user_id,
# sender=room_for_user.sender,
# )
# }
# # Get the `RoomStreamToken` that represents the spot we queried up to when we got
# # our membership snapshot from `get_rooms_for_local_user_where_membership_is()`.
# #
# # First, we need to get the max stream_ordering of each event persister instance
# # that we queried events from.
# instance_to_max_stream_ordering_map: Dict[str, int] = {}
# for room_for_user in room_for_user_list:
# instance_name = room_for_user.event_pos.instance_name
# stream_ordering = room_for_user.event_pos.stream
# current_instance_max_stream_ordering = (
# instance_to_max_stream_ordering_map.get(instance_name)
# )
# if (
# current_instance_max_stream_ordering is None
# or stream_ordering > current_instance_max_stream_ordering
# ):
# instance_to_max_stream_ordering_map[instance_name] = stream_ordering
# # Then assemble the `RoomStreamToken`
# membership_snapshot_token = RoomStreamToken(
# # Minimum position in the `instance_map`
# stream=min(
# stream_ordering
# for stream_ordering in instance_to_max_stream_ordering_map.values()
# ),
# instance_map=immutabledict(instance_to_max_stream_ordering_map),
# )
# # If our `to_token` is already the same or ahead of the latest room membership
# # for the user, we can just straight-up return the room list (nothing has
# # changed)
# if membership_snapshot_token.is_before_or_eq(to_token.room_key):
# return sync_room_id_set
# # We assume the `from_token` is before or at-least equal to the `to_token`
# assert from_token is None or from_token.room_key.is_before_or_eq(
# to_token.room_key
# ), f"{from_token.room_key if from_token else None} <= {to_token.room_key}"
# # We assume the `from_token`/`to_token` is before the `membership_snapshot_token`
# assert from_token is None or from_token.room_key.is_before_or_eq(
# membership_snapshot_token
# ), f"{from_token.room_key if from_token else None} <= {membership_snapshot_token}"
# assert to_token.room_key.is_before_or_eq(
# membership_snapshot_token
# ), f"{to_token.room_key} <= {membership_snapshot_token}"
# # Since we fetched the users room list at some point in time after the from/to
# # tokens, we need to revert/rewind some membership changes to match the point in
# # time of the `to_token`. In particular, we need to make these fixups:
# #
# # - 1a) Remove rooms that the user joined after the `to_token`
# # - 1b) Add back rooms that the user left after the `to_token`
# # - 2) Add back newly_left rooms (> `from_token` and <= `to_token`)
# #
# # Below, we're doing two separate lookups for membership changes. We could
# # request everything for both fixups in one range, [`from_token.room_key`,
# # `membership_snapshot_token`), but we want to avoid raw `stream_ordering`
# # comparison without `instance_name` (which is flawed). We could refactor
# # `event.internal_metadata` to include `instance_name` but it might turn out a
# # little difficult and a bigger, broader Synapse change than we want to make.
# # 2) -----------------------------------------------------
# # 1) Fetch membership changes that fall in the range from `to_token` up to
# # `membership_snapshot_token`
# membership_change_events_after_to_token = (
# await self.store.get_membership_changes_for_user(
# user_id,
# from_key=to_token.room_key,
# to_key=membership_snapshot_token,
# excluded_rooms=self.rooms_to_exclude_globally,
# )
# )
# # 1) Assemble a list of the last membership events in some given ranges. Someone
# # could have left and joined multiple times during the given range but we only
# # care about end-result so we grab the last one.
# last_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {}
# # We also need the first membership event after the `to_token` so we can step
# # backward to the previous membership that would apply to the from/to range.
# first_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {}
# for event in membership_change_events_after_to_token:
# assert event.internal_metadata.stream_ordering
# last_membership_change_by_room_id_after_to_token[event.room_id] = event
# # Only set if we haven't already set it
# first_membership_change_by_room_id_after_to_token.setdefault(
# event.room_id, event
# )
# logger.info(
# "last_membership_change_by_room_id_after_to_token %s",
# [
# f"{e.room_id}.{e.membership}.{e.internal_metadata.stream_ordering}"
# for e in last_membership_change_by_room_id_after_to_token.values()
# ],
# )
# logger.info(
# "first_membership_change_by_room_id_after_to_token %s",
# [
# f"{e.room_id}.{e.membership}.{e.internal_metadata.stream_ordering}->{e.unsigned.get("prev_content", {}).get("membership", None)}"
# for e in first_membership_change_by_room_id_after_to_token.values()
# ],
# )
# # 1) Fixup
# for (
# last_membership_change_after_to_token
# ) in last_membership_change_by_room_id_after_to_token.values():
# room_id = last_membership_change_after_to_token.room_id
# # We want to find the first membership change after the `to_token` then step
# # backward to know the membership in the from/to range.
# first_membership_change_after_to_token = (
# first_membership_change_by_room_id_after_to_token.get(room_id)
# )
# assert first_membership_change_after_to_token is not None, (
# "If there was a `last_membership_change_after_to_token` that we're iterating over, "
# + "then there should be corresponding a first change. For example, even if there "
# + "is only one event after the `to_token`, the first and last event will be same event. "
# + "This is probably a mistake in assembling the `last_membership_change_by_room_id_after_to_token`"
# + "/`first_membership_change_by_room_id_after_to_token` dicts above."
# )
# # TODO: Instead of reading from `unsigned`, refactor this to use the
# # `current_state_delta_stream` table in the future. Probably a new
# # `get_membership_changes_for_user()` function that uses
# # `current_state_delta_stream` with a join to `room_memberships`. This would
# # help in state reset scenarios since `prev_content` is looking at the
# # current branch vs the current room state. This is all just data given to
# # the client so no real harm to data integrity, but we'd like to be nice to
# # the client. Since the `current_state_delta_stream` table is new, it
# # doesn't have all events in it. Since this is Sliding Sync, if we ever need
# # to, we can signal the client to throw all of their state away by sending
# # "operation: RESET".
# prev_content = first_membership_change_after_to_token.unsigned.get(
# "prev_content", {}
# )
# prev_membership = prev_content.get("membership", None)
# prev_sender = first_membership_change_after_to_token.unsigned.get(
# "prev_sender", None
# )
# # Check if the previous membership (membership that applies to the from/to
# # range) should be included in our `sync_room_id_set`
# should_prev_membership_be_included = (
# prev_membership is not None
# and prev_sender is not None
# and filter_membership_for_sync(
# membership=prev_membership,
# user_id=user_id,
# sender=prev_sender,
# )
# )
# # Check if the last membership (membership that applies to our snapshot) was
# # already included in our `sync_room_id_set`
# was_last_membership_already_included = filter_membership_for_sync(
# membership=last_membership_change_after_to_token.membership,
# user_id=user_id,
# sender=last_membership_change_after_to_token.sender,
# )
# # 1a) Add back rooms that the user left after the `to_token`
# #
# # For example, if the last membership event after the `to_token` is a leave
# # event, then the room was excluded from `sync_room_id_set` when we first
# # crafted it above. We should add these rooms back as long as the user also
# # was part of the room before the `to_token`.
# if (
# not was_last_membership_already_included
# and should_prev_membership_be_included
# ):
# sync_room_id_set.add(room_id)
# # 1b) Remove rooms that the user joined (hasn't left) after the `to_token`
# #
# # For example, if the last membership event after the `to_token` is a "join"
# # event, then the room was included `sync_room_id_set` when we first crafted
# # it above. We should remove these rooms as long as the user also wasn't
# # part of the room before the `to_token`.
# elif (
# was_last_membership_already_included
# and not should_prev_membership_be_included
# ):
# sync_room_id_set.discard(room_id)
# # 2) -----------------------------------------------------
# # We fix-up newly_left rooms after the first fixup because it may have removed
# # some left rooms that we can figure out our newly_left in the following code
# # 2) Fetch membership changes that fall in the range from `from_token` up to `to_token`
# membership_change_events_in_from_to_range = []
# if from_token:
# membership_change_events_in_from_to_range = (
# await self.store.get_membership_changes_for_user(
# user_id,
# from_key=from_token.room_key,
# to_key=to_token.room_key,
# excluded_rooms=self.rooms_to_exclude_globally,
# )
# )
# # 2) Assemble a list of the last membership events in some given ranges. Someone
# # could have left and joined multiple times during the given range but we only
# # care about end-result so we grab the last one.
# last_membership_change_by_room_id_in_from_to_range: Dict[str, EventBase] = {}
# for event in membership_change_events_in_from_to_range:
# assert event.internal_metadata.stream_ordering
# last_membership_change_by_room_id_in_from_to_range[event.room_id] = event
# logger.info(
# "last_membership_change_by_room_id_in_from_to_range %s",
# [
# f"{e.room_id}.{e.membership}.{e.internal_metadata.stream_ordering}"
# for e in last_membership_change_by_room_id_in_from_to_range.values()
# ],
# )
# # 2) Fixup
# for (
# last_membership_change_in_from_to_range
# ) in last_membership_change_by_room_id_in_from_to_range.values():
# room_id = last_membership_change_in_from_to_range.room_id
# # 2) Add back newly_left rooms (> `from_token` and <= `to_token`). We
# # include newly_left rooms because the last event that the user should see
# # is their own leave event
# if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
# sync_room_id_set.add(room_id)
# return sync_room_id_set
# Old implementation before talking with Erik (with fixups switched around to be correct)
# via https://github.com/element-hq/synapse/blob/49998e053edeb77a65d22067b7c41dc795dcb920/synapse/handlers/sliding_sync.py#L301C1-L490C32
async def get_sync_room_ids_for_user(
self,
user: UserID,
@ -673,25 +380,35 @@ class SlidingSyncHandler:
from_token: Optional[StreamToken] = None,
) -> AbstractSet[str]:
"""
Fetch room IDs that should be listed for this user in the sync response.
Fetch room IDs that should be listed for this user in the sync response (the
full room list that will be filtered, sorted, and sliced).
We're looking for rooms that the user has not left (`invite`, `knock`, `join`,
and `ban`) or newly_left rooms that are > `from_token` and <= `to_token`.
We're looking for rooms where the user has the following state in the token
range (> `from_token` and <= `to_token`):
- `invite`, `join`, `knock`, `ban` membership events
- Kicks (`leave` membership events where `sender` is different from the
`user_id`/`state_key`)
- `newly_left` (rooms that were left during the given token range)
- In order for bans/kicks to not show up in sync, you need to `/forget` those
rooms. This doesn't modify the event itself though and only adds the
`forgotten` flag to the `room_memberships` table in Synapse. There isn't a way
to tell when a room was forgotten at the moment so we can't factor it into the
from/to range.
"""
user_id = user.to_string()
logger.info("from_token %s", from_token.room_key)
logger.info("to_token %s", to_token.room_key)
# First grab a current snapshot rooms for the user
# (also handles forgotten rooms)
token_before_rooms = self.event_sources.get_current_token()
room_for_user_list = await self.store.get_rooms_for_local_user_where_membership_is(
user_id=user_id,
# We want to fetch any kind of membership (joined and left rooms) in order
# to get the `stream_ordering` of the latest room membership event for the
# to get the `event_pos` of the latest room membership event for the
# user.
#
# We will filter out the rooms that the user has left below (see
# `MEMBERSHIP_TO_DISPLAY_IN_SYNC`)
# We will filter out the rooms that don't belong below (see
# `filter_membership_for_sync`)
membership_list=Membership.LIST,
excluded_rooms=self.rooms_to_exclude_globally,
)
@ -704,134 +421,113 @@ class SlidingSyncHandler:
sync_room_id_set = {
room_for_user.room_id
for room_for_user in room_for_user_list
if room_for_user.membership in MEMBERSHIP_TO_DISPLAY_IN_SYNC
if filter_membership_for_sync(
membership=room_for_user.membership,
user_id=user_id,
sender=room_for_user.sender,
)
}
# Find the stream_ordering of the latest room membership event which will mark
# the spot we queried up to.
max_stream_ordering_from_room_list = max(
room_for_user.event_pos.stream for room_for_user in room_for_user_list
)
logger.info(
"max_stream_ordering_from_room_list %s", max_stream_ordering_from_room_list
# Get the `RoomStreamToken` that represents the spot we queried up to when we got
# our membership snapshot from `get_rooms_for_local_user_where_membership_is()`.
#
# First, we need to get the max stream_ordering of each event persister instance
# that we queried events from.
instance_to_max_stream_ordering_map: Dict[str, int] = {}
for room_for_user in room_for_user_list:
instance_name = room_for_user.event_pos.instance_name
stream_ordering = room_for_user.event_pos.stream
current_instance_max_stream_ordering = (
instance_to_max_stream_ordering_map.get(instance_name)
)
if (
current_instance_max_stream_ordering is None
or stream_ordering > current_instance_max_stream_ordering
):
instance_to_max_stream_ordering_map[instance_name] = stream_ordering
# Then assemble the `RoomStreamToken`
membership_snapshot_token = RoomStreamToken(
# Minimum position in the `instance_map`
stream=min(
stream_ordering
for stream_ordering in instance_to_max_stream_ordering_map.values()
),
instance_map=immutabledict(instance_to_max_stream_ordering_map),
)
# If our `to_token` is already the same or ahead of the latest room membership
# for the user, we can just straight-up return the room list (nothing has
# changed)
if max_stream_ordering_from_room_list <= to_token.room_key.stream:
if membership_snapshot_token.is_before_or_eq(to_token.room_key):
return sync_room_id_set
# We assume the `from_token` is before or at-least equal to the `to_token`
assert (
from_token is None or from_token.room_key.stream <= to_token.room_key.stream
), f"{from_token.room_key.stream if from_token else None} <= {to_token.room_key.stream}"
assert from_token is None or from_token.room_key.is_before_or_eq(
to_token.room_key
), f"{from_token.room_key if from_token else None} <= {to_token.room_key}"
# We assume the `from_token`/`to_token` is before the `max_stream_ordering_from_room_list`
assert (
from_token is None
or from_token.room_key.stream < max_stream_ordering_from_room_list
), f"{from_token.room_key.stream if from_token else None} < {max_stream_ordering_from_room_list}"
assert (
to_token.room_key.stream < max_stream_ordering_from_room_list
), f"{to_token.room_key.stream} < {max_stream_ordering_from_room_list}"
# We assume the `from_token`/`to_token` is before `membership_snapshot_token` or
# at-least before the current stream positions at the time we queried for
# `membership_snapshot_token`. The closest we can get to the current stream
# positions at the time is `token_before_rooms`. Otherwise, we just need to
# give-up and throw an error.
best_effort_stream_positions_at_snapshot_time_token = (
membership_snapshot_token.copy_and_advance(token_before_rooms.room_key)
)
assert from_token is None or from_token.room_key.is_before_or_eq(
best_effort_stream_positions_at_snapshot_time_token
), f"{from_token.room_key if from_token else None} <= {best_effort_stream_positions_at_snapshot_time_token}"
assert to_token.room_key.is_before_or_eq(
best_effort_stream_positions_at_snapshot_time_token
), f"{to_token.room_key} <= {best_effort_stream_positions_at_snapshot_time_token}"
# Since we fetched the users room list at some point in time after the from/to
# tokens, we need to revert/rewind some membership changes to match the point in
# time of the `to_token`.
# time of the `to_token`. In particular, we need to make these fixups:
#
# - 1a) Remove rooms that the user joined after the `to_token`
# - 1b) Add back rooms that the user left after the `to_token`
# - 2) Add back newly_left rooms (> `from_token` and <= `to_token`)
membership_change_events = await self.store.get_membership_changes_for_user(
user_id,
# Start from the `from_token` if given (for "2)" fixups), otherwise from the `to_token` so we
# can still do the "1)" fixups.
from_key=from_token.room_key if from_token else to_token.room_key,
# Fetch up to our membership snapshot
to_key=RoomStreamToken(stream=max_stream_ordering_from_room_list),
excluded_rooms=self.rooms_to_exclude_globally,
#
# Below, we're doing two separate lookups for membership changes. We could
# request everything for both fixups in one range, [`from_token.room_key`,
# `membership_snapshot_token`), but we want to avoid raw `stream_ordering`
# comparison without `instance_name` (which is flawed). We could refactor
# `event.internal_metadata` to include `instance_name` but it might turn out a
# little difficult and a bigger, broader Synapse change than we want to make.
# 2) -----------------------------------------------------
# 1) Fetch membership changes that fall in the range from `to_token` up to
# `membership_snapshot_token`
membership_change_events_after_to_token = (
await self.store.get_membership_changes_for_user(
user_id,
from_key=to_token.room_key,
to_key=membership_snapshot_token,
excluded_rooms=self.rooms_to_exclude_globally,
)
)
logger.info(
"membership_change_events %s",
[
f"{e.room_id}.{e.membership}.{e.internal_metadata.stream_ordering}"
for e in membership_change_events
],
)
# Assemble a list of the last membership events in some given ranges. Someone
# 1) Assemble a list of the last membership events in some given ranges. Someone
# could have left and joined multiple times during the given range but we only
# care about end-result so we grab the last one.
last_membership_change_by_room_id_in_from_to_range: Dict[str, EventBase] = {}
last_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {}
# We also need the first membership event after the `to_token` so we can step
# backward to the previous membership that would apply to the from/to range.
first_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {}
for event in membership_change_events:
for event in membership_change_events_after_to_token:
assert event.internal_metadata.stream_ordering
if (
(
from_token is None
or event.internal_metadata.stream_ordering
> from_token.room_key.stream
)
and event.internal_metadata.stream_ordering <= to_token.room_key.stream
):
last_membership_change_by_room_id_in_from_to_range[event.room_id] = (
event
)
elif (
event.internal_metadata.stream_ordering > to_token.room_key.stream
and event.internal_metadata.stream_ordering
<= max_stream_ordering_from_room_list
):
last_membership_change_by_room_id_after_to_token[event.room_id] = event
# Only set if we haven't already set it
first_membership_change_by_room_id_after_to_token.setdefault(
event.room_id, event
)
else:
# We don't expect this to happen since we should only be fetching
# `membership_change_events` that fall in the given ranges above. It
# doesn't hurt anything to ignore an event we don't need but may
# indicate a bug in the logic above.
raise AssertionError(
"Membership event with stream_ordering=%s should fall in the given ranges above"
+ " (%d > x <= %d) or (%d > x <= %d). We shouldn't be fetching extra membership"
+ " events that aren't used.",
event.internal_metadata.stream_ordering,
from_token.room_key.stream if from_token else None,
to_token.room_key.stream,
to_token.room_key.stream,
max_stream_ordering_from_room_list,
)
last_membership_change_by_room_id_after_to_token[event.room_id] = event
# Only set if we haven't already set it
first_membership_change_by_room_id_after_to_token.setdefault(
event.room_id, event
)
logger.info(
"last_membership_change_by_room_id_in_from_to_range %s",
[
f"{e.room_id}.{e.membership}.{e.internal_metadata.stream_ordering}"
for e in last_membership_change_by_room_id_in_from_to_range.values()
],
)
logger.info(
"last_membership_change_by_room_id_after_to_token %s",
[
f"{e.room_id}.{e.membership}.{e.internal_metadata.stream_ordering}"
for e in last_membership_change_by_room_id_after_to_token.values()
],
)
logger.info(
"first_membership_change_by_room_id_after_to_token %s",
[
f"{e.room_id}.{e.membership}.{e.internal_metadata.stream_ordering}->{e.unsigned.get("prev_content", {}).get("membership", None)}"
for e in first_membership_change_by_room_id_after_to_token.values()
],
)
# 1)
# 1) Fixup
for (
last_membership_change_after_to_token
) in last_membership_change_by_room_id_after_to_token.values():
@ -849,37 +545,93 @@ class SlidingSyncHandler:
+ "This is probably a mistake in assembling the `last_membership_change_by_room_id_after_to_token`"
+ "/`first_membership_change_by_room_id_after_to_token` dicts above."
)
# TODO: Instead of reading from `unsigned`, refactor this to use the
# `current_state_delta_stream` table in the future. Probably a new
# `get_membership_changes_for_user()` function that uses
# `current_state_delta_stream` with a join to `room_memberships`. This would
# help in state reset scenarios since `prev_content` is looking at the
# current branch vs the current room state. This is all just data given to
# the client so no real harm to data integrity, but we'd like to be nice to
# the client. Since the `current_state_delta_stream` table is new, it
# doesn't have all events in it. Since this is Sliding Sync, if we ever need
# to, we can signal the client to throw all of their state away by sending
# "operation: RESET".
prev_content = first_membership_change_after_to_token.unsigned.get(
"prev_content", {}
)
prev_membership = prev_content.get("membership", None)
prev_sender = first_membership_change_after_to_token.unsigned.get(
"prev_sender", None
)
# Check if the previous membership (membership that applies to the from/to
# range) should be included in our `sync_room_id_set`
should_prev_membership_be_included = (
prev_membership is not None
and prev_sender is not None
and filter_membership_for_sync(
membership=prev_membership,
user_id=user_id,
sender=prev_sender,
)
)
# Check if the last membership (membership that applies to our snapshot) was
# already included in our `sync_room_id_set`
was_last_membership_already_included = filter_membership_for_sync(
membership=last_membership_change_after_to_token.membership,
user_id=user_id,
sender=last_membership_change_after_to_token.sender,
)
# 1a) Add back rooms that the user left after the `to_token`
#
# If the last membership event after the `to_token` is a leave event, then
# the room was excluded from the
# `get_rooms_for_local_user_where_membership_is()` results. We should add
# these rooms back as long as the user was part of the room before the
# `to_token`.
# For example, if the last membership event after the `to_token` is a leave
# event, then the room was excluded from `sync_room_id_set` when we first
# crafted it above. We should add these rooms back as long as the user also
# was part of the room before the `to_token`.
if (
last_membership_change_after_to_token.membership == Membership.LEAVE
and prev_membership is not None
and prev_membership != Membership.LEAVE
not was_last_membership_already_included
and should_prev_membership_be_included
):
sync_room_id_set.add(room_id)
# 1b) Remove rooms that the user joined (hasn't left) after the `to_token`
#
# If the last membership event after the `to_token` is a "join" event, then
# the room was included in the `get_rooms_for_local_user_where_membership_is()`
# results. We should remove these rooms as long as the user wasn't part of
# the room before the `to_token`.
# For example, if the last membership event after the `to_token` is a "join"
# event, then the room was included `sync_room_id_set` when we first crafted
# it above. We should remove these rooms as long as the user also wasn't
# part of the room before the `to_token`.
elif (
last_membership_change_after_to_token.membership != Membership.LEAVE
and (prev_membership is None or prev_membership == Membership.LEAVE)
was_last_membership_already_included
and not should_prev_membership_be_included
):
sync_room_id_set.discard(room_id)
# 2)
# 2) -----------------------------------------------------
# We fix-up newly_left rooms after the first fixup because it may have removed
# some left rooms that we can figure out our newly_left in the following code
# 2) Fetch membership changes that fall in the range from `from_token` up to `to_token`
membership_change_events_in_from_to_range = []
if from_token:
membership_change_events_in_from_to_range = (
await self.store.get_membership_changes_for_user(
user_id,
from_key=from_token.room_key,
to_key=to_token.room_key,
excluded_rooms=self.rooms_to_exclude_globally,
)
)
# 2) Assemble a list of the last membership events in some given ranges. Someone
# could have left and joined multiple times during the given range but we only
# care about end-result so we grab the last one.
last_membership_change_by_room_id_in_from_to_range: Dict[str, EventBase] = {}
for event in membership_change_events_in_from_to_range:
assert event.internal_metadata.stream_ordering
last_membership_change_by_room_id_in_from_to_range[event.room_id] = event
# 2) Fixup
for (
last_membership_change_in_from_to_range
) in last_membership_change_by_room_id_in_from_to_range.values():

View file

@ -800,7 +800,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
class GetSyncRoomIdsForUserEventShardTestCase(BaseMultiWorkerStreamTestCase):
"""
Tests Sliding Sync handler `get_sync_room_ids_for_user()` to make sure it works when
Tests Sliding Sync handler `get_sync_room_ids_for_user()` to make sure it works with
sharded event stream_writers enabled
"""
@ -882,15 +882,6 @@ class GetSyncRoomIdsForUserEventShardTestCase(BaseMultiWorkerStreamTestCase):
{"worker_name": "worker3"},
)
# TODO: Debug remove
for instance_name in ["worker1", "worker2", "worker3"]:
instance_id = self.get_success(
self.store.get_id_for_instance(instance_name)
)
logger.info(
"instance_name: %s -> instance_id: %s", instance_name, instance_id
)
# Specially crafted room IDs that get persisted on different workers.
#
# Sharded to worker1
@ -900,8 +891,6 @@ class GetSyncRoomIdsForUserEventShardTestCase(BaseMultiWorkerStreamTestCase):
# Sharded to worker3
room_id3 = "!quux:test"
before_room_token = self.event_sources.get_current_token()
# Create rooms on the different workers.
self._create_room(room_id1, user2_id, user2_tok)
self._create_room(room_id2, user2_id, user2_tok)
@ -930,10 +919,6 @@ class GetSyncRoomIdsForUserEventShardTestCase(BaseMultiWorkerStreamTestCase):
before_stuck_activity_token = self.event_sources.get_current_token()
# TODO: asdf
# self.helper.join(room_id2, user1_id, tok=user1_tok)
# self.helper.leave(room_id2, user1_id, tok=user1_tok)
# We now gut wrench into the events stream `MultiWriterIdGenerator` on worker2 to
# mimic it getting stuck persisting an event. This ensures that when we send an
# event on worker1/worker3 we end up in a state where worker2 events stream
@ -1001,14 +986,11 @@ class GetSyncRoomIdsForUserEventShardTestCase(BaseMultiWorkerStreamTestCase):
stuck_activity_token.room_key.get_stream_pos_for_instance("worker3"),
)
# TODO: asdf
# self.helper.leave(room_id2, user1_id, tok=user1_tok)
# self.helper.join(room_id2, user1_id, tok=user1_tok)
# We finish the fake persisting an event we started above and advance worker2's
# event stream position (unstuck worker2).
self.get_success(actx.__aexit__(None, None, None))
# The function under test
room_id_results = self.get_success(
self.sliding_sync_handler.get_sync_room_ids_for_user(
UserID.from_string(user1_id),