# mirror of https://github.com/element-hq/synapse
# synced 2024-10-05 13:02:41 +00:00
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright 2021 The Matrix.org Foundation C.I.C.
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
#
#
|
|
import logging
|
|
from typing import List, Tuple
|
|
|
|
from immutabledict import immutabledict
|
|
|
|
from twisted.test.proto_helpers import MemoryReactor
|
|
|
|
from synapse.api.constants import Direction, EventTypes, RelationTypes
|
|
from synapse.api.filtering import Filter
|
|
from synapse.api.room_versions import RoomVersions
|
|
from synapse.events import make_event_from_dict
|
|
from synapse.rest import admin
|
|
from synapse.rest.client import login, room
|
|
from synapse.server import HomeServer
|
|
from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
|
|
from synapse.types import JsonDict, PersistedEventPosition, RoomStreamToken
|
|
from synapse.util import Clock
|
|
|
|
from tests.test_utils.event_injection import create_event
|
|
from tests.unittest import HomeserverTestCase
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class PaginationTestCase(HomeserverTestCase):
    """
    Test the pre-filtering done in the pagination code.

    This is similar to some of the tests in tests.rest.client.test_rooms but here
    we ensure that the filtering done in the database is applied successfully.
    """

    servlets = [
        admin.register_servlets_for_client_rest_resource,
        room.register_servlets,
        login.register_servlets,
    ]

    def default_config(self) -> JsonDict:
        # Enable MSC3874 so the experimental `org.matrix.msc3874.rel_types` /
        # `org.matrix.msc3874.not_rel_types` filter fields are honoured.
        config = super().default_config()
        config["experimental_features"] = {"msc3874_enabled": True}
        return config

    def prepare(
        self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
    ) -> None:
        """
        Create a room with three members and the events the tests filter over:

        * `event_id_1`          -- a message, annotated by the second user
        * `event_id_annotation` -- that annotation (reaction) event
        * `event_id_2`          -- a message, referenced by the third user
        * `event_id_reference`  -- that reference event
        * `event_id_none`       -- a message with no relations at all

        `from_token` is captured before any of these events so pagination in
        the tests only sees the events created here (not room-creation state).
        """
        self.user_id = self.register_user("test", "test")
        self.tok = self.login("test", "test")
        self.room_id = self.helper.create_room_as(self.user_id, tok=self.tok)

        self.second_user_id = self.register_user("second", "test")
        self.second_tok = self.login("second", "test")
        self.helper.join(
            room=self.room_id, user=self.second_user_id, tok=self.second_tok
        )

        self.third_user_id = self.register_user("third", "test")
        self.third_tok = self.login("third", "test")
        self.helper.join(room=self.room_id, user=self.third_user_id, tok=self.third_tok)

        # Store a token which is after all the room creation events.
        self.from_token = self.get_success(
            self.hs.get_event_sources().get_current_token_for_pagination(self.room_id)
        )

        # An initial event with a relation from second user.
        res = self.helper.send_event(
            room_id=self.room_id,
            type=EventTypes.Message,
            content={"msgtype": "m.text", "body": "Message 1"},
            tok=self.tok,
        )
        self.event_id_1 = res["event_id"]
        res = self.helper.send_event(
            room_id=self.room_id,
            type="m.reaction",
            content={
                "m.relates_to": {
                    "rel_type": RelationTypes.ANNOTATION,
                    "event_id": self.event_id_1,
                    "key": "👍",
                }
            },
            tok=self.second_tok,
        )
        self.event_id_annotation = res["event_id"]

        # Another event with a relation from third user.
        res = self.helper.send_event(
            room_id=self.room_id,
            type=EventTypes.Message,
            content={"msgtype": "m.text", "body": "Message 2"},
            tok=self.tok,
        )
        self.event_id_2 = res["event_id"]
        res = self.helper.send_event(
            room_id=self.room_id,
            type="m.reaction",
            content={
                "m.relates_to": {
                    "rel_type": RelationTypes.REFERENCE,
                    "event_id": self.event_id_2,
                }
            },
            tok=self.third_tok,
        )
        self.event_id_reference = res["event_id"]

        # An event with no relations.
        res = self.helper.send_event(
            room_id=self.room_id,
            type=EventTypes.Message,
            content={"msgtype": "m.text", "body": "No relations"},
            tok=self.tok,
        )
        self.event_id_none = res["event_id"]

    def _filter_messages(self, filter: JsonDict) -> List[str]:
        """Paginate with the given filter, returning the matching event IDs.

        Note this exercises the storage-layer filtering directly (via
        `paginate_room_events`) rather than going through the `/messages`
        REST endpoint — that is the point of this test case.
        """

        events, next_key = self.get_success(
            self.hs.get_datastores().main.paginate_room_events(
                room_id=self.room_id,
                from_key=self.from_token.room_key,
                to_key=None,
                direction=Direction.FORWARDS,
                limit=10,
                event_filter=Filter(self.hs, filter),
            )
        )

        return [ev.event_id for ev in events]

    def test_filter_relation_senders(self) -> None:
        # Messages which second user reacted to.
        filter = {"related_by_senders": [self.second_user_id]}
        chunk = self._filter_messages(filter)
        self.assertEqual(chunk, [self.event_id_1])

        # Messages which third user reacted to.
        filter = {"related_by_senders": [self.third_user_id]}
        chunk = self._filter_messages(filter)
        self.assertEqual(chunk, [self.event_id_2])

        # Messages which either user reacted to.
        filter = {"related_by_senders": [self.second_user_id, self.third_user_id]}
        chunk = self._filter_messages(filter)
        self.assertCountEqual(chunk, [self.event_id_1, self.event_id_2])

    def test_filter_relation_type(self) -> None:
        # Messages which have annotations.
        filter = {"related_by_rel_types": [RelationTypes.ANNOTATION]}
        chunk = self._filter_messages(filter)
        self.assertEqual(chunk, [self.event_id_1])

        # Messages which have references.
        filter = {"related_by_rel_types": [RelationTypes.REFERENCE]}
        chunk = self._filter_messages(filter)
        self.assertEqual(chunk, [self.event_id_2])

        # Messages which have either annotations or references.
        filter = {
            "related_by_rel_types": [
                RelationTypes.ANNOTATION,
                RelationTypes.REFERENCE,
            ]
        }
        chunk = self._filter_messages(filter)
        self.assertCountEqual(chunk, [self.event_id_1, self.event_id_2])

    def test_filter_relation_senders_and_type(self) -> None:
        # Messages which second user reacted to (both filter fields must match).
        filter = {
            "related_by_senders": [self.second_user_id],
            "related_by_rel_types": [RelationTypes.ANNOTATION],
        }
        chunk = self._filter_messages(filter)
        self.assertEqual(chunk, [self.event_id_1])

    def test_duplicate_relation(self) -> None:
        """An event should only be returned once if there are multiple relations to it."""
        # Second annotation (different key) from the same sender to event 1.
        self.helper.send_event(
            room_id=self.room_id,
            type="m.reaction",
            content={
                "m.relates_to": {
                    "rel_type": RelationTypes.ANNOTATION,
                    "event_id": self.event_id_1,
                    "key": "A",
                }
            },
            tok=self.second_tok,
        )

        filter = {"related_by_senders": [self.second_user_id]}
        chunk = self._filter_messages(filter)
        self.assertEqual(chunk, [self.event_id_1])

    def test_filter_rel_types(self) -> None:
        # Messages which are annotations.
        filter = {"org.matrix.msc3874.rel_types": [RelationTypes.ANNOTATION]}
        chunk = self._filter_messages(filter)
        self.assertEqual(chunk, [self.event_id_annotation])

        # Messages which are references.
        filter = {"org.matrix.msc3874.rel_types": [RelationTypes.REFERENCE]}
        chunk = self._filter_messages(filter)
        self.assertEqual(chunk, [self.event_id_reference])

        # Messages which are either annotations or references.
        filter = {
            "org.matrix.msc3874.rel_types": [
                RelationTypes.ANNOTATION,
                RelationTypes.REFERENCE,
            ]
        }
        chunk = self._filter_messages(filter)
        self.assertCountEqual(
            chunk,
            [self.event_id_annotation, self.event_id_reference],
        )

    def test_filter_not_rel_types(self) -> None:
        # Messages which are not annotations.
        filter = {"org.matrix.msc3874.not_rel_types": [RelationTypes.ANNOTATION]}
        chunk = self._filter_messages(filter)
        self.assertEqual(
            chunk,
            [
                self.event_id_1,
                self.event_id_2,
                self.event_id_reference,
                self.event_id_none,
            ],
        )

        # Messages which are not references.
        filter = {"org.matrix.msc3874.not_rel_types": [RelationTypes.REFERENCE]}
        chunk = self._filter_messages(filter)
        self.assertEqual(
            chunk,
            [
                self.event_id_1,
                self.event_id_annotation,
                self.event_id_2,
                self.event_id_none,
            ],
        )

        # Messages which are neither annotations or references.
        filter = {
            "org.matrix.msc3874.not_rel_types": [
                RelationTypes.ANNOTATION,
                RelationTypes.REFERENCE,
            ]
        }
        chunk = self._filter_messages(filter)
        self.assertEqual(chunk, [self.event_id_1, self.event_id_2, self.event_id_none])
