mirror of
https://github.com/element-hq/synapse
synced 2024-10-05 11:52:41 +00:00
Compare commits
No commits in common. "63c7b5017ad82ee20bc2ae5898b051a2660cf188" and "81c06bec20d2f6732100672853a140a6e19ff67d" have entirely different histories.
63c7b5017a
...
81c06bec20
5 changed files with 98 additions and 284 deletions
|
@ -18,6 +18,7 @@
|
||||||
#
|
#
|
||||||
#
|
#
|
||||||
import logging
|
import logging
|
||||||
|
from collections import defaultdict
|
||||||
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple
|
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple
|
||||||
|
|
||||||
import attr
|
import attr
|
||||||
|
@ -47,9 +48,7 @@ if TYPE_CHECKING:
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def filter_membership_for_sync(
|
def filter_membership_for_sync(*, membership: str, user_id: str, sender: str) -> bool:
|
||||||
*, membership: str, user_id: str, sender: Optional[str]
|
|
||||||
) -> bool:
|
|
||||||
"""
|
"""
|
||||||
Returns True if the membership event should be included in the sync response,
|
Returns True if the membership event should be included in the sync response,
|
||||||
otherwise False.
|
otherwise False.
|
||||||
|
@ -66,13 +65,7 @@ def filter_membership_for_sync(
|
||||||
#
|
#
|
||||||
# This logic includes kicks (leave events where the sender is not the same user) and
|
# This logic includes kicks (leave events where the sender is not the same user) and
|
||||||
# can be read as "anything that isn't a leave or a leave with a different sender".
|
# can be read as "anything that isn't a leave or a leave with a different sender".
|
||||||
#
|
return membership != Membership.LEAVE or sender != user_id
|
||||||
# When `sender=None` and `membership=Membership.LEAVE`, it means that a state reset
|
|
||||||
# happened that removed the user from the room, or the user was the last person
|
|
||||||
# locally to leave the room which caused the server to leave the room. In both
|
|
||||||
# cases, we can just remove the rooms since they are no longer relevant to the user.
|
|
||||||
# They could still be added back later if they are `newly_left`.
|
|
||||||
return membership != Membership.LEAVE or sender not in (user_id, None)
|
|
||||||
|
|
||||||
|
|
||||||
# We can't freeze this class because we want to update it in place with the
|
# We can't freeze this class because we want to update it in place with the
|
||||||
|
@ -106,10 +99,10 @@ class _RoomMembershipForUser:
|
||||||
range
|
range
|
||||||
"""
|
"""
|
||||||
|
|
||||||
event_id: Optional[str]
|
event_id: str
|
||||||
event_pos: PersistedEventPosition
|
event_pos: PersistedEventPosition
|
||||||
membership: str
|
membership: str
|
||||||
sender: Optional[str]
|
sender: str
|
||||||
newly_joined: bool
|
newly_joined: bool
|
||||||
|
|
||||||
def copy_and_replace(self, **kwds: Any) -> "_RoomMembershipForUser":
|
def copy_and_replace(self, **kwds: Any) -> "_RoomMembershipForUser":
|
||||||
|
@ -263,8 +256,7 @@ class SlidingSyncHandler:
|
||||||
for range in list_config.ranges:
|
for range in list_config.ranges:
|
||||||
sliced_room_ids = [
|
sliced_room_ids = [
|
||||||
room_id
|
room_id
|
||||||
# Both sides of range are inclusive
|
for room_id, _ in sorted_room_info[range[0] : range[1]]
|
||||||
for room_id, _ in sorted_room_info[range[0] : range[1] + 1]
|
|
||||||
]
|
]
|
||||||
|
|
||||||
ops.append(
|
ops.append(
|
||||||
|
@ -510,8 +502,6 @@ class SlidingSyncHandler:
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
# Filter the rooms that that we have updated room membership events to the point
|
|
||||||
# in time of the `to_token` (from the "1)" fixups)
|
|
||||||
filtered_sync_room_id_set = {
|
filtered_sync_room_id_set = {
|
||||||
room_id: room_membership_for_user
|
room_id: room_membership_for_user
|
||||||
for room_id, room_membership_for_user in sync_room_id_set.items()
|
for room_id, room_membership_for_user in sync_room_id_set.items()
|
||||||
|
@ -550,11 +540,9 @@ class SlidingSyncHandler:
|
||||||
first_membership_change_by_room_id_in_from_to_range: Dict[
|
first_membership_change_by_room_id_in_from_to_range: Dict[
|
||||||
str, CurrentStateDeltaMembership
|
str, CurrentStateDeltaMembership
|
||||||
] = {}
|
] = {}
|
||||||
# Keep track if the room has a non-join event in the token range so we can later
|
non_join_event_ids_by_room_id_in_from_to_range: Dict[str, List[str]] = (
|
||||||
# tell if it was a `newly_joined` room. If the last membership event in the
|
defaultdict(list)
|
||||||
# token range is a join and there is also some non-join in the range, we know
|
)
|
||||||
# they `newly_joined`.
|
|
||||||
has_non_join_event_by_room_id_in_from_to_range: Dict[str, bool] = {}
|
|
||||||
for (
|
for (
|
||||||
membership_change
|
membership_change
|
||||||
) in current_state_delta_membership_changes_in_from_to_range:
|
) in current_state_delta_membership_changes_in_from_to_range:
|
||||||
|
@ -563,13 +551,16 @@ class SlidingSyncHandler:
|
||||||
last_membership_change_by_room_id_in_from_to_range[room_id] = (
|
last_membership_change_by_room_id_in_from_to_range[room_id] = (
|
||||||
membership_change
|
membership_change
|
||||||
)
|
)
|
||||||
|
|
||||||
# Only set if we haven't already set it
|
# Only set if we haven't already set it
|
||||||
first_membership_change_by_room_id_in_from_to_range.setdefault(
|
first_membership_change_by_room_id_in_from_to_range.setdefault(
|
||||||
room_id, membership_change
|
room_id, membership_change
|
||||||
)
|
)
|
||||||
|
|
||||||
if membership_change.membership != Membership.JOIN:
|
if membership_change.membership != Membership.JOIN:
|
||||||
has_non_join_event_by_room_id_in_from_to_range[room_id] = True
|
non_join_event_ids_by_room_id_in_from_to_range[room_id].append(
|
||||||
|
membership_change.event_id
|
||||||
|
)
|
||||||
|
|
||||||
# 2) Fixup
|
# 2) Fixup
|
||||||
#
|
#
|
||||||
|
@ -583,7 +574,6 @@ class SlidingSyncHandler:
|
||||||
) in last_membership_change_by_room_id_in_from_to_range.values():
|
) in last_membership_change_by_room_id_in_from_to_range.values():
|
||||||
room_id = last_membership_change_in_from_to_range.room_id
|
room_id = last_membership_change_in_from_to_range.room_id
|
||||||
|
|
||||||
# 3)
|
|
||||||
if last_membership_change_in_from_to_range.membership == Membership.JOIN:
|
if last_membership_change_in_from_to_range.membership == Membership.JOIN:
|
||||||
possibly_newly_joined_room_ids.add(room_id)
|
possibly_newly_joined_room_ids.add(room_id)
|
||||||
|
|
||||||
|
@ -602,14 +592,10 @@ class SlidingSyncHandler:
|
||||||
# 3) Figure out `newly_joined`
|
# 3) Figure out `newly_joined`
|
||||||
prev_event_ids_before_token_range: List[str] = []
|
prev_event_ids_before_token_range: List[str] = []
|
||||||
for possibly_newly_joined_room_id in possibly_newly_joined_room_ids:
|
for possibly_newly_joined_room_id in possibly_newly_joined_room_ids:
|
||||||
has_non_join_in_from_to_range = (
|
non_joins_for_room = non_join_event_ids_by_room_id_in_from_to_range[
|
||||||
has_non_join_event_by_room_id_in_from_to_range.get(
|
possibly_newly_joined_room_id
|
||||||
possibly_newly_joined_room_id, False
|
]
|
||||||
)
|
if len(non_joins_for_room) > 0:
|
||||||
)
|
|
||||||
# If the last membership event in the token range is a join and there is
|
|
||||||
# also some non-join in the range, we know they `newly_joined`.
|
|
||||||
if has_non_join_in_from_to_range:
|
|
||||||
# We found a `newly_joined` room (we left and joined within the token range)
|
# We found a `newly_joined` room (we left and joined within the token range)
|
||||||
filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
|
filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
|
||||||
room_id
|
room_id
|
||||||
|
@ -982,10 +968,6 @@ class SlidingSyncHandler:
|
||||||
Membership.INVITE,
|
Membership.INVITE,
|
||||||
Membership.KNOCK,
|
Membership.KNOCK,
|
||||||
):
|
):
|
||||||
# This should never happen. If someone is invited/knocked on room, then
|
|
||||||
# there should be an event for it.
|
|
||||||
assert rooms_membership_for_user_at_to_token.event_id is not None
|
|
||||||
|
|
||||||
invite_or_knock_event = await self.store.get_event(
|
invite_or_knock_event = await self.store.get_event(
|
||||||
rooms_membership_for_user_at_to_token.event_id
|
rooms_membership_for_user_at_to_token.event_id
|
||||||
)
|
)
|
||||||
|
|
|
@ -123,6 +123,8 @@ class CurrentStateDeltaMembership:
|
||||||
room_id: The room ID of the membership event.
|
room_id: The room ID of the membership event.
|
||||||
membership: The membership state of the user in the room
|
membership: The membership state of the user in the room
|
||||||
sender: The person who sent the membership event
|
sender: The person who sent the membership event
|
||||||
|
state_reset: Whether the membership in the room was changed without a
|
||||||
|
corresponding event (state reset).
|
||||||
"""
|
"""
|
||||||
|
|
||||||
event_id: Optional[str]
|
event_id: Optional[str]
|
||||||
|
@ -131,6 +133,7 @@ class CurrentStateDeltaMembership:
|
||||||
room_id: str
|
room_id: str
|
||||||
membership: str
|
membership: str
|
||||||
sender: Optional[str]
|
sender: Optional[str]
|
||||||
|
state_reset: bool
|
||||||
|
|
||||||
|
|
||||||
def generate_pagination_where_clause(
|
def generate_pagination_where_clause(
|
||||||
|
@ -846,37 +849,56 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
min_from_id = from_key.stream
|
min_from_id = from_key.stream
|
||||||
max_to_id = to_key.get_max_stream_pos()
|
max_to_id = to_key.get_max_stream_pos()
|
||||||
|
|
||||||
args: List[Any] = [min_from_id, max_to_id, EventTypes.Member, user_id]
|
args: List[Any] = [
|
||||||
|
EventTypes.Member,
|
||||||
|
user_id,
|
||||||
|
user_id,
|
||||||
|
min_from_id,
|
||||||
|
max_to_id,
|
||||||
|
EventTypes.Member,
|
||||||
|
user_id,
|
||||||
|
]
|
||||||
|
|
||||||
# TODO: It would be good to assert that the `from_token`/`to_token` is >=
|
# TODO: It would be good to assert that the `from_token`/`to_token` is >=
|
||||||
# the first row in `current_state_delta_stream` for the rooms we're
|
# the first row in `current_state_delta_stream` for the rooms we're
|
||||||
# interested in. Otherwise, we will end up with empty results and not know
|
# interested in. Otherwise, we will end up with empty results and not know
|
||||||
# it.
|
# it.
|
||||||
|
|
||||||
# We could `COALESCE(e.stream_ordering, s.stream_id)` to get more accurate
|
# We have to look-up events by `stream_ordering` because
|
||||||
# stream positioning when available but given our usages, we can avoid the
|
# `current_state_delta_stream.event_id` can be `null` if the server is no
|
||||||
# complexity. Between two (valid) stream tokens, we will still get all of
|
# longer in the room or a state reset happened and it was unset.
|
||||||
# the state changes. Since those events are persisted in a batch, valid
|
# `stream_ordering` is unique across the Synapse instance so this should
|
||||||
# tokens will either be before or after the batch of events.
|
# work fine.
|
||||||
#
|
#
|
||||||
# `stream_ordering` from the `events` table is more accurate when available
|
# We `COALESCE` the `stream_ordering` because we prefer the source of truth
|
||||||
# since the `current_state_delta_stream` table only tracks that the current
|
# from the `events` table. This gives slightly more accurate results when
|
||||||
|
# available since `current_state_delta_stream` only tracks that the current
|
||||||
# state is at this stream position (not what stream position the state event
|
# state is at this stream position (not what stream position the state event
|
||||||
# was added) and uses the *minimum* stream position for batches of events.
|
# was added) and uses the *minimum* stream position for batches of events.
|
||||||
|
#
|
||||||
|
# The extra `LEFT JOIN` by stream position are only needed to tell a state
|
||||||
|
# reset from the server leaving the room. Both cases have `event_id = null`
|
||||||
|
# but if we can find a corresponding event at that stream position, then we
|
||||||
|
# know it was just the server leaving the room.
|
||||||
sql = """
|
sql = """
|
||||||
SELECT
|
SELECT
|
||||||
e.event_id,
|
COALESCE(e.event_id, e_by_stream.event_id) AS event_id,
|
||||||
s.prev_event_id,
|
s.prev_event_id,
|
||||||
s.room_id,
|
s.room_id,
|
||||||
s.instance_name,
|
s.instance_name,
|
||||||
s.stream_id,
|
COALESCE(e.stream_ordering, e_by_stream.stream_ordering, s.stream_id) AS stream_ordering,
|
||||||
m.membership,
|
COALESCE(m.membership, m_by_stream.membership) AS membership,
|
||||||
e.sender,
|
COALESCE(e.sender, e_by_stream.sender) AS sender,
|
||||||
m_prev.membership AS prev_membership
|
m_prev.membership AS prev_membership
|
||||||
FROM current_state_delta_stream AS s
|
FROM current_state_delta_stream AS s
|
||||||
LEFT JOIN events AS e ON e.event_id = s.event_id
|
LEFT JOIN events AS e ON e.event_id = s.event_id
|
||||||
LEFT JOIN room_memberships AS m ON m.event_id = s.event_id
|
LEFT JOIN room_memberships AS m ON m.event_id = s.event_id
|
||||||
LEFT JOIN room_memberships AS m_prev ON s.prev_event_id = m_prev.event_id
|
LEFT JOIN room_memberships AS m_prev ON s.prev_event_id = m_prev.event_id
|
||||||
|
LEFT JOIN events AS e_by_stream ON e_by_stream.stream_ordering = s.stream_id
|
||||||
|
AND e_by_stream.type = ?
|
||||||
|
AND e_by_stream.state_key = ?
|
||||||
|
LEFT JOIN room_memberships AS m_by_stream ON m_by_stream.event_stream_ordering = s.stream_id
|
||||||
|
AND m_by_stream.user_id = ?
|
||||||
WHERE s.stream_id > ? AND s.stream_id <= ?
|
WHERE s.stream_id > ? AND s.stream_id <= ?
|
||||||
AND s.type = ?
|
AND s.type = ?
|
||||||
AND s.state_key = ?
|
AND s.state_key = ?
|
||||||
|
@ -915,6 +937,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
if event_id is None and prev_membership == Membership.LEAVE:
|
if event_id is None and prev_membership == Membership.LEAVE:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
# We can detect a state reset if there was a membership change
|
||||||
|
# without a corresponding event.
|
||||||
|
state_reset = False
|
||||||
|
if event_id is None and membership != prev_membership:
|
||||||
|
state_reset = True
|
||||||
|
|
||||||
membership_change = CurrentStateDeltaMembership(
|
membership_change = CurrentStateDeltaMembership(
|
||||||
event_id=event_id,
|
event_id=event_id,
|
||||||
event_pos=PersistedEventPosition(
|
event_pos=PersistedEventPosition(
|
||||||
|
@ -927,6 +955,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
membership if membership is not None else Membership.LEAVE
|
membership if membership is not None else Membership.LEAVE
|
||||||
),
|
),
|
||||||
sender=sender,
|
sender=sender,
|
||||||
|
state_reset=state_reset,
|
||||||
)
|
)
|
||||||
|
|
||||||
membership_changes.append(membership_change)
|
membership_changes.append(membership_change)
|
||||||
|
|
|
@ -390,7 +390,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
|
||||||
|
|
||||||
# Leave during the from_token/to_token range (newly_left)
|
# Leave during the from_token/to_token range (newly_left)
|
||||||
room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
|
room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
|
||||||
_leave_response2 = self.helper.leave(room_id2, user1_id, tok=user1_tok)
|
leave_response = self.helper.leave(room_id2, user1_id, tok=user1_tok)
|
||||||
|
|
||||||
after_room2_token = self.event_sources.get_current_token()
|
after_room2_token = self.event_sources.get_current_token()
|
||||||
|
|
||||||
|
@ -404,13 +404,10 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
|
||||||
|
|
||||||
# Only the newly_left room should show up
|
# Only the newly_left room should show up
|
||||||
self.assertEqual(room_id_results.keys(), {room_id2})
|
self.assertEqual(room_id_results.keys(), {room_id2})
|
||||||
# It should be pointing to the latest membership event in the from/to range but
|
# It should be pointing to the latest membership event in the from/to range
|
||||||
# the `event_id` is `None` because we left the room causing the server to leave
|
|
||||||
# the room because no other local users are in it (quirk of the
|
|
||||||
# `current_state_delta_stream` table that we source things from)
|
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
room_id_results[room_id2].event_id,
|
room_id_results[room_id2].event_id,
|
||||||
None, # _leave_response2["event_id"],
|
leave_response["event_id"],
|
||||||
)
|
)
|
||||||
# We should *NOT* be `newly_joined` because we are instead `newly_left`
|
# We should *NOT* be `newly_joined` because we are instead `newly_left`
|
||||||
self.assertEqual(room_id_results[room_id2].newly_joined, False)
|
self.assertEqual(room_id_results[room_id2].newly_joined, False)
|
||||||
|
|
|
@ -1616,98 +1616,6 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
|
||||||
channel.json_body["lists"]["foo-list"],
|
channel.json_body["lists"]["foo-list"],
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_sliced_windows(self) -> None:
|
|
||||||
"""
|
|
||||||
Test that the `lists` `ranges` are sliced correctly. Both sides of each range
|
|
||||||
are inclusive.
|
|
||||||
"""
|
|
||||||
user1_id = self.register_user("user1", "pass")
|
|
||||||
user1_tok = self.login(user1_id, "pass")
|
|
||||||
|
|
||||||
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
|
|
||||||
room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
|
|
||||||
room_id3 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
|
|
||||||
|
|
||||||
# Make the Sliding Sync request for a single room
|
|
||||||
channel = self.make_request(
|
|
||||||
"POST",
|
|
||||||
self.sync_endpoint,
|
|
||||||
{
|
|
||||||
"lists": {
|
|
||||||
"foo-list": {
|
|
||||||
"ranges": [[0, 0]],
|
|
||||||
"required_state": [
|
|
||||||
["m.room.join_rules", ""],
|
|
||||||
["m.room.history_visibility", ""],
|
|
||||||
["m.space.child", "*"],
|
|
||||||
],
|
|
||||||
"timeline_limit": 1,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
access_token=user1_tok,
|
|
||||||
)
|
|
||||||
self.assertEqual(channel.code, 200, channel.json_body)
|
|
||||||
|
|
||||||
# Make sure it has the foo-list we requested
|
|
||||||
self.assertListEqual(
|
|
||||||
list(channel.json_body["lists"].keys()),
|
|
||||||
["foo-list"],
|
|
||||||
channel.json_body["lists"].keys(),
|
|
||||||
)
|
|
||||||
# Make sure the list is sorted in the way we expect
|
|
||||||
self.assertListEqual(
|
|
||||||
list(channel.json_body["lists"]["foo-list"]["ops"]),
|
|
||||||
[
|
|
||||||
{
|
|
||||||
"op": "SYNC",
|
|
||||||
"range": [0, 0],
|
|
||||||
"room_ids": [room_id3],
|
|
||||||
}
|
|
||||||
],
|
|
||||||
channel.json_body["lists"]["foo-list"],
|
|
||||||
)
|
|
||||||
|
|
||||||
# Make the Sliding Sync request for the first two rooms
|
|
||||||
channel = self.make_request(
|
|
||||||
"POST",
|
|
||||||
self.sync_endpoint,
|
|
||||||
{
|
|
||||||
"lists": {
|
|
||||||
"foo-list": {
|
|
||||||
"ranges": [[0, 1]],
|
|
||||||
"required_state": [
|
|
||||||
["m.room.join_rules", ""],
|
|
||||||
["m.room.history_visibility", ""],
|
|
||||||
["m.space.child", "*"],
|
|
||||||
],
|
|
||||||
"timeline_limit": 1,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
access_token=user1_tok,
|
|
||||||
)
|
|
||||||
self.assertEqual(channel.code, 200, channel.json_body)
|
|
||||||
|
|
||||||
# Make sure it has the foo-list we requested
|
|
||||||
self.assertListEqual(
|
|
||||||
list(channel.json_body["lists"].keys()),
|
|
||||||
["foo-list"],
|
|
||||||
channel.json_body["lists"].keys(),
|
|
||||||
)
|
|
||||||
# Make sure the list is sorted in the way we expect
|
|
||||||
self.assertListEqual(
|
|
||||||
list(channel.json_body["lists"]["foo-list"]["ops"]),
|
|
||||||
[
|
|
||||||
{
|
|
||||||
"op": "SYNC",
|
|
||||||
"range": [0, 1],
|
|
||||||
"room_ids": [room_id3, room_id2],
|
|
||||||
}
|
|
||||||
],
|
|
||||||
channel.json_body["lists"]["foo-list"],
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_rooms_limited_initial_sync(self) -> None:
|
def test_rooms_limited_initial_sync(self) -> None:
|
||||||
"""
|
"""
|
||||||
Test that we mark `rooms` as `limited=True` when we saturate the `timeline_limit`
|
Test that we mark `rooms` as `limited=True` when we saturate the `timeline_limit`
|
||||||
|
|
|
@ -46,7 +46,7 @@ from synapse.types import (
|
||||||
from synapse.util import Clock
|
from synapse.util import Clock
|
||||||
|
|
||||||
from tests.test_utils.event_injection import create_event
|
from tests.test_utils.event_injection import create_event
|
||||||
from tests.unittest import FederatingHomeserverTestCase, HomeserverTestCase, skip_unless
|
from tests.unittest import FederatingHomeserverTestCase, HomeserverTestCase
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -615,6 +615,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
room_id=room_id1,
|
room_id=room_id1,
|
||||||
membership="join",
|
membership="join",
|
||||||
sender=user1_id,
|
sender=user1_id,
|
||||||
|
state_reset=False,
|
||||||
)
|
)
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -716,6 +717,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
room_id=room_id1,
|
room_id=room_id1,
|
||||||
membership="join",
|
membership="join",
|
||||||
sender=user1_id,
|
sender=user1_id,
|
||||||
|
state_reset=False,
|
||||||
),
|
),
|
||||||
CurrentStateDeltaMembership(
|
CurrentStateDeltaMembership(
|
||||||
event_id=leave_response1["event_id"],
|
event_id=leave_response1["event_id"],
|
||||||
|
@ -724,6 +726,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
room_id=room_id1,
|
room_id=room_id1,
|
||||||
membership="leave",
|
membership="leave",
|
||||||
sender=user1_id,
|
sender=user1_id,
|
||||||
|
state_reset=False,
|
||||||
),
|
),
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -882,24 +885,26 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
room_id=room_id1,
|
room_id=room_id1,
|
||||||
membership="join",
|
membership="join",
|
||||||
sender=user1_id,
|
sender=user1_id,
|
||||||
|
state_reset=False,
|
||||||
),
|
),
|
||||||
CurrentStateDeltaMembership(
|
CurrentStateDeltaMembership(
|
||||||
event_id=None, # leave_response1["event_id"],
|
event_id=leave_response1["event_id"],
|
||||||
event_pos=leave_pos1,
|
event_pos=leave_pos1,
|
||||||
prev_event_id=join_response1["event_id"],
|
prev_event_id=join_response1["event_id"],
|
||||||
room_id=room_id1,
|
room_id=room_id1,
|
||||||
membership="leave",
|
membership="leave",
|
||||||
sender=None, # user1_id,
|
sender=user1_id,
|
||||||
|
state_reset=False,
|
||||||
),
|
),
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_different_user_membership_persisted_in_same_batch(self) -> None:
|
def test_membership_persisted_in_same_batch(self) -> None:
|
||||||
"""
|
"""
|
||||||
Test batch of membership events from different users being processed at once.
|
Test batch of membership events being processed at once. This will result in all
|
||||||
This will result in all of the memberships being stored in the
|
of the memberships being stored in the `current_state_delta_stream` table with
|
||||||
`current_state_delta_stream` table with the same `stream_ordering` even though
|
the same `stream_ordering` even though the individual events have different
|
||||||
the individual events have different `stream_ordering`s.
|
`stream_ordering`s.
|
||||||
"""
|
"""
|
||||||
user1_id = self.register_user("user1", "pass")
|
user1_id = self.register_user("user1", "pass")
|
||||||
_user1_tok = self.login(user1_id, "pass")
|
_user1_tok = self.login(user1_id, "pass")
|
||||||
|
@ -919,19 +924,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
# Persist the user1, user3, and user4 join events in the same batch so they all
|
# Persist the user1, user3, and user4 join events in the same batch so they all
|
||||||
# end up in the `current_state_delta_stream` table with the same
|
# end up in the `current_state_delta_stream` table with the same
|
||||||
# stream_ordering.
|
# stream_ordering.
|
||||||
join_event3, join_event_context3 = self.get_success(
|
|
||||||
create_event(
|
|
||||||
self.hs,
|
|
||||||
sender=user3_id,
|
|
||||||
type=EventTypes.Member,
|
|
||||||
state_key=user3_id,
|
|
||||||
content={"membership": "join"},
|
|
||||||
room_id=room_id1,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
# We want to put user1 in the middle of the batch. This way, regardless of the
|
|
||||||
# implementation that inserts rows into current_state_delta_stream` (whether it
|
|
||||||
# be minimum/maximum of stream position of the batch), we will still catch bugs.
|
|
||||||
join_event1, join_event_context1 = self.get_success(
|
join_event1, join_event_context1 = self.get_success(
|
||||||
create_event(
|
create_event(
|
||||||
self.hs,
|
self.hs,
|
||||||
|
@ -942,6 +934,16 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
room_id=room_id1,
|
room_id=room_id1,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
join_event3, join_event_context3 = self.get_success(
|
||||||
|
create_event(
|
||||||
|
self.hs,
|
||||||
|
sender=user3_id,
|
||||||
|
type=EventTypes.Member,
|
||||||
|
state_key=user3_id,
|
||||||
|
content={"membership": "join"},
|
||||||
|
room_id=room_id1,
|
||||||
|
)
|
||||||
|
)
|
||||||
join_event4, join_event_context4 = self.get_success(
|
join_event4, join_event_context4 = self.get_success(
|
||||||
create_event(
|
create_event(
|
||||||
self.hs,
|
self.hs,
|
||||||
|
@ -955,8 +957,8 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
self.get_success(
|
self.get_success(
|
||||||
self.persistence.persist_events(
|
self.persistence.persist_events(
|
||||||
[
|
[
|
||||||
(join_event3, join_event_context3),
|
|
||||||
(join_event1, join_event_context1),
|
(join_event1, join_event_context1),
|
||||||
|
(join_event3, join_event_context3),
|
||||||
(join_event4, join_event_context4),
|
(join_event4, join_event_context4),
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
@ -964,7 +966,10 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
|
|
||||||
after_room1_token = self.event_sources.get_current_token()
|
after_room1_token = self.event_sources.get_current_token()
|
||||||
|
|
||||||
# Get the membership changes for the user.
|
# Let's get membership changes from user3's perspective because it was in the
|
||||||
|
# middle of the batch. This way, if rows in` current_state_delta_stream` are
|
||||||
|
# stored with the first or last event's `stream_ordering`, we will still catch
|
||||||
|
# bugs.
|
||||||
#
|
#
|
||||||
# At this point, the `current_state_delta_stream` table should look like (notice
|
# At this point, the `current_state_delta_stream` table should look like (notice
|
||||||
# those three memberships at the end with `stream_id=7` because we persisted
|
# those three memberships at the end with `stream_id=7` because we persisted
|
||||||
|
@ -982,7 +987,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
# | 7 | '!x:test' | 'm.room.member' | '@user4:test' | '$xxx' | None |
|
# | 7 | '!x:test' | 'm.room.member' | '@user4:test' | '$xxx' | None |
|
||||||
membership_changes = self.get_success(
|
membership_changes = self.get_success(
|
||||||
self.store.get_current_state_delta_membership_changes_for_user(
|
self.store.get_current_state_delta_membership_changes_for_user(
|
||||||
user1_id,
|
user3_id,
|
||||||
from_key=before_room1_token.room_key,
|
from_key=before_room1_token.room_key,
|
||||||
to_key=after_room1_token.room_key,
|
to_key=after_room1_token.room_key,
|
||||||
)
|
)
|
||||||
|
@ -998,125 +1003,13 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
membership_changes,
|
membership_changes,
|
||||||
[
|
[
|
||||||
CurrentStateDeltaMembership(
|
CurrentStateDeltaMembership(
|
||||||
event_id=join_event1.event_id,
|
event_id=join_event3.event_id,
|
||||||
# Ideally, this would be `join_pos1` (to match the `event_id`) but
|
|
||||||
# when events are persisted in a batch, they are all stored in the
|
|
||||||
# `current_state_delta_stream` table with the minimum
|
|
||||||
# `stream_ordering` from the batch.
|
|
||||||
event_pos=join_pos3,
|
event_pos=join_pos3,
|
||||||
prev_event_id=None,
|
prev_event_id=None,
|
||||||
room_id=room_id1,
|
room_id=room_id1,
|
||||||
membership="join",
|
membership="join",
|
||||||
sender=user1_id,
|
sender=user3_id,
|
||||||
),
|
state_reset=False,
|
||||||
],
|
|
||||||
)
|
|
||||||
|
|
||||||
@skip_unless(
|
|
||||||
False,
|
|
||||||
"persist code does not support multiple membership events for the same user in the same batch",
|
|
||||||
)
|
|
||||||
def test_membership_persisted_in_same_batch(self) -> None:
|
|
||||||
"""
|
|
||||||
Test batch of membership events for the same user being processed at once.
|
|
||||||
|
|
||||||
This *should* (doesn't happen currently) result in all of the memberships being
|
|
||||||
stored in the `current_state_delta_stream` table with the same `stream_ordering`
|
|
||||||
even though the individual events have different `stream_ordering`s.
|
|
||||||
|
|
||||||
FIXME: Currently, only the `join_event` is recorded in the `current_state_delta_stream`
|
|
||||||
table.
|
|
||||||
"""
|
|
||||||
user1_id = self.register_user("user1", "pass")
|
|
||||||
_user1_tok = self.login(user1_id, "pass")
|
|
||||||
user2_id = self.register_user("user2", "pass")
|
|
||||||
user2_tok = self.login(user2_id, "pass")
|
|
||||||
|
|
||||||
before_room1_token = self.event_sources.get_current_token()
|
|
||||||
|
|
||||||
# User2 is just the designated person to create the room (we do this across the
|
|
||||||
# tests to be consistent)
|
|
||||||
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
|
|
||||||
|
|
||||||
# Persist a timeline event sandwiched between two membership events so they end
|
|
||||||
# up in the `current_state_delta_stream` table with the same `stream_id`.
|
|
||||||
join_event, join_event_context = self.get_success(
|
|
||||||
create_event(
|
|
||||||
self.hs,
|
|
||||||
sender=user1_id,
|
|
||||||
type=EventTypes.Member,
|
|
||||||
state_key=user1_id,
|
|
||||||
content={"membership": "join"},
|
|
||||||
room_id=room_id1,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
timeline_event, timeline_event_context = self.get_success(
|
|
||||||
create_event(
|
|
||||||
self.hs,
|
|
||||||
sender=user1_id,
|
|
||||||
type=EventTypes.Message,
|
|
||||||
state_key=user1_id,
|
|
||||||
content={"body": "foo bar", "msgtype": "m.text"},
|
|
||||||
room_id=room_id1,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
leave_event, leave_event_context = self.get_success(
|
|
||||||
create_event(
|
|
||||||
self.hs,
|
|
||||||
sender=user1_id,
|
|
||||||
type=EventTypes.Member,
|
|
||||||
state_key=user1_id,
|
|
||||||
content={"membership": "leave"},
|
|
||||||
room_id=room_id1,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
self.get_success(
|
|
||||||
self.persistence.persist_events(
|
|
||||||
[
|
|
||||||
(join_event, join_event_context),
|
|
||||||
(timeline_event, timeline_event_context),
|
|
||||||
(leave_event, leave_event_context),
|
|
||||||
]
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
after_room1_token = self.event_sources.get_current_token()
|
|
||||||
|
|
||||||
# Get the membership changes for the user.
|
|
||||||
#
|
|
||||||
# At this point, the `current_state_delta_stream` table should look like (notice
|
|
||||||
# those three memberships at the end with `stream_id=7` because we persisted
|
|
||||||
# them in the same batch):
|
|
||||||
#
|
|
||||||
# TODO: DB rows to better see what's going on.
|
|
||||||
membership_changes = self.get_success(
|
|
||||||
self.store.get_current_state_delta_membership_changes_for_user(
|
|
||||||
user1_id,
|
|
||||||
from_key=before_room1_token.room_key,
|
|
||||||
to_key=after_room1_token.room_key,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
join_pos = self.get_success(
|
|
||||||
self.store.get_position_for_event(join_event.event_id)
|
|
||||||
)
|
|
||||||
|
|
||||||
# Let the whole diff show on failure
|
|
||||||
self.maxDiff = None
|
|
||||||
self.assertEqual(
|
|
||||||
membership_changes,
|
|
||||||
[
|
|
||||||
CurrentStateDeltaMembership(
|
|
||||||
event_id=leave_event.event_id,
|
|
||||||
# Ideally, this would be `leave_pos` (to match the `event_id`) but
|
|
||||||
# when events are persisted in a batch, they are all stored in the
|
|
||||||
# `current_state_delta_stream` table with the minimum
|
|
||||||
# `stream_ordering` from the batch.
|
|
||||||
event_pos=join_pos, # leave_pos,
|
|
||||||
prev_event_id=None,
|
|
||||||
room_id=room_id1,
|
|
||||||
membership="leave",
|
|
||||||
sender=user1_id,
|
|
||||||
),
|
),
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -1204,6 +1097,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
room_id=room_id1,
|
room_id=room_id1,
|
||||||
membership="leave",
|
membership="leave",
|
||||||
sender=None, # user1_id,
|
sender=None, # user1_id,
|
||||||
|
state_reset=True,
|
||||||
),
|
),
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -1254,6 +1148,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
room_id=room_id1,
|
room_id=room_id1,
|
||||||
membership="join",
|
membership="join",
|
||||||
sender=user1_id,
|
sender=user1_id,
|
||||||
|
state_reset=False,
|
||||||
),
|
),
|
||||||
CurrentStateDeltaMembership(
|
CurrentStateDeltaMembership(
|
||||||
event_id=join_response2["event_id"],
|
event_id=join_response2["event_id"],
|
||||||
|
@ -1262,6 +1157,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
room_id=room_id2,
|
room_id=room_id2,
|
||||||
membership="join",
|
membership="join",
|
||||||
sender=user1_id,
|
sender=user1_id,
|
||||||
|
state_reset=False,
|
||||||
),
|
),
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -1288,6 +1184,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
room_id=room_id1,
|
room_id=room_id1,
|
||||||
membership="join",
|
membership="join",
|
||||||
sender=user1_id,
|
sender=user1_id,
|
||||||
|
state_reset=False,
|
||||||
)
|
)
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -1481,6 +1378,7 @@ class GetCurrentStateDeltaMembershipChangesForUserFederationTestCase(
|
||||||
room_id=intially_unjoined_room_id,
|
room_id=intially_unjoined_room_id,
|
||||||
membership="join",
|
membership="join",
|
||||||
sender=user1_id,
|
sender=user1_id,
|
||||||
|
state_reset=False,
|
||||||
),
|
),
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in a new issue