Compare commits

..

No commits in common. "63c7b5017ad82ee20bc2ae5898b051a2660cf188" and "81c06bec20d2f6732100672853a140a6e19ff67d" have entirely different histories.

5 changed files with 98 additions and 284 deletions

View file

@ -18,6 +18,7 @@
# #
# #
import logging import logging
from collections import defaultdict
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple
import attr import attr
@ -47,9 +48,7 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def filter_membership_for_sync( def filter_membership_for_sync(*, membership: str, user_id: str, sender: str) -> bool:
*, membership: str, user_id: str, sender: Optional[str]
) -> bool:
""" """
Returns True if the membership event should be included in the sync response, Returns True if the membership event should be included in the sync response,
otherwise False. otherwise False.
@ -66,13 +65,7 @@ def filter_membership_for_sync(
# #
# This logic includes kicks (leave events where the sender is not the same user) and # This logic includes kicks (leave events where the sender is not the same user) and
# can be read as "anything that isn't a leave or a leave with a different sender". # can be read as "anything that isn't a leave or a leave with a different sender".
# return membership != Membership.LEAVE or sender != user_id
# When `sender=None` and `membership=Membership.LEAVE`, it means that a state reset
# happened that removed the user from the room, or the user was the last person
# locally to leave the room which caused the server to leave the room. In both
# cases, we can just remove the rooms since they are no longer relevant to the user.
# They could still be added back later if they are `newly_left`.
return membership != Membership.LEAVE or sender not in (user_id, None)
# We can't freeze this class because we want to update it in place with the # We can't freeze this class because we want to update it in place with the
@ -106,10 +99,10 @@ class _RoomMembershipForUser:
range range
""" """
event_id: Optional[str] event_id: str
event_pos: PersistedEventPosition event_pos: PersistedEventPosition
membership: str membership: str
sender: Optional[str] sender: str
newly_joined: bool newly_joined: bool
def copy_and_replace(self, **kwds: Any) -> "_RoomMembershipForUser": def copy_and_replace(self, **kwds: Any) -> "_RoomMembershipForUser":
@ -263,8 +256,7 @@ class SlidingSyncHandler:
for range in list_config.ranges: for range in list_config.ranges:
sliced_room_ids = [ sliced_room_ids = [
room_id room_id
# Both sides of range are inclusive for room_id, _ in sorted_room_info[range[0] : range[1]]
for room_id, _ in sorted_room_info[range[0] : range[1] + 1]
] ]
ops.append( ops.append(
@ -510,8 +502,6 @@ class SlidingSyncHandler:
) )
) )
# Filter the rooms that that we have updated room membership events to the point
# in time of the `to_token` (from the "1)" fixups)
filtered_sync_room_id_set = { filtered_sync_room_id_set = {
room_id: room_membership_for_user room_id: room_membership_for_user
for room_id, room_membership_for_user in sync_room_id_set.items() for room_id, room_membership_for_user in sync_room_id_set.items()
@ -550,11 +540,9 @@ class SlidingSyncHandler:
first_membership_change_by_room_id_in_from_to_range: Dict[ first_membership_change_by_room_id_in_from_to_range: Dict[
str, CurrentStateDeltaMembership str, CurrentStateDeltaMembership
] = {} ] = {}
# Keep track if the room has a non-join event in the token range so we can later non_join_event_ids_by_room_id_in_from_to_range: Dict[str, List[str]] = (
# tell if it was a `newly_joined` room. If the last membership event in the defaultdict(list)
# token range is a join and there is also some non-join in the range, we know )
# they `newly_joined`.
has_non_join_event_by_room_id_in_from_to_range: Dict[str, bool] = {}
for ( for (
membership_change membership_change
) in current_state_delta_membership_changes_in_from_to_range: ) in current_state_delta_membership_changes_in_from_to_range:
@ -563,13 +551,16 @@ class SlidingSyncHandler:
last_membership_change_by_room_id_in_from_to_range[room_id] = ( last_membership_change_by_room_id_in_from_to_range[room_id] = (
membership_change membership_change
) )
# Only set if we haven't already set it # Only set if we haven't already set it
first_membership_change_by_room_id_in_from_to_range.setdefault( first_membership_change_by_room_id_in_from_to_range.setdefault(
room_id, membership_change room_id, membership_change
) )
if membership_change.membership != Membership.JOIN: if membership_change.membership != Membership.JOIN:
has_non_join_event_by_room_id_in_from_to_range[room_id] = True non_join_event_ids_by_room_id_in_from_to_range[room_id].append(
membership_change.event_id
)
# 2) Fixup # 2) Fixup
# #
@ -583,7 +574,6 @@ class SlidingSyncHandler:
) in last_membership_change_by_room_id_in_from_to_range.values(): ) in last_membership_change_by_room_id_in_from_to_range.values():
room_id = last_membership_change_in_from_to_range.room_id room_id = last_membership_change_in_from_to_range.room_id
# 3)
if last_membership_change_in_from_to_range.membership == Membership.JOIN: if last_membership_change_in_from_to_range.membership == Membership.JOIN:
possibly_newly_joined_room_ids.add(room_id) possibly_newly_joined_room_ids.add(room_id)
@ -602,14 +592,10 @@ class SlidingSyncHandler:
# 3) Figure out `newly_joined` # 3) Figure out `newly_joined`
prev_event_ids_before_token_range: List[str] = [] prev_event_ids_before_token_range: List[str] = []
for possibly_newly_joined_room_id in possibly_newly_joined_room_ids: for possibly_newly_joined_room_id in possibly_newly_joined_room_ids:
has_non_join_in_from_to_range = ( non_joins_for_room = non_join_event_ids_by_room_id_in_from_to_range[
has_non_join_event_by_room_id_in_from_to_range.get( possibly_newly_joined_room_id
possibly_newly_joined_room_id, False ]
) if len(non_joins_for_room) > 0:
)
# If the last membership event in the token range is a join and there is
# also some non-join in the range, we know they `newly_joined`.
if has_non_join_in_from_to_range:
# We found a `newly_joined` room (we left and joined within the token range) # We found a `newly_joined` room (we left and joined within the token range)
filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[ filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
room_id room_id
@ -982,10 +968,6 @@ class SlidingSyncHandler:
Membership.INVITE, Membership.INVITE,
Membership.KNOCK, Membership.KNOCK,
): ):
# This should never happen. If someone is invited/knocked on room, then
# there should be an event for it.
assert rooms_membership_for_user_at_to_token.event_id is not None
invite_or_knock_event = await self.store.get_event( invite_or_knock_event = await self.store.get_event(
rooms_membership_for_user_at_to_token.event_id rooms_membership_for_user_at_to_token.event_id
) )