|
|
|
|
|
|
class GetLastEventInRoomBeforeStreamOrderingTestCase(HomeserverTestCase):
    """
    Test `get_last_event_pos_in_room_before_stream_ordering(...)`
    """

    servlets = [
        admin.register_servlets,
        room.register_servlets,
        login.register_servlets,
    ]

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        self.store = hs.get_datastores().main
        self.event_sources = hs.get_event_sources()

    def _update_persisted_instance_name_for_event(
        self, event_id: str, instance_name: str
    ) -> None:
        """
        Update the `instance_name` that persisted the event in the database.

        This rewrites the `events` row directly so a single-process test can
        pretend the event was persisted by a specific worker.
        """
        return self.get_success(
            self.store.db_pool.simple_update_one(
                "events",
                keyvalues={"event_id": event_id},
                updatevalues={"instance_name": instance_name},
            )
        )

    def _send_event_on_instance(
        self, instance_name: str, room_id: str, access_token: str
    ) -> Tuple[JsonDict, PersistedEventPosition]:
        """
        Send an event in a room and mimic that it was persisted by a specific
        instance/worker.

        Returns the send-event response and the event's persisted position
        (with the faked `instance_name`).
        """
        event_response = self.helper.send(
            room_id, f"{instance_name} message", tok=access_token
        )

        self._update_persisted_instance_name_for_event(
            event_response["event_id"], instance_name
        )

        event_pos = self.get_success(
            self.store.get_position_for_event(event_response["event_id"])
        )

        return event_response, event_pos

    def test_before_room_created(self) -> None:
        """
        Test that no event is returned if we are using a token before the room was even created
        """
        user1_id = self.register_user("user1", "pass")
        user1_tok = self.login(user1_id, "pass")

        before_room_token = self.event_sources.get_current_token()

        room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)

        last_event_result = self.get_success(
            self.store.get_last_event_pos_in_room_before_stream_ordering(
                room_id=room_id,
                end_token=before_room_token.room_key,
            )
        )

        self.assertIsNone(last_event_result)

    def test_after_room_created(self) -> None:
        """
        Test that an event is returned if we are using a token after the room was created
        """
        user1_id = self.register_user("user1", "pass")
        user1_tok = self.login(user1_id, "pass")

        room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)

        after_room_token = self.event_sources.get_current_token()

        last_event_result = self.get_success(
            self.store.get_last_event_pos_in_room_before_stream_ordering(
                room_id=room_id,
                end_token=after_room_token.room_key,
            )
        )
        assert last_event_result is not None
        last_event_id, _ = last_event_result

        self.assertIsNotNone(last_event_id)

    def test_activity_in_other_rooms(self) -> None:
        """
        Test to make sure that the last event in the room is returned even if the
        `stream_ordering` has advanced from activity in other rooms.
        """
        user1_id = self.register_user("user1", "pass")
        user1_tok = self.login(user1_id, "pass")

        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
        event_response = self.helper.send(room_id1, "target!", tok=user1_tok)
        # Create another room to advance the stream_ordering
        self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)

        after_room_token = self.event_sources.get_current_token()

        last_event_result = self.get_success(
            self.store.get_last_event_pos_in_room_before_stream_ordering(
                room_id=room_id1,
                end_token=after_room_token.room_key,
            )
        )
        assert last_event_result is not None
        last_event_id, _ = last_event_result

        # Make sure it's the event we expect (which also means we know it's from the
        # correct room)
        self.assertEqual(last_event_id, event_response["event_id"])

    def test_activity_after_token_has_no_effect(self) -> None:
        """
        Test to make sure we return the last event before the token even if there is
        activity after it.
        """
        user1_id = self.register_user("user1", "pass")
        user1_tok = self.login(user1_id, "pass")

        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
        event_response = self.helper.send(room_id1, "target!", tok=user1_tok)

        after_room_token = self.event_sources.get_current_token()

        # Send some events after the token
        self.helper.send(room_id1, "after1", tok=user1_tok)
        self.helper.send(room_id1, "after2", tok=user1_tok)

        last_event_result = self.get_success(
            self.store.get_last_event_pos_in_room_before_stream_ordering(
                room_id=room_id1,
                end_token=after_room_token.room_key,
            )
        )
        assert last_event_result is not None
        last_event_id, _ = last_event_result

        # Make sure it's the last event before the token
        self.assertEqual(last_event_id, event_response["event_id"])

    def test_last_event_within_sharded_token(self) -> None:
        """
        Test to make sure we can find the last event that that is *within* the sharded
        token (a token that has an `instance_map` and looks like
        `m{min_pos}~{writer1}.{pos1}~{writer2}.{pos2}`). We are specifically testing
        that we can find an event within the tokens minimum and instance
        `stream_ordering`.
        """
        user1_id = self.register_user("user1", "pass")
        user1_tok = self.login(user1_id, "pass")

        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
        event_response1, event_pos1 = self._send_event_on_instance(
            "worker1", room_id1, user1_tok
        )
        event_response2, event_pos2 = self._send_event_on_instance(
            "worker1", room_id1, user1_tok
        )
        event_response3, event_pos3 = self._send_event_on_instance(
            "worker1", room_id1, user1_tok
        )

        # Create another room to advance the `stream_ordering` on the same worker
        # so we can sandwich event3 in the middle of the token
        room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
        event_response4, event_pos4 = self._send_event_on_instance(
            "worker1", room_id2, user1_tok
        )

        # Assemble a token that encompasses event1 -> event4 on worker1
        # (minimum position is event2, worker1's position is event4, so event3
        # falls between the two).
        end_token = RoomStreamToken(
            stream=event_pos2.stream,
            instance_map=immutabledict({"worker1": event_pos4.stream}),
        )

        # Send some events after the token
        self.helper.send(room_id1, "after1", tok=user1_tok)
        self.helper.send(room_id1, "after2", tok=user1_tok)

        last_event_result = self.get_success(
            self.store.get_last_event_pos_in_room_before_stream_ordering(
                room_id=room_id1,
                end_token=end_token,
            )
        )
        assert last_event_result is not None
        last_event_id, _ = last_event_result

        # Should find closest event before the token in room1
        self.assertEqual(
            last_event_id,
            event_response3["event_id"],
            f"We expected {event_response3['event_id']} but saw {last_event_id} which corresponds to "
            + str(
                {
                    "event1": event_response1["event_id"],
                    "event2": event_response2["event_id"],
                    "event3": event_response3["event_id"],
                }
            ),
        )

    def test_last_event_before_sharded_token(self) -> None:
        """
        Test to make sure we can find the last event that is *before* the sharded token
        (a token that has an `instance_map` and looks like
        `m{min_pos}~{writer1}.{pos1}~{writer2}.{pos2}`).
        """
        user1_id = self.register_user("user1", "pass")
        user1_tok = self.login(user1_id, "pass")

        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
        event_response1, event_pos1 = self._send_event_on_instance(
            "worker1", room_id1, user1_tok
        )
        event_response2, event_pos2 = self._send_event_on_instance(
            "worker1", room_id1, user1_tok
        )

        # Create another room to advance the `stream_ordering` on the same worker
        room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
        event_response3, event_pos3 = self._send_event_on_instance(
            "worker1", room_id2, user1_tok
        )
        event_response4, event_pos4 = self._send_event_on_instance(
            "worker1", room_id2, user1_tok
        )

        # Assemble a token that encompasses event3 -> event4 on worker1
        # (both in room2, so room1's last event is strictly before the token).
        end_token = RoomStreamToken(
            stream=event_pos3.stream,
            instance_map=immutabledict({"worker1": event_pos4.stream}),
        )

        # Send some events after the token
        self.helper.send(room_id1, "after1", tok=user1_tok)
        self.helper.send(room_id1, "after2", tok=user1_tok)

        last_event_result = self.get_success(
            self.store.get_last_event_pos_in_room_before_stream_ordering(
                room_id=room_id1,
                end_token=end_token,
            )
        )
        assert last_event_result is not None
        last_event_id, _ = last_event_result

        # Should find closest event before the token in room1
        self.assertEqual(
            last_event_id,
            event_response2["event_id"],
            f"We expected {event_response2['event_id']} but saw {last_event_id} which corresponds to "
            + str(
                {
                    "event1": event_response1["event_id"],
                    "event2": event_response2["event_id"],
                }
            ),
        )
