From 0b9a903ca12831e431b596daacf127e53ecbd050 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 26 Jun 2024 19:35:18 -0500 Subject: [PATCH] Add test that remotely joins room --- tests/storage/test_stream.py | 239 ++++++++++++++++++++++++++--------- 1 file changed, 178 insertions(+), 61 deletions(-) diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py index 39cb5a25c5..3b825dbbbe 100644 --- a/tests/storage/test_stream.py +++ b/tests/storage/test_stream.py @@ -21,24 +21,32 @@ import logging from typing import List, Tuple +from unittest.mock import AsyncMock, patch from immutabledict import immutabledict from twisted.test.proto_helpers import MemoryReactor -from synapse.api.constants import Direction, EventTypes, RelationTypes +from synapse.api.constants import Direction, EventTypes, Membership, RelationTypes from synapse.api.filtering import Filter -from synapse.api.room_versions import RoomVersions -from synapse.events import make_event_from_dict +from synapse.crypto.event_signing import add_hashes_and_signatures +from synapse.events import FrozenEventV3 +from synapse.federation.federation_client import SendJoinResult from synapse.rest import admin from synapse.rest.client import login, room from synapse.server import HomeServer from synapse.storage.databases.main.stream import CurrentStateDeltaMembership -from synapse.types import JsonDict, PersistedEventPosition, RoomStreamToken +from synapse.types import ( + JsonDict, + PersistedEventPosition, + RoomStreamToken, + UserID, + create_requester, +) from synapse.util import Clock from tests.test_utils.event_injection import create_event -from tests.unittest import HomeserverTestCase +from tests.unittest import FederatingHomeserverTestCase, HomeserverTestCase logger = logging.getLogger(__name__) @@ -884,8 +892,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase): ], ) - # TODO: Test remote join where the first rows will just be the state when you joined - # TODO: Test state reset where the user gets removed from the room (when there is no # corresponding leave event) @@ -974,67 +980,178 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase): ) -# class GetCurrentStateDeltaMembershipChangesForUserFederationTestCase(BaseMultiWorkerStreamTestCase): -# """ -# TODO -# """ +class GetCurrentStateDeltaMembershipChangesForUserFederationTestCase( + FederatingHomeserverTestCase +): + """ + Test `get_current_state_delta_membership_changes_for_user(...)` when joining remote federated rooms. + """ -# servlets = [ -# admin.register_servlets_for_client_rest_resource, -# room.register_servlets, -# login.register_servlets, -# ] + servlets = [ + admin.register_servlets_for_client_rest_resource, + room.register_servlets, + login.register_servlets, + ] -# def default_config(self) -> dict: -# conf = super().default_config() -# conf["federation_custom_ca_list"] = [get_test_ca_cert_file()] -# return conf + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.sliding_sync_handler = self.hs.get_sliding_sync_handler() + self.store = self.hs.get_datastores().main + self.event_sources = hs.get_event_sources() + self.room_member_handler = hs.get_room_member_handler() -# def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: -# self.sliding_sync_handler = self.hs.get_sliding_sync_handler() -# self.store = self.hs.get_datastores().main -# self.event_sources = hs.get_event_sources() + def test_remote_join(self) -> None: + """ + Test remote join where the first rows will just be the state when you joined + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + intially_unjoined_room_id = f"!example:{self.OTHER_SERVER_NAME}" -# def test_sharded_event_persisters(self) -> None: -# """ -# TODO -# """ -# user1_id = self.register_user("user1", "pass") -# user1_tok = self.login(user1_id, "pass") -# user2_id = self.register_user("user2", "pass") -# user2_tok = self.login(user2_id, "pass") + # Remotely join a room on another homeserver. + # + # To do this we have to mock the responses from the remote homeserver. We also + # patch out a bunch of event checks on our end. + create_event_source = { + "auth_events": [], + "content": { + "creator": f"@creator:{self.OTHER_SERVER_NAME}", + "room_version": self.hs.config.server.default_room_version.identifier, + }, + "depth": 0, + "origin_server_ts": 0, + "prev_events": [], + "room_id": intially_unjoined_room_id, + "sender": f"@creator:{self.OTHER_SERVER_NAME}", + "state_key": "", + "type": EventTypes.Create, + } + self.add_hashes_and_signatures_from_other_server( + create_event_source, + self.hs.config.server.default_room_version, + ) + create_event = FrozenEventV3( + create_event_source, + self.hs.config.server.default_room_version, + {}, + None, + ) + creator_join_event_source = { + "auth_events": [create_event.event_id], + "content": { + "membership": "join", + }, + "depth": 1, + "origin_server_ts": 1, + "prev_events": [], + "room_id": intially_unjoined_room_id, + "sender": f"@creator:{self.OTHER_SERVER_NAME}", + "state_key": f"@creator:{self.OTHER_SERVER_NAME}", + "type": EventTypes.Member, + } + self.add_hashes_and_signatures_from_other_server( + creator_join_event_source, + self.hs.config.server.default_room_version, + ) + creator_join_event = FrozenEventV3( + creator_join_event_source, + self.hs.config.server.default_room_version, + {}, + None, + ) -# remote_hs = self.make_worker_hs("synapse.app.generic_worker") + # Our local user is going to remote join the room + join_event_source = { + "auth_events": [create_event.event_id], + "content": {"membership": "join"}, + "depth": 1, + "origin_server_ts": 100, + "prev_events": [creator_join_event.event_id], + "sender": user1_id, + "state_key": user1_id, + "room_id": intially_unjoined_room_id, + "type": EventTypes.Member, + } + add_hashes_and_signatures( + self.hs.config.server.default_room_version, + join_event_source, + self.hs.hostname, + self.hs.signing_key, + ) + join_event = FrozenEventV3( + join_event_source, + self.hs.config.server.default_room_version, + {}, + None, + ) -# channel = make_request( -# self.reactor, -# self._hs_to_site[hs], -# "GET", -# f"/_matrix/media/r0/download/{target}/{media_id}", -# shorthand=False, -# access_token=self.access_token, -# await_result=False, -# ) + mock_make_membership_event = AsyncMock( + return_value=( + self.OTHER_SERVER_NAME, + join_event, + self.hs.config.server.default_room_version, + ) + ) + mock_send_join = AsyncMock( + return_value=SendJoinResult( + join_event, + self.OTHER_SERVER_NAME, + state=[create_event, creator_join_event], + auth_chain=[create_event, creator_join_event], + partial_state=False, + servers_in_room=frozenset(), + ) + ) -# remote_hs + with patch.object( + self.room_member_handler.federation_handler.federation_client, + "make_membership_event", + mock_make_membership_event, + ), patch.object( + self.room_member_handler.federation_handler.federation_client, + "send_join", + mock_send_join, + ), patch( + "synapse.event_auth._is_membership_change_allowed", + return_value=None, + ), patch( + "synapse.handlers.federation_event.check_state_dependent_auth_rules", + return_value=None, + ): + self.get_success( + self.room_member_handler.update_membership( + requester=create_requester(user1_id), + target=UserID.from_string(user1_id), + room_id=intially_unjoined_room_id, + action=Membership.JOIN, + remote_room_hosts=[self.OTHER_SERVER_NAME], + ) + ) -# worker_store2 = worker_hs2.get_datastores().main -# assert isinstance(worker_store2._stream_id_gen, MultiWriterIdGenerator) -# actx = worker_store2._stream_id_gen.get_next() + events_db_dump = self.get_success( + self.store.db_pool.simple_select_list( + table="events", + keyvalues={}, + retcols=[ + "*", + ], + desc="debug dump events", + ) + ) -# self.assertEqual( -# room_id_results.keys(), -# { -# room_id1, -# # room_id2 shouldn't show up because we left before the from/to range -# # and the join event during the range happened while worker2 was stuck. -# # This means that from the perspective of the master, where the -# # `stuck_activity_token` is generated, the stream position for worker2 -# # wasn't advanced to the join yet. Looking at the `instance_map`, the -# # join technically comes after `stuck_activity_token``. -# # -# # room_id2, -# room_id3, -# }, -# ) + logger.info("events_db_dump: %s", events_db_dump) + + current_state_delta_stream_db_dump = self.get_success( + self.store.db_pool.simple_select_list( + table="current_state_delta_stream", + keyvalues={}, + retcols=[ + "*", + ], + desc="debug dump current_state_delta_stream", + ) + ) + + logger.info( + "current_state_delta_stream_db_dump: %s", current_state_delta_stream_db_dump + )