View file

@ -123,6 +123,8 @@ class CurrentStateDeltaMembership:
room_id: The room ID of the membership event. room_id: The room ID of the membership event.
membership: The membership state of the user in the room membership: The membership state of the user in the room
sender: The person who sent the membership event sender: The person who sent the membership event
state_reset: Whether the membership in the room was changed without a
corresponding event (state reset).
""" """
event_id: Optional[str] event_id: Optional[str]
@ -131,6 +133,7 @@ class CurrentStateDeltaMembership:
room_id: str room_id: str
membership: str membership: str
sender: Optional[str] sender: Optional[str]
state_reset: bool
def generate_pagination_where_clause( def generate_pagination_where_clause(
@ -846,37 +849,56 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
min_from_id = from_key.stream min_from_id = from_key.stream
max_to_id = to_key.get_max_stream_pos() max_to_id = to_key.get_max_stream_pos()
args: List[Any] = [min_from_id, max_to_id, EventTypes.Member, user_id] args: List[Any] = [
EventTypes.Member,
user_id,
user_id,
min_from_id,
max_to_id,
EventTypes.Member,
user_id,
]
# TODO: It would be good to assert that the `from_token`/`to_token` is >= # TODO: It would be good to assert that the `from_token`/`to_token` is >=
# the first row in `current_state_delta_stream` for the rooms we're # the first row in `current_state_delta_stream` for the rooms we're
# interested in. Otherwise, we will end up with empty results and not know # interested in. Otherwise, we will end up with empty results and not know
# it. # it.
# We could `COALESCE(e.stream_ordering, s.stream_id)` to get more accurate # We have to look-up events by `stream_ordering` because
# stream positioning when available but given our usages, we can avoid the # `current_state_delta_stream.event_id` can be `null` if the server is no
# complexity. Between two (valid) stream tokens, we will still get all of # longer in the room or a state reset happened and it was unset.
# the state changes. Since those events are persisted in a batch, valid # `stream_ordering` is unique across the Synapse instance so this should
# tokens will either be before or after the batch of events. # work fine.
# #
# `stream_ordering` from the `events` table is more accurate when available # We `COALESCE` the `stream_ordering` because we prefer the source of truth
# since the `current_state_delta_stream` table only tracks that the current # from the `events` table. This gives slightly more accurate results when
# available since `current_state_delta_stream` only tracks that the current
# state is at this stream position (not what stream position the state event # state is at this stream position (not what stream position the state event
# was added) and uses the *minimum* stream position for batches of events. # was added) and uses the *minimum* stream position for batches of events.
#
# The extra `LEFT JOIN` by stream position are only needed to tell a state
# reset from the server leaving the room. Both cases have `event_id = null`
# but if we can find a corresponding event at that stream position, then we
# know it was just the server leaving the room.
sql = """ sql = """
SELECT SELECT
e.event_id, COALESCE(e.event_id, e_by_stream.event_id) AS event_id,
s.prev_event_id, s.prev_event_id,
s.room_id, s.room_id,
s.instance_name, s.instance_name,
s.stream_id, COALESCE(e.stream_ordering, e_by_stream.stream_ordering, s.stream_id) AS stream_ordering,
m.membership, COALESCE(m.membership, m_by_stream.membership) AS membership,
e.sender, COALESCE(e.sender, e_by_stream.sender) AS sender,
m_prev.membership AS prev_membership m_prev.membership AS prev_membership
FROM current_state_delta_stream AS s FROM current_state_delta_stream AS s
LEFT JOIN events AS e ON e.event_id = s.event_id LEFT JOIN events AS e ON e.event_id = s.event_id
LEFT JOIN room_memberships AS m ON m.event_id = s.event_id LEFT JOIN room_memberships AS m ON m.event_id = s.event_id
LEFT JOIN room_memberships AS m_prev ON s.prev_event_id = m_prev.event_id LEFT JOIN room_memberships AS m_prev ON s.prev_event_id = m_prev.event_id
LEFT JOIN events AS e_by_stream ON e_by_stream.stream_ordering = s.stream_id
AND e_by_stream.type = ?
AND e_by_stream.state_key = ?
LEFT JOIN room_memberships AS m_by_stream ON m_by_stream.event_stream_ordering = s.stream_id
AND m_by_stream.user_id = ?
WHERE s.stream_id > ? AND s.stream_id <= ? WHERE s.stream_id > ? AND s.stream_id <= ?
AND s.type = ? AND s.type = ?
AND s.state_key = ? AND s.state_key = ?
@ -915,6 +937,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
if event_id is None and prev_membership == Membership.LEAVE: if event_id is None and prev_membership == Membership.LEAVE:
continue continue
# We can detect a state reset if there was a membership change
# without a corresponding event.
state_reset = False
if event_id is None and membership != prev_membership:
state_reset = True
membership_change = CurrentStateDeltaMembership( membership_change = CurrentStateDeltaMembership(
event_id=event_id, event_id=event_id,
event_pos=PersistedEventPosition( event_pos=PersistedEventPosition(
@ -927,6 +955,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
membership if membership is not None else Membership.LEAVE membership if membership is not None else Membership.LEAVE
), ),
sender=sender, sender=sender,
state_reset=state_reset,
) )
membership_changes.append(membership_change) membership_changes.append(membership_change)

View file

@ -390,7 +390,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
# Leave during the from_token/to_token range (newly_left) # Leave during the from_token/to_token range (newly_left)
room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok) room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
_leave_response2 = self.helper.leave(room_id2, user1_id, tok=user1_tok) leave_response = self.helper.leave(room_id2, user1_id, tok=user1_tok)
after_room2_token = self.event_sources.get_current_token() after_room2_token = self.event_sources.get_current_token()
@ -404,13 +404,10 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
# Only the newly_left room should show up # Only the newly_left room should show up
self.assertEqual(room_id_results.keys(), {room_id2}) self.assertEqual(room_id_results.keys(), {room_id2})
# It should be pointing to the latest membership event in the from/to range but # It should be pointing to the latest membership event in the from/to range
# the `event_id` is `None` because we left the room causing the server to leave
# the room because no other local users are in it (quirk of the
# `current_state_delta_stream` table that we source things from)
self.assertEqual( self.assertEqual(
room_id_results[room_id2].event_id, room_id_results[room_id2].event_id,
None, # _leave_response2["event_id"], leave_response["event_id"],
) )
# We should *NOT* be `newly_joined` because we are instead `newly_left` # We should *NOT* be `newly_joined` because we are instead `newly_left`
self.assertEqual(room_id_results[room_id2].newly_joined, False) self.assertEqual(room_id_results[room_id2].newly_joined, False)

View file

@ -1616,98 +1616,6 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
channel.json_body["lists"]["foo-list"], channel.json_body["lists"]["foo-list"],
) )
def test_sliced_windows(self) -> None:
"""
Test that the `lists` `ranges` are sliced correctly. Both sides of each range
are inclusive.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
room_id3 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
# Make the Sliding Sync request for a single room
channel = self.make_request(
"POST",
self.sync_endpoint,
{
"lists": {
"foo-list": {
"ranges": [[0, 0]],
"required_state": [
["m.room.join_rules", ""],
["m.room.history_visibility", ""],
["m.space.child", "*"],
],
"timeline_limit": 1,
}
}
},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
# Make sure it has the foo-list we requested
self.assertListEqual(
list(channel.json_body["lists"].keys()),
["foo-list"],
channel.json_body["lists"].keys(),
)
# Make sure the list is sorted in the way we expect
self.assertListEqual(
list(channel.json_body["lists"]["foo-list"]["ops"]),
[
{
"op": "SYNC",
"range": [0, 0],
"room_ids": [room_id3],
}
],
channel.json_body["lists"]["foo-list"],
)
# Make the Sliding Sync request for the first two rooms
channel = self.make_request(
"POST",
self.sync_endpoint,
{
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [
["m.room.join_rules", ""],
["m.room.history_visibility", ""],
["m.space.child", "*"],
],
"timeline_limit": 1,
}
}
},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
# Make sure it has the foo-list we requested
self.assertListEqual(
list(channel.json_body["lists"].keys()),
["foo-list"],
channel.json_body["lists"].keys(),
)
# Make sure the list is sorted in the way we expect
self.assertListEqual(
list(channel.json_body["lists"]["foo-list"]["ops"]),
[
{
"op": "SYNC",
"range": [0, 1],
"room_ids": [room_id3, room_id2],
}
],
channel.json_body["lists"]["foo-list"],
)
def test_rooms_limited_initial_sync(self) -> None: def test_rooms_limited_initial_sync(self) -> None:
""" """
Test that we mark `rooms` as `limited=True` when we saturate the `timeline_limit` Test that we mark `rooms` as `limited=True` when we saturate the `timeline_limit`

View file

@ -46,7 +46,7 @@ from synapse.types import (
from synapse.util import Clock from synapse.util import Clock
from tests.test_utils.event_injection import create_event from tests.test_utils.event_injection import create_event
from tests.unittest import FederatingHomeserverTestCase, HomeserverTestCase, skip_unless from tests.unittest import FederatingHomeserverTestCase, HomeserverTestCase
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -615,6 +615,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id1, room_id=room_id1,
membership="join", membership="join",
sender=user1_id, sender=user1_id,
state_reset=False,
) )
], ],
) )
@ -716,6 +717,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id1, room_id=room_id1,
membership="join", membership="join",
sender=user1_id, sender=user1_id,
state_reset=False,
), ),
CurrentStateDeltaMembership( CurrentStateDeltaMembership(
event_id=leave_response1["event_id"], event_id=leave_response1["event_id"],
@ -724,6 +726,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id1, room_id=room_id1,
membership="leave", membership="leave",
sender=user1_id, sender=user1_id,
state_reset=False,
), ),
], ],
) )
@ -882,24 +885,26 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id1, room_id=room_id1,
membership="join", membership="join",
sender=user1_id, sender=user1_id,
state_reset=False,
), ),
CurrentStateDeltaMembership( CurrentStateDeltaMembership(
event_id=None, # leave_response1["event_id"], event_id=leave_response1["event_id"],
event_pos=leave_pos1, event_pos=leave_pos1,
prev_event_id=join_response1["event_id"], prev_event_id=join_response1["event_id"],
room_id=room_id1, room_id=room_id1,
membership="leave", membership="leave",
sender=None, # user1_id, sender=user1_id,
state_reset=False,
), ),
], ],
) )
def test_different_user_membership_persisted_in_same_batch(self) -> None: def test_membership_persisted_in_same_batch(self) -> None:
""" """
Test batch of membership events from different users being processed at once. Test batch of membership events being processed at once. This will result in all
This will result in all of the memberships being stored in the of the memberships being stored in the `current_state_delta_stream` table with
`current_state_delta_stream` table with the same `stream_ordering` even though the same `stream_ordering` even though the individual events have different
the individual events have different `stream_ordering`s. `stream_ordering`s.
""" """
user1_id = self.register_user("user1", "pass") user1_id = self.register_user("user1", "pass")
_user1_tok = self.login(user1_id, "pass") _user1_tok = self.login(user1_id, "pass")
@ -919,19 +924,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
# Persist the user1, user3, and user4 join events in the same batch so they all # Persist the user1, user3, and user4 join events in the same batch so they all
# end up in the `current_state_delta_stream` table with the same # end up in the `current_state_delta_stream` table with the same
# stream_ordering. # stream_ordering.
join_event3, join_event_context3 = self.get_success(
create_event(
self.hs,
sender=user3_id,
type=EventTypes.Member,
state_key=user3_id,
content={"membership": "join"},
room_id=room_id1,
)
)
# We want to put user1 in the middle of the batch. This way, regardless of the
# implementation that inserts rows into current_state_delta_stream` (whether it
# be minimum/maximum of stream position of the batch), we will still catch bugs.
join_event1, join_event_context1 = self.get_success( join_event1, join_event_context1 = self.get_success(
create_event( create_event(
self.hs, self.hs,
@ -942,6 +934,16 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id1, room_id=room_id1,
) )
) )
join_event3, join_event_context3 = self.get_success(
create_event(
self.hs,
sender=user3_id,
type=EventTypes.Member,
state_key=user3_id,
content={"membership": "join"},
room_id=room_id1,
)
)
join_event4, join_event_context4 = self.get_success( join_event4, join_event_context4 = self.get_success(
create_event( create_event(
self.hs, self.hs,
@ -955,8 +957,8 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
self.get_success( self.get_success(
self.persistence.persist_events( self.persistence.persist_events(
[ [
(join_event3, join_event_context3),
(join_event1, join_event_context1), (join_event1, join_event_context1),
(join_event3, join_event_context3),
(join_event4, join_event_context4), (join_event4, join_event_context4),
] ]
) )
@ -964,7 +966,10 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
after_room1_token = self.event_sources.get_current_token() after_room1_token = self.event_sources.get_current_token()
# Get the membership changes for the user. # Let's get membership changes from user3's perspective because it was in the
# middle of the batch. This way, if rows in` current_state_delta_stream` are
# stored with the first or last event's `stream_ordering`, we will still catch
# bugs.
# #
# At this point, the `current_state_delta_stream` table should look like (notice # At this point, the `current_state_delta_stream` table should look like (notice
# those three memberships at the end with `stream_id=7` because we persisted # those three memberships at the end with `stream_id=7` because we persisted
@ -982,7 +987,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
# | 7 | '!x:test' | 'm.room.member' | '@user4:test' | '$xxx' | None | # | 7 | '!x:test' | 'm.room.member' | '@user4:test' | '$xxx' | None |
membership_changes = self.get_success( membership_changes = self.get_success(
self.store.get_current_state_delta_membership_changes_for_user( self.store.get_current_state_delta_membership_changes_for_user(
user1_id, user3_id,
from_key=before_room1_token.room_key, from_key=before_room1_token.room_key,
to_key=after_room1_token.room_key, to_key=after_room1_token.room_key,
) )
@ -998,125 +1003,13 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
membership_changes, membership_changes,
[ [
CurrentStateDeltaMembership( CurrentStateDeltaMembership(
event_id=join_event1.event_id, event_id=join_event3.event_id,
# Ideally, this would be `join_pos1` (to match the `event_id`) but
# when events are persisted in a batch, they are all stored in the
# `current_state_delta_stream` table with the minimum
# `stream_ordering` from the batch.
event_pos=join_pos3, event_pos=join_pos3,
prev_event_id=None, prev_event_id=None,
room_id=room_id1, room_id=room_id1,
membership="join", membership="join",
sender=user1_id, sender=user3_id,
), state_reset=False,
],
)
@skip_unless(
False,
"persist code does not support multiple membership events for the same user in the same batch",
)
def test_membership_persisted_in_same_batch(self) -> None:
"""
Test batch of membership events for the same user being processed at once.
This *should* (doesn't happen currently) result in all of the memberships being
stored in the `current_state_delta_stream` table with the same `stream_ordering`
even though the individual events have different `stream_ordering`s.
FIXME: Currently, only the `join_event` is recorded in the `current_state_delta_stream`
table.
"""
user1_id = self.register_user("user1", "pass")
_user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
before_room1_token = self.event_sources.get_current_token()
# User2 is just the designated person to create the room (we do this across the
# tests to be consistent)
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
# Persist a timeline event sandwiched between two membership events so they end
# up in the `current_state_delta_stream` table with the same `stream_id`.
join_event, join_event_context = self.get_success(
create_event(
self.hs,
sender=user1_id,
type=EventTypes.Member,
state_key=user1_id,
content={"membership": "join"},
room_id=room_id1,
)
)
timeline_event, timeline_event_context = self.get_success(
create_event(
self.hs,
sender=user1_id,
type=EventTypes.Message,
state_key=user1_id,
content={"body": "foo bar", "msgtype": "m.text"},
room_id=room_id1,
)
)
leave_event, leave_event_context = self.get_success(
create_event(
self.hs,
sender=user1_id,
type=EventTypes.Member,
state_key=user1_id,
content={"membership": "leave"},
room_id=room_id1,
)
)
self.get_success(
self.persistence.persist_events(
[
(join_event, join_event_context),
(timeline_event, timeline_event_context),
(leave_event, leave_event_context),
]
)
)
after_room1_token = self.event_sources.get_current_token()
# Get the membership changes for the user.
#
# At this point, the `current_state_delta_stream` table should look like (notice
# those three memberships at the end with `stream_id=7` because we persisted
# them in the same batch):
#
# TODO: DB rows to better see what's going on.
membership_changes = self.get_success(
self.store.get_current_state_delta_membership_changes_for_user(
user1_id,
from_key=before_room1_token.room_key,
to_key=after_room1_token.room_key,
)
)
join_pos = self.get_success(
self.store.get_position_for_event(join_event.event_id)
)
# Let the whole diff show on failure
self.maxDiff = None
self.assertEqual(
membership_changes,
[
CurrentStateDeltaMembership(
event_id=leave_event.event_id,
# Ideally, this would be `leave_pos` (to match the `event_id`) but
# when events are persisted in a batch, they are all stored in the
# `current_state_delta_stream` table with the minimum
# `stream_ordering` from the batch.
event_pos=join_pos, # leave_pos,
prev_event_id=None,
room_id=room_id1,
membership="leave",
sender=user1_id,
), ),
], ],
) )
@ -1204,6 +1097,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id1, room_id=room_id1,
membership="leave", membership="leave",
sender=None, # user1_id, sender=None, # user1_id,
state_reset=True,
), ),
], ],
) )
@ -1254,6 +1148,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id1, room_id=room_id1,
membership="join", membership="join",
sender=user1_id, sender=user1_id,
state_reset=False,
), ),
CurrentStateDeltaMembership( CurrentStateDeltaMembership(
event_id=join_response2["event_id"], event_id=join_response2["event_id"],
@ -1262,6 +1157,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id2, room_id=room_id2,
membership="join", membership="join",
sender=user1_id, sender=user1_id,
state_reset=False,
), ),
], ],
) )
@ -1288,6 +1184,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id1, room_id=room_id1,
membership="join", membership="join",
sender=user1_id, sender=user1_id,
state_reset=False,
) )
], ],
) )
@ -1481,6 +1378,7 @@ class GetCurrentStateDeltaMembershipChangesForUserFederationTestCase(
room_id=intially_unjoined_room_id, room_id=intially_unjoined_room_id,
membership="join", membership="join",
sender=user1_id, sender=user1_id,
state_reset=False,
), ),
], ],
) )