|
|
|
class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
    """
    Test `get_current_state_delta_membership_changes_for_user(...)`
    """

    servlets = [
        admin.register_servlets,
        room.register_servlets,
        login.register_servlets,
    ]

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        self.store = hs.get_datastores().main
        self.event_sources = hs.get_event_sources()
        self.state_handler = self.hs.get_state_handler()
        # Persistence controller is only available on event-persisting
        # instances, hence the assertion before we stash it.
        persistence = hs.get_storage_controllers().persistence
        assert persistence is not None
        self.persistence = persistence

    def test_returns_membership_events(self) -> None:
        """
        A basic test that a membership event in the token range is returned for the user.
        """
        user1_id = self.register_user("user1", "pass")
        user1_tok = self.login(user1_id, "pass")
        user2_id = self.register_user("user2", "pass")
        user2_tok = self.login(user2_id, "pass")

        before_room1_token = self.event_sources.get_current_token()

        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
        join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
        join_pos = self.get_success(
            self.store.get_position_for_event(join_response["event_id"])
        )

        after_room1_token = self.event_sources.get_current_token()

        membership_changes = self.get_success(
            self.store.get_current_state_delta_membership_changes_for_user(
                user1_id,
                from_key=before_room1_token.room_key,
                to_key=after_room1_token.room_key,
            )
        )

        # Let the whole diff show on failure
        self.maxDiff = None
        self.assertEqual(
            membership_changes,
            [
                CurrentStateDeltaMembership(
                    event_id=join_response["event_id"],
                    event_pos=join_pos,
                    prev_event_id=None,
                    room_id=room_id1,
                    membership="join",
                    sender=user1_id,
                )
            ],
        )

    def test_server_left_after_us_room(self) -> None:
        """
        Test that when probing over part of the DAG where the server left the room *after
        us*, we still see the join and leave changes.

        This is to make sure we play nicely with this behavior: When the server leaves a
        room, it will insert new rows with `event_id = null` into the
        `current_state_delta_stream` table for all current state.
        """
        user1_id = self.register_user("user1", "pass")
        user1_tok = self.login(user1_id, "pass")
        user2_id = self.register_user("user2", "pass")
        user2_tok = self.login(user2_id, "pass")

        before_room1_token = self.event_sources.get_current_token()

        room_id1 = self.helper.create_room_as(
            user2_id,
            tok=user2_tok,
            extra_content={
                "power_level_content_override": {
                    "users": {
                        user2_id: 100,
                        # Allow user1 to send state in the room
                        user1_id: 100,
                    }
                }
            },
        )
        join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
        join_pos1 = self.get_success(
            self.store.get_position_for_event(join_response1["event_id"])
        )
        # Make sure random other non-member state that happens to have a state_key
        # matching the user ID doesn't mess with things.
        self.helper.send_state(
            room_id1,
            event_type="foobarbazdummy",
            state_key=user1_id,
            body={"foo": "bar"},
            tok=user1_tok,
        )
        # User1 should leave the room first
        leave_response1 = self.helper.leave(room_id1, user1_id, tok=user1_tok)
        leave_pos1 = self.get_success(
            self.store.get_position_for_event(leave_response1["event_id"])
        )

        # User2 should also leave the room (everyone has left the room which means the
        # server is no longer in the room).
        self.helper.leave(room_id1, user2_id, tok=user2_tok)

        after_room1_token = self.event_sources.get_current_token()

        membership_changes = self.get_success(
            self.store.get_current_state_delta_membership_changes_for_user(
                user1_id,
                from_key=before_room1_token.room_key,
                to_key=after_room1_token.room_key,
            )
        )

        # Let the whole diff show on failure
        self.maxDiff = None
        self.assertEqual(
            membership_changes,
            [
                CurrentStateDeltaMembership(
                    event_id=join_response1["event_id"],
                    event_pos=join_pos1,
                    prev_event_id=None,
                    room_id=room_id1,
                    membership="join",
                    sender=user1_id,
                ),
                CurrentStateDeltaMembership(
                    event_id=leave_response1["event_id"],
                    event_pos=leave_pos1,
                    prev_event_id=join_response1["event_id"],
                    room_id=room_id1,
                    membership="leave",
                    sender=user1_id,
                ),
            ],
        )

    def test_server_left_room(self) -> None:
        """
        Test that when probing over part of the DAG where we leave the room causing the
        server to leave the room (because we were the last local user in the room), we
        still see the join and leave changes.

        This is to make sure we play nicely with this behavior: When the server leaves a
        room, it will insert new rows with `event_id = null` into the
        `current_state_delta_stream` table for all current state.
        """
        user1_id = self.register_user("user1", "pass")
        user1_tok = self.login(user1_id, "pass")
        user2_id = self.register_user("user2", "pass")
        user2_tok = self.login(user2_id, "pass")

        before_room1_token = self.event_sources.get_current_token()

        room_id1 = self.helper.create_room_as(
            user2_id,
            tok=user2_tok,
            extra_content={
                "power_level_content_override": {
                    "users": {
                        user2_id: 100,
                        # Allow user1 to send state in the room
                        user1_id: 100,
                    }
                }
            },
        )
        join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
        join_pos1 = self.get_success(
            self.store.get_position_for_event(join_response1["event_id"])
        )
        # Make sure random other non-member state that happens to have a state_key
        # matching the user ID doesn't mess with things.
        self.helper.send_state(
            room_id1,
            event_type="foobarbazdummy",
            state_key=user1_id,
            body={"foo": "bar"},
            tok=user1_tok,
        )

        # User2 should leave the room first.
        self.helper.leave(room_id1, user2_id, tok=user2_tok)

        # User1 (the person we're testing with) should also leave the room (everyone has
        # left the room which means the server is no longer in the room).
        leave_response1 = self.helper.leave(room_id1, user1_id, tok=user1_tok)
        leave_pos1 = self.get_success(
            self.store.get_position_for_event(leave_response1["event_id"])
        )

        after_room1_token = self.event_sources.get_current_token()

        membership_changes = self.get_success(
            self.store.get_current_state_delta_membership_changes_for_user(
                user1_id,
                from_key=before_room1_token.room_key,
                to_key=after_room1_token.room_key,
            )
        )

        # Let the whole diff show on failure
        self.maxDiff = None
        self.assertEqual(
            membership_changes,
            [
                CurrentStateDeltaMembership(
                    event_id=join_response1["event_id"],
                    event_pos=join_pos1,
                    prev_event_id=None,
                    room_id=room_id1,
                    membership="join",
                    sender=user1_id,
                ),
                CurrentStateDeltaMembership(
                    event_id=leave_response1["event_id"],
                    event_pos=leave_pos1,
                    prev_event_id=join_response1["event_id"],
                    room_id=room_id1,
                    membership="leave",
                    sender=user1_id,
                ),
            ],
        )

    def test_membership_persisted_in_same_batch(self) -> None:
        """
        Test batch of membership events being processed at once. This will result in all
        of the memberships being stored in the `current_state_delta_stream` table with
        the same `stream_ordering` even though the individual events have different
        `stream_ordering`s.
        """
        user1_id = self.register_user("user1", "pass")
        _user1_tok = self.login(user1_id, "pass")
        user2_id = self.register_user("user2", "pass")
        user2_tok = self.login(user2_id, "pass")
        user3_id = self.register_user("user3", "pass")
        _user3_tok = self.login(user3_id, "pass")
        user4_id = self.register_user("user4", "pass")
        _user4_tok = self.login(user4_id, "pass")

        before_room1_token = self.event_sources.get_current_token()

        # User2 is just the designated person to create the room (we do this across the
        # tests to be consistent)
        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)

        # Persist the user1, user3, and user4 join events in the same batch so they all
        # end up in the `current_state_delta_stream` table with the same
        # stream_ordering.
        join_event1, join_event_context1 = self.get_success(
            create_event(
                self.hs,
                sender=user1_id,
                type=EventTypes.Member,
                state_key=user1_id,
                content={"membership": "join"},
                room_id=room_id1,
            )
        )
        join_event3, join_event_context3 = self.get_success(
            create_event(
                self.hs,
                sender=user3_id,
                type=EventTypes.Member,
                state_key=user3_id,
                content={"membership": "join"},
                room_id=room_id1,
            )
        )
        join_event4, join_event_context4 = self.get_success(
            create_event(
                self.hs,
                sender=user4_id,
                type=EventTypes.Member,
                state_key=user4_id,
                content={"membership": "join"},
                room_id=room_id1,
            )
        )
        self.get_success(
            self.persistence.persist_events(
                [
                    (join_event1, join_event_context1),
                    (join_event3, join_event_context3),
                    (join_event4, join_event_context4),
                ]
            )
        )

        after_room1_token = self.event_sources.get_current_token()

        # Let's get membership changes from user3's perspective because it was in the
        # middle of the batch. This way, if rows in` current_state_delta_stream` are
        # stored with the first or last event's `stream_ordering`, we will still catch
        # bugs.
        membership_changes = self.get_success(
            self.store.get_current_state_delta_membership_changes_for_user(
                user3_id,
                from_key=before_room1_token.room_key,
                to_key=after_room1_token.room_key,
            )
        )

        join_pos3 = self.get_success(
            self.store.get_position_for_event(join_event3.event_id)
        )

        # Let the whole diff show on failure
        self.maxDiff = None
        self.assertEqual(
            membership_changes,
            [
                CurrentStateDeltaMembership(
                    event_id=join_event3.event_id,
                    event_pos=join_pos3,
                    prev_event_id=None,
                    room_id=room_id1,
                    membership="join",
                    # NOTE(review): this expects user1 as the sender even though
                    # `join_event3` was sent by user3 — presumably an artifact of
                    # how deltas are recorded for batched persists; confirm this
                    # is intentional and not a bug in the expected value.
                    sender=user1_id,
                ),
            ],
        )

    # TODO: Test remote join where the first rows will just be the state when you joined

    # TODO: Test state reset where the user gets removed from the room (when there is no
    # corresponding leave event)

    def test_excluded_room_ids(self) -> None:
        """
        Test that the `excluded_room_ids` option excludes changes from the specified rooms.
        """
        user1_id = self.register_user("user1", "pass")
        user1_tok = self.login(user1_id, "pass")
        user2_id = self.register_user("user2", "pass")
        user2_tok = self.login(user2_id, "pass")

        before_room1_token = self.event_sources.get_current_token()

        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
        join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
        join_pos1 = self.get_success(
            self.store.get_position_for_event(join_response1["event_id"])
        )

        room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok)
        join_response2 = self.helper.join(room_id2, user1_id, tok=user1_tok)
        join_pos2 = self.get_success(
            self.store.get_position_for_event(join_response2["event_id"])
        )

        after_room1_token = self.event_sources.get_current_token()

        # First test that the room is returned without the `excluded_room_ids` option
        membership_changes = self.get_success(
            self.store.get_current_state_delta_membership_changes_for_user(
                user1_id,
                from_key=before_room1_token.room_key,
                to_key=after_room1_token.room_key,
            )
        )

        # Let the whole diff show on failure
        self.maxDiff = None
        self.assertEqual(
            membership_changes,
            [
                CurrentStateDeltaMembership(
                    event_id=join_response1["event_id"],
                    event_pos=join_pos1,
                    prev_event_id=None,
                    room_id=room_id1,
                    membership="join",
                    sender=user1_id,
                ),
                CurrentStateDeltaMembership(
                    event_id=join_response2["event_id"],
                    event_pos=join_pos2,
                    prev_event_id=None,
                    room_id=room_id2,
                    membership="join",
                    sender=user1_id,
                ),
            ],
        )

        # Then test that `excluded_room_ids` excludes room2 as expected
        membership_changes = self.get_success(
            self.store.get_current_state_delta_membership_changes_for_user(
                user1_id,
                from_key=before_room1_token.room_key,
                to_key=after_room1_token.room_key,
                excluded_room_ids=[room_id2],
            )
        )

        # Let the whole diff show on failure
        self.maxDiff = None
        self.assertEqual(
            membership_changes,
            [
                CurrentStateDeltaMembership(
                    event_id=join_response1["event_id"],
                    event_pos=join_pos1,
                    prev_event_id=None,
                    room_id=room_id1,
                    membership="join",
                    sender=user1_id,
                )
            ],
        )
