Compare commits

...

5 commits

Author SHA1 Message Date
Eric Eastwood 63c7b5017a (doesn't work) Add test for batch persisting multiple member events for the same user 2024-06-27 16:34:41 -05:00
Eric Eastwood 325856e14b Inclusive ranges 2024-06-27 15:57:01 -05:00
Eric Eastwood f77403251c Add better comments 2024-06-27 15:39:43 -05:00
Eric Eastwood ba56350642 Passing current tests 2024-06-27 15:31:18 -05:00
Eric Eastwood eb159c11cd Don't worry about state_reset for now
See:

 - Why no `COALESCE` https://github.com/element-hq/synapse/pull/17320#discussion_r1657435662
 - Don't worry about `state_reset` for now, https://github.com/element-hq/synapse/pull/17320#discussion_r1657562645
2024-06-27 14:38:55 -05:00
5 changed files with 284 additions and 98 deletions

View file

@ -18,7 +18,6 @@
# #
# #
import logging import logging
from collections import defaultdict
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple
import attr import attr
@ -48,7 +47,9 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def filter_membership_for_sync(*, membership: str, user_id: str, sender: str) -> bool: def filter_membership_for_sync(
*, membership: str, user_id: str, sender: Optional[str]
) -> bool:
""" """
Returns True if the membership event should be included in the sync response, Returns True if the membership event should be included in the sync response,
otherwise False. otherwise False.
@ -65,7 +66,13 @@ def filter_membership_for_sync(*, membership: str, user_id: str, sender: str) ->
# #
# This logic includes kicks (leave events where the sender is not the same user) and # This logic includes kicks (leave events where the sender is not the same user) and
# can be read as "anything that isn't a leave or a leave with a different sender". # can be read as "anything that isn't a leave or a leave with a different sender".
return membership != Membership.LEAVE or sender != user_id #
# When `sender=None` and `membership=Membership.LEAVE`, it means that a state reset
# happened that removed the user from the room, or the user was the last person
# locally to leave the room which caused the server to leave the room. In both
# cases, we can just remove the rooms since they are no longer relevant to the user.
# They could still be added back later if they are `newly_left`.
return membership != Membership.LEAVE or sender not in (user_id, None)
# We can't freeze this class because we want to update it in place with the # We can't freeze this class because we want to update it in place with the
@ -99,10 +106,10 @@ class _RoomMembershipForUser:
range range
""" """
event_id: str event_id: Optional[str]
event_pos: PersistedEventPosition event_pos: PersistedEventPosition
membership: str membership: str
sender: str sender: Optional[str]
newly_joined: bool newly_joined: bool
def copy_and_replace(self, **kwds: Any) -> "_RoomMembershipForUser": def copy_and_replace(self, **kwds: Any) -> "_RoomMembershipForUser":
@ -256,7 +263,8 @@ class SlidingSyncHandler:
for range in list_config.ranges: for range in list_config.ranges:
sliced_room_ids = [ sliced_room_ids = [
room_id room_id
for room_id, _ in sorted_room_info[range[0] : range[1]] # Both sides of range are inclusive
for room_id, _ in sorted_room_info[range[0] : range[1] + 1]
] ]
ops.append( ops.append(
@ -502,6 +510,8 @@ class SlidingSyncHandler:
) )
) )
# Filter the rooms that that we have updated room membership events to the point
# in time of the `to_token` (from the "1)" fixups)
filtered_sync_room_id_set = { filtered_sync_room_id_set = {
room_id: room_membership_for_user room_id: room_membership_for_user
for room_id, room_membership_for_user in sync_room_id_set.items() for room_id, room_membership_for_user in sync_room_id_set.items()
@ -540,9 +550,11 @@ class SlidingSyncHandler:
first_membership_change_by_room_id_in_from_to_range: Dict[ first_membership_change_by_room_id_in_from_to_range: Dict[
str, CurrentStateDeltaMembership str, CurrentStateDeltaMembership
] = {} ] = {}
non_join_event_ids_by_room_id_in_from_to_range: Dict[str, List[str]] = ( # Keep track if the room has a non-join event in the token range so we can later
defaultdict(list) # tell if it was a `newly_joined` room. If the last membership event in the
) # token range is a join and there is also some non-join in the range, we know
# they `newly_joined`.
has_non_join_event_by_room_id_in_from_to_range: Dict[str, bool] = {}
for ( for (
membership_change membership_change
) in current_state_delta_membership_changes_in_from_to_range: ) in current_state_delta_membership_changes_in_from_to_range:
@ -551,16 +563,13 @@ class SlidingSyncHandler:
last_membership_change_by_room_id_in_from_to_range[room_id] = ( last_membership_change_by_room_id_in_from_to_range[room_id] = (
membership_change membership_change
) )
# Only set if we haven't already set it # Only set if we haven't already set it
first_membership_change_by_room_id_in_from_to_range.setdefault( first_membership_change_by_room_id_in_from_to_range.setdefault(
room_id, membership_change room_id, membership_change
) )
if membership_change.membership != Membership.JOIN: if membership_change.membership != Membership.JOIN:
non_join_event_ids_by_room_id_in_from_to_range[room_id].append( has_non_join_event_by_room_id_in_from_to_range[room_id] = True
membership_change.event_id
)
# 2) Fixup # 2) Fixup
# #
@ -574,6 +583,7 @@ class SlidingSyncHandler:
) in last_membership_change_by_room_id_in_from_to_range.values(): ) in last_membership_change_by_room_id_in_from_to_range.values():
room_id = last_membership_change_in_from_to_range.room_id room_id = last_membership_change_in_from_to_range.room_id
# 3)
if last_membership_change_in_from_to_range.membership == Membership.JOIN: if last_membership_change_in_from_to_range.membership == Membership.JOIN:
possibly_newly_joined_room_ids.add(room_id) possibly_newly_joined_room_ids.add(room_id)
@ -592,10 +602,14 @@ class SlidingSyncHandler:
# 3) Figure out `newly_joined` # 3) Figure out `newly_joined`
prev_event_ids_before_token_range: List[str] = [] prev_event_ids_before_token_range: List[str] = []
for possibly_newly_joined_room_id in possibly_newly_joined_room_ids: for possibly_newly_joined_room_id in possibly_newly_joined_room_ids:
non_joins_for_room = non_join_event_ids_by_room_id_in_from_to_range[ has_non_join_in_from_to_range = (
possibly_newly_joined_room_id has_non_join_event_by_room_id_in_from_to_range.get(
] possibly_newly_joined_room_id, False
if len(non_joins_for_room) > 0: )
)
# If the last membership event in the token range is a join and there is
# also some non-join in the range, we know they `newly_joined`.
if has_non_join_in_from_to_range:
# We found a `newly_joined` room (we left and joined within the token range) # We found a `newly_joined` room (we left and joined within the token range)
filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[ filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
room_id room_id
@ -968,6 +982,10 @@ class SlidingSyncHandler:
Membership.INVITE, Membership.INVITE,
Membership.KNOCK, Membership.KNOCK,
): ):
# This should never happen. If someone is invited/knocked on room, then
# there should be an event for it.
assert rooms_membership_for_user_at_to_token.event_id is not None
invite_or_knock_event = await self.store.get_event( invite_or_knock_event = await self.store.get_event(
rooms_membership_for_user_at_to_token.event_id rooms_membership_for_user_at_to_token.event_id
) )