|
|
|
|
|
|
# class GetCurrentStateDeltaMembershipChangesForUserFederationTestCase(BaseMultiWorkerStreamTestCase):
|
|
# """
|
|
# TODO
|
|
# """
|
|
|
|
# servlets = [
|
|
# admin.register_servlets_for_client_rest_resource,
|
|
# room.register_servlets,
|
|
# login.register_servlets,
|
|
# ]
|
|
|
|
# def default_config(self) -> dict:
|
|
# conf = super().default_config()
|
|
# conf["federation_custom_ca_list"] = [get_test_ca_cert_file()]
|
|
# return conf
|
|
|
|
# def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
|
|
# self.sliding_sync_handler = self.hs.get_sliding_sync_handler()
|
|
# self.store = self.hs.get_datastores().main
|
|
# self.event_sources = hs.get_event_sources()
|
|
|
|
|
|
# def test_sharded_event_persisters(self) -> None:
|
|
# """
|
|
# TODO
|
|
# """
|
|
# user1_id = self.register_user("user1", "pass")
|
|
# user1_tok = self.login(user1_id, "pass")
|
|
# user2_id = self.register_user("user2", "pass")
|
|
# user2_tok = self.login(user2_id, "pass")
|
|
|
|
# remote_hs = self.make_worker_hs("synapse.app.generic_worker")
|
|
|
|
# channel = make_request(
|
|
# self.reactor,
|
|
# self._hs_to_site[hs],
|
|
# "GET",
|
|
# f"/_matrix/media/r0/download/{target}/{media_id}",
|
|
# shorthand=False,
|
|
# access_token=self.access_token,
|
|
# await_result=False,
|
|
# )
|
|
|
|
# remote_hs
|
|
|
|
# worker_store2 = worker_hs2.get_datastores().main
|
|
# assert isinstance(worker_store2._stream_id_gen, MultiWriterIdGenerator)
|
|
# actx = worker_store2._stream_id_gen.get_next()
|
|
|
|
# self.assertEqual(
|
|
# room_id_results.keys(),
|
|
# {
|
|
# room_id1,
|
|
# # room_id2 shouldn't show up because we left before the from/to range
|
|
# # and the join event during the range happened while worker2 was stuck.
|
|
# # This means that from the perspective of the master, where the
|
|
# # `stuck_activity_token` is generated, the stream position for worker2
|
|
# # wasn't advanced to the join yet. Looking at the `instance_map`, the
|
|
# # join technically comes after `stuck_activity_token``.
|
|
# #
|
|
# # room_id2,
|
|
# room_id3,
|
|
# },
|
|
# )
|