View file

@ -123,8 +123,6 @@ class CurrentStateDeltaMembership:
room_id: The room ID of the membership event. room_id: The room ID of the membership event.
membership: The membership state of the user in the room membership: The membership state of the user in the room
sender: The person who sent the membership event sender: The person who sent the membership event
state_reset: Whether the membership in the room was changed without a
corresponding event (state reset).
""" """
event_id: Optional[str] event_id: Optional[str]
@ -133,7 +131,6 @@ class CurrentStateDeltaMembership:
room_id: str room_id: str
membership: str membership: str
sender: Optional[str] sender: Optional[str]
state_reset: bool
def generate_pagination_where_clause( def generate_pagination_where_clause(
@ -849,56 +846,37 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
min_from_id = from_key.stream min_from_id = from_key.stream
max_to_id = to_key.get_max_stream_pos() max_to_id = to_key.get_max_stream_pos()
args: List[Any] = [ args: List[Any] = [min_from_id, max_to_id, EventTypes.Member, user_id]
EventTypes.Member,
user_id,
user_id,
min_from_id,
max_to_id,
EventTypes.Member,
user_id,
]
# TODO: It would be good to assert that the `from_token`/`to_token` is >= # TODO: It would be good to assert that the `from_token`/`to_token` is >=
# the first row in `current_state_delta_stream` for the rooms we're # the first row in `current_state_delta_stream` for the rooms we're
# interested in. Otherwise, we will end up with empty results and not know # interested in. Otherwise, we will end up with empty results and not know
# it. # it.
# We have to look-up events by `stream_ordering` because # We could `COALESCE(e.stream_ordering, s.stream_id)` to get more accurate
# `current_state_delta_stream.event_id` can be `null` if the server is no # stream positioning when available but given our usages, we can avoid the
# longer in the room or a state reset happened and it was unset. # complexity. Between two (valid) stream tokens, we will still get all of
# `stream_ordering` is unique across the Synapse instance so this should # the state changes. Since those events are persisted in a batch, valid
# work fine. # tokens will either be before or after the batch of events.
# #
# We `COALESCE` the `stream_ordering` because we prefer the source of truth # `stream_ordering` from the `events` table is more accurate when available
# from the `events` table. This gives slightly more accurate results when # since the `current_state_delta_stream` table only tracks that the current
# available since `current_state_delta_stream` only tracks that the current
# state is at this stream position (not what stream position the state event # state is at this stream position (not what stream position the state event
# was added) and uses the *minimum* stream position for batches of events. # was added) and uses the *minimum* stream position for batches of events.
#
# The extra `LEFT JOIN` by stream position are only needed to tell a state
# reset from the server leaving the room. Both cases have `event_id = null`
# but if we can find a corresponding event at that stream position, then we
# know it was just the server leaving the room.
sql = """ sql = """
SELECT SELECT
COALESCE(e.event_id, e_by_stream.event_id) AS event_id, e.event_id,
s.prev_event_id, s.prev_event_id,
s.room_id, s.room_id,
s.instance_name, s.instance_name,
COALESCE(e.stream_ordering, e_by_stream.stream_ordering, s.stream_id) AS stream_ordering, s.stream_id,
COALESCE(m.membership, m_by_stream.membership) AS membership, m.membership,
COALESCE(e.sender, e_by_stream.sender) AS sender, e.sender,
m_prev.membership AS prev_membership m_prev.membership AS prev_membership
FROM current_state_delta_stream AS s FROM current_state_delta_stream AS s
LEFT JOIN events AS e ON e.event_id = s.event_id LEFT JOIN events AS e ON e.event_id = s.event_id
LEFT JOIN room_memberships AS m ON m.event_id = s.event_id LEFT JOIN room_memberships AS m ON m.event_id = s.event_id
LEFT JOIN room_memberships AS m_prev ON s.prev_event_id = m_prev.event_id LEFT JOIN room_memberships AS m_prev ON s.prev_event_id = m_prev.event_id
LEFT JOIN events AS e_by_stream ON e_by_stream.stream_ordering = s.stream_id
AND e_by_stream.type = ?
AND e_by_stream.state_key = ?
LEFT JOIN room_memberships AS m_by_stream ON m_by_stream.event_stream_ordering = s.stream_id
AND m_by_stream.user_id = ?
WHERE s.stream_id > ? AND s.stream_id <= ? WHERE s.stream_id > ? AND s.stream_id <= ?
AND s.type = ? AND s.type = ?
AND s.state_key = ? AND s.state_key = ?
@ -937,12 +915,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
if event_id is None and prev_membership == Membership.LEAVE: if event_id is None and prev_membership == Membership.LEAVE:
continue continue
# We can detect a state reset if there was a membership change
# without a corresponding event.
state_reset = False
if event_id is None and membership != prev_membership:
state_reset = True
membership_change = CurrentStateDeltaMembership( membership_change = CurrentStateDeltaMembership(
event_id=event_id, event_id=event_id,
event_pos=PersistedEventPosition( event_pos=PersistedEventPosition(
@ -955,7 +927,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
membership if membership is not None else Membership.LEAVE membership if membership is not None else Membership.LEAVE
), ),
sender=sender, sender=sender,
state_reset=state_reset,
) )
membership_changes.append(membership_change) membership_changes.append(membership_change)

View file

@ -390,7 +390,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
# Leave during the from_token/to_token range (newly_left) # Leave during the from_token/to_token range (newly_left)
room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok) room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
leave_response = self.helper.leave(room_id2, user1_id, tok=user1_tok) _leave_response2 = self.helper.leave(room_id2, user1_id, tok=user1_tok)
after_room2_token = self.event_sources.get_current_token() after_room2_token = self.event_sources.get_current_token()
@ -404,10 +404,13 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
# Only the newly_left room should show up # Only the newly_left room should show up
self.assertEqual(room_id_results.keys(), {room_id2}) self.assertEqual(room_id_results.keys(), {room_id2})
# It should be pointing to the latest membership event in the from/to range # It should be pointing to the latest membership event in the from/to range but
# the `event_id` is `None` because we left the room causing the server to leave
# the room because no other local users are in it (quirk of the
# `current_state_delta_stream` table that we source things from)
self.assertEqual( self.assertEqual(
room_id_results[room_id2].event_id, room_id_results[room_id2].event_id,
leave_response["event_id"], None, # _leave_response2["event_id"],
) )
# We should *NOT* be `newly_joined` because we are instead `newly_left` # We should *NOT* be `newly_joined` because we are instead `newly_left`
self.assertEqual(room_id_results[room_id2].newly_joined, False) self.assertEqual(room_id_results[room_id2].newly_joined, False)

View file

@ -1616,6 +1616,98 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
channel.json_body["lists"]["foo-list"], channel.json_body["lists"]["foo-list"],
) )
def test_sliced_windows(self) -> None:
"""
Test that the `lists` `ranges` are sliced correctly. Both sides of each range
are inclusive.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
room_id3 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
# Make the Sliding Sync request for a single room
channel = self.make_request(
"POST",
self.sync_endpoint,
{
"lists": {
"foo-list": {
"ranges": [[0, 0]],
"required_state": [
["m.room.join_rules", ""],
["m.room.history_visibility", ""],
["m.space.child", "*"],
],
"timeline_limit": 1,
}
}
},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
# Make sure it has the foo-list we requested
self.assertListEqual(
list(channel.json_body["lists"].keys()),
["foo-list"],
channel.json_body["lists"].keys(),
)
# Make sure the list is sorted in the way we expect
self.assertListEqual(
list(channel.json_body["lists"]["foo-list"]["ops"]),
[
{
"op": "SYNC",
"range": [0, 0],
"room_ids": [room_id3],
}
],
channel.json_body["lists"]["foo-list"],
)
# Make the Sliding Sync request for the first two rooms
channel = self.make_request(
"POST",
self.sync_endpoint,
{
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [
["m.room.join_rules", ""],
["m.room.history_visibility", ""],
["m.space.child", "*"],
],
"timeline_limit": 1,
}
}
},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
# Make sure it has the foo-list we requested
self.assertListEqual(
list(channel.json_body["lists"].keys()),
["foo-list"],
channel.json_body["lists"].keys(),
)
# Make sure the list is sorted in the way we expect
self.assertListEqual(
list(channel.json_body["lists"]["foo-list"]["ops"]),
[
{
"op": "SYNC",
"range": [0, 1],
"room_ids": [room_id3, room_id2],
}
],
channel.json_body["lists"]["foo-list"],
)
def test_rooms_limited_initial_sync(self) -> None: def test_rooms_limited_initial_sync(self) -> None:
""" """
Test that we mark `rooms` as `limited=True` when we saturate the `timeline_limit` Test that we mark `rooms` as `limited=True` when we saturate the `timeline_limit`

View file

@ -46,7 +46,7 @@ from synapse.types import (
from synapse.util import Clock from synapse.util import Clock
from tests.test_utils.event_injection import create_event from tests.test_utils.event_injection import create_event
from tests.unittest import FederatingHomeserverTestCase, HomeserverTestCase from tests.unittest import FederatingHomeserverTestCase, HomeserverTestCase, skip_unless
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -615,7 +615,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id1, room_id=room_id1,
membership="join", membership="join",
sender=user1_id, sender=user1_id,
state_reset=False,
) )
], ],
) )
@ -717,7 +716,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id1, room_id=room_id1,
membership="join", membership="join",
sender=user1_id, sender=user1_id,
state_reset=False,
), ),
CurrentStateDeltaMembership( CurrentStateDeltaMembership(
event_id=leave_response1["event_id"], event_id=leave_response1["event_id"],
@ -726,7 +724,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id1, room_id=room_id1,
membership="leave", membership="leave",
sender=user1_id, sender=user1_id,
state_reset=False,
), ),
], ],
) )
@ -885,26 +882,24 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id1, room_id=room_id1,
membership="join", membership="join",
sender=user1_id, sender=user1_id,
state_reset=False,
), ),
CurrentStateDeltaMembership( CurrentStateDeltaMembership(
event_id=leave_response1["event_id"], event_id=None, # leave_response1["event_id"],
event_pos=leave_pos1, event_pos=leave_pos1,
prev_event_id=join_response1["event_id"], prev_event_id=join_response1["event_id"],
room_id=room_id1, room_id=room_id1,
membership="leave", membership="leave",
sender=user1_id, sender=None, # user1_id,
state_reset=False,
), ),
], ],
) )
def test_membership_persisted_in_same_batch(self) -> None: def test_different_user_membership_persisted_in_same_batch(self) -> None:
""" """
Test batch of membership events being processed at once. This will result in all Test batch of membership events from different users being processed at once.
of the memberships being stored in the `current_state_delta_stream` table with This will result in all of the memberships being stored in the
the same `stream_ordering` even though the individual events have different `current_state_delta_stream` table with the same `stream_ordering` even though
`stream_ordering`s. the individual events have different `stream_ordering`s.
""" """
user1_id = self.register_user("user1", "pass") user1_id = self.register_user("user1", "pass")
_user1_tok = self.login(user1_id, "pass") _user1_tok = self.login(user1_id, "pass")
@ -924,16 +919,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
# Persist the user1, user3, and user4 join events in the same batch so they all # Persist the user1, user3, and user4 join events in the same batch so they all
# end up in the `current_state_delta_stream` table with the same # end up in the `current_state_delta_stream` table with the same
# stream_ordering. # stream_ordering.
join_event1, join_event_context1 = self.get_success(
create_event(
self.hs,
sender=user1_id,
type=EventTypes.Member,
state_key=user1_id,
content={"membership": "join"},
room_id=room_id1,
)
)
join_event3, join_event_context3 = self.get_success( join_event3, join_event_context3 = self.get_success(
create_event( create_event(
self.hs, self.hs,
@ -944,6 +929,19 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id1, room_id=room_id1,
) )
) )
# We want to put user1 in the middle of the batch. This way, regardless of the
# implementation that inserts rows into current_state_delta_stream` (whether it
# be minimum/maximum of stream position of the batch), we will still catch bugs.
join_event1, join_event_context1 = self.get_success(
create_event(
self.hs,
sender=user1_id,
type=EventTypes.Member,
state_key=user1_id,
content={"membership": "join"},
room_id=room_id1,
)
)
join_event4, join_event_context4 = self.get_success( join_event4, join_event_context4 = self.get_success(
create_event( create_event(
self.hs, self.hs,
@ -957,8 +955,8 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
self.get_success( self.get_success(
self.persistence.persist_events( self.persistence.persist_events(
[ [
(join_event1, join_event_context1),
(join_event3, join_event_context3), (join_event3, join_event_context3),
(join_event1, join_event_context1),
(join_event4, join_event_context4), (join_event4, join_event_context4),
] ]
) )
@ -966,10 +964,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
after_room1_token = self.event_sources.get_current_token() after_room1_token = self.event_sources.get_current_token()
# Let's get membership changes from user3's perspective because it was in the # Get the membership changes for the user.
# middle of the batch. This way, if rows in` current_state_delta_stream` are
# stored with the first or last event's `stream_ordering`, we will still catch
# bugs.
# #
# At this point, the `current_state_delta_stream` table should look like (notice # At this point, the `current_state_delta_stream` table should look like (notice
# those three memberships at the end with `stream_id=7` because we persisted # those three memberships at the end with `stream_id=7` because we persisted
@ -987,7 +982,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
# | 7 | '!x:test' | 'm.room.member' | '@user4:test' | '$xxx' | None | # | 7 | '!x:test' | 'm.room.member' | '@user4:test' | '$xxx' | None |
membership_changes = self.get_success( membership_changes = self.get_success(
self.store.get_current_state_delta_membership_changes_for_user( self.store.get_current_state_delta_membership_changes_for_user(
user3_id, user1_id,
from_key=before_room1_token.room_key, from_key=before_room1_token.room_key,
to_key=after_room1_token.room_key, to_key=after_room1_token.room_key,
) )
@ -1003,13 +998,125 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
membership_changes, membership_changes,
[ [
CurrentStateDeltaMembership( CurrentStateDeltaMembership(
event_id=join_event3.event_id, event_id=join_event1.event_id,
# Ideally, this would be `join_pos1` (to match the `event_id`) but
# when events are persisted in a batch, they are all stored in the
# `current_state_delta_stream` table with the minimum
# `stream_ordering` from the batch.
event_pos=join_pos3, event_pos=join_pos3,
prev_event_id=None, prev_event_id=None,
room_id=room_id1, room_id=room_id1,
membership="join", membership="join",
sender=user3_id, sender=user1_id,
state_reset=False, ),
],
)
@skip_unless(
False,
"persist code does not support multiple membership events for the same user in the same batch",
)
def test_membership_persisted_in_same_batch(self) -> None:
"""
Test batch of membership events for the same user being processed at once.
This *should* (doesn't happen currently) result in all of the memberships being
stored in the `current_state_delta_stream` table with the same `stream_ordering`
even though the individual events have different `stream_ordering`s.
FIXME: Currently, only the `join_event` is recorded in the `current_state_delta_stream`
table.
"""
user1_id = self.register_user("user1", "pass")
_user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
before_room1_token = self.event_sources.get_current_token()
# User2 is just the designated person to create the room (we do this across the
# tests to be consistent)
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
# Persist a timeline event sandwiched between two membership events so they end
# up in the `current_state_delta_stream` table with the same `stream_id`.
join_event, join_event_context = self.get_success(
create_event(
self.hs,
sender=user1_id,
type=EventTypes.Member,
state_key=user1_id,
content={"membership": "join"},
room_id=room_id1,
)
)
timeline_event, timeline_event_context = self.get_success(
create_event(
self.hs,
sender=user1_id,
type=EventTypes.Message,
state_key=user1_id,
content={"body": "foo bar", "msgtype": "m.text"},
room_id=room_id1,
)
)
leave_event, leave_event_context = self.get_success(
create_event(
self.hs,
sender=user1_id,
type=EventTypes.Member,
state_key=user1_id,
content={"membership": "leave"},
room_id=room_id1,
)
)
self.get_success(
self.persistence.persist_events(
[
(join_event, join_event_context),
(timeline_event, timeline_event_context),
(leave_event, leave_event_context),
]
)
)
after_room1_token = self.event_sources.get_current_token()
# Get the membership changes for the user.
#
# At this point, the `current_state_delta_stream` table should look like (notice
# those three memberships at the end with `stream_id=7` because we persisted
# them in the same batch):
#
# TODO: DB rows to better see what's going on.
membership_changes = self.get_success(
self.store.get_current_state_delta_membership_changes_for_user(
user1_id,
from_key=before_room1_token.room_key,
to_key=after_room1_token.room_key,
)
)
join_pos = self.get_success(
self.store.get_position_for_event(join_event.event_id)
)
# Let the whole diff show on failure
self.maxDiff = None
self.assertEqual(
membership_changes,
[
CurrentStateDeltaMembership(
event_id=leave_event.event_id,
# Ideally, this would be `leave_pos` (to match the `event_id`) but
# when events are persisted in a batch, they are all stored in the
# `current_state_delta_stream` table with the minimum
# `stream_ordering` from the batch.
event_pos=join_pos, # leave_pos,
prev_event_id=None,
room_id=room_id1,
membership="leave",
sender=user1_id,
), ),
], ],
) )
@ -1097,7 +1204,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id1, room_id=room_id1,
membership="leave", membership="leave",
sender=None, # user1_id, sender=None, # user1_id,
state_reset=True,
), ),
], ],
) )
@ -1148,7 +1254,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id1, room_id=room_id1,
membership="join", membership="join",
sender=user1_id, sender=user1_id,
state_reset=False,
), ),
CurrentStateDeltaMembership( CurrentStateDeltaMembership(
event_id=join_response2["event_id"], event_id=join_response2["event_id"],
@ -1157,7 +1262,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id2, room_id=room_id2,
membership="join", membership="join",
sender=user1_id, sender=user1_id,
state_reset=False,
), ),
], ],
) )
@ -1184,7 +1288,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id1, room_id=room_id1,
membership="join", membership="join",
sender=user1_id, sender=user1_id,
state_reset=False,
) )
], ],
) )
@ -1378,7 +1481,6 @@ class GetCurrentStateDeltaMembershipChangesForUserFederationTestCase(
room_id=intially_unjoined_room_id, room_id=intially_unjoined_room_id,
membership="join", membership="join",
sender=user1_id, sender=user1_id,
state_reset=False,
), ),
], ],
) )