Add test that remotely joins room

This commit is contained in:
Eric Eastwood 2024-06-26 19:35:18 -05:00
parent ec2d8dc1e3
commit 0b9a903ca1

View file

@ -21,24 +21,32 @@
import logging
from typing import List, Tuple
from unittest.mock import AsyncMock, patch
from immutabledict import immutabledict
from twisted.test.proto_helpers import MemoryReactor
from synapse.api.constants import Direction, EventTypes, RelationTypes
from synapse.api.constants import Direction, EventTypes, Membership, RelationTypes
from synapse.api.filtering import Filter
from synapse.api.room_versions import RoomVersions
from synapse.events import make_event_from_dict
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events import FrozenEventV3
from synapse.federation.federation_client import SendJoinResult
from synapse.rest import admin
from synapse.rest.client import login, room
from synapse.server import HomeServer
from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
from synapse.types import JsonDict, PersistedEventPosition, RoomStreamToken
from synapse.types import (
JsonDict,
PersistedEventPosition,
RoomStreamToken,
UserID,
create_requester,
)
from synapse.util import Clock
from tests.test_utils.event_injection import create_event
from tests.unittest import HomeserverTestCase
from tests.unittest import FederatingHomeserverTestCase, HomeserverTestCase
logger = logging.getLogger(__name__)
@ -884,8 +892,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
],
)
# TODO: Test remote join where the first rows will just be the state when you joined
# TODO: Test state reset where the user gets removed from the room (when there is no
# corresponding leave event)
@ -974,67 +980,178 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
)
# class GetCurrentStateDeltaMembershipChangesForUserFederationTestCase(BaseMultiWorkerStreamTestCase):
# """
# TODO
# """
class GetCurrentStateDeltaMembershipChangesForUserFederationTestCase(
FederatingHomeserverTestCase
):
"""
Test `get_current_state_delta_membership_changes_for_user(...)` when joining remote federated rooms.
"""
# servlets = [
# admin.register_servlets_for_client_rest_resource,
# room.register_servlets,
# login.register_servlets,
# ]
servlets = [
admin.register_servlets_for_client_rest_resource,
room.register_servlets,
login.register_servlets,
]
# def default_config(self) -> dict:
# conf = super().default_config()
# conf["federation_custom_ca_list"] = [get_test_ca_cert_file()]
# return conf
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.sliding_sync_handler = self.hs.get_sliding_sync_handler()
self.store = self.hs.get_datastores().main
self.event_sources = hs.get_event_sources()
self.room_member_handler = hs.get_room_member_handler()
# def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
# self.sliding_sync_handler = self.hs.get_sliding_sync_handler()
# self.store = self.hs.get_datastores().main
# self.event_sources = hs.get_event_sources()
def test_remote_join(self) -> None:
"""
Test remote join where the first rows will just be the state when you joined
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
intially_unjoined_room_id = f"!example:{self.OTHER_SERVER_NAME}"
# def test_sharded_event_persisters(self) -> None:
# """
# TODO
# """
# user1_id = self.register_user("user1", "pass")
# user1_tok = self.login(user1_id, "pass")
# user2_id = self.register_user("user2", "pass")
# user2_tok = self.login(user2_id, "pass")
# Remotely join a room on another homeserver.
#
# To do this we have to mock the responses from the remote homeserver. We also
# patch out a bunch of event checks on our end.
create_event_source = {
"auth_events": [],
"content": {
"creator": f"@creator:{self.OTHER_SERVER_NAME}",
"room_version": self.hs.config.server.default_room_version.identifier,
},
"depth": 0,
"origin_server_ts": 0,
"prev_events": [],
"room_id": intially_unjoined_room_id,
"sender": f"@creator:{self.OTHER_SERVER_NAME}",
"state_key": "",
"type": EventTypes.Create,
}
self.add_hashes_and_signatures_from_other_server(
create_event_source,
self.hs.config.server.default_room_version,
)
create_event = FrozenEventV3(
create_event_source,
self.hs.config.server.default_room_version,
{},
None,
)
creator_join_event_source = {
"auth_events": [create_event.event_id],
"content": {
"membership": "join",
},
"depth": 1,
"origin_server_ts": 1,
"prev_events": [],
"room_id": intially_unjoined_room_id,
"sender": f"@creator:{self.OTHER_SERVER_NAME}",
"state_key": f"@creator:{self.OTHER_SERVER_NAME}",
"type": EventTypes.Member,
}
self.add_hashes_and_signatures_from_other_server(
creator_join_event_source,
self.hs.config.server.default_room_version,
)
creator_join_event = FrozenEventV3(
creator_join_event_source,
self.hs.config.server.default_room_version,
{},
None,
)
# remote_hs = self.make_worker_hs("synapse.app.generic_worker")
# Our local user is going to remote join the room
join_event_source = {
"auth_events": [create_event.event_id],
"content": {"membership": "join"},
"depth": 1,
"origin_server_ts": 100,
"prev_events": [creator_join_event.event_id],
"sender": user1_id,
"state_key": user1_id,
"room_id": intially_unjoined_room_id,
"type": EventTypes.Member,
}
add_hashes_and_signatures(
self.hs.config.server.default_room_version,
join_event_source,
self.hs.hostname,
self.hs.signing_key,
)
join_event = FrozenEventV3(
join_event_source,
self.hs.config.server.default_room_version,
{},
None,
)
# channel = make_request(
# self.reactor,
# self._hs_to_site[hs],
# "GET",
# f"/_matrix/media/r0/download/{target}/{media_id}",
# shorthand=False,
# access_token=self.access_token,
# await_result=False,
# )
mock_make_membership_event = AsyncMock(
return_value=(
self.OTHER_SERVER_NAME,
join_event,
self.hs.config.server.default_room_version,
)
)
mock_send_join = AsyncMock(
return_value=SendJoinResult(
join_event,
self.OTHER_SERVER_NAME,
state=[create_event, creator_join_event],
auth_chain=[create_event, creator_join_event],
partial_state=False,
servers_in_room=frozenset(),
)
)
# remote_hs
with patch.object(
self.room_member_handler.federation_handler.federation_client,
"make_membership_event",
mock_make_membership_event,
), patch.object(
self.room_member_handler.federation_handler.federation_client,
"send_join",
mock_send_join,
), patch(
"synapse.event_auth._is_membership_change_allowed",
return_value=None,
), patch(
"synapse.handlers.federation_event.check_state_dependent_auth_rules",
return_value=None,
):
self.get_success(
self.room_member_handler.update_membership(
requester=create_requester(user1_id),
target=UserID.from_string(user1_id),
room_id=intially_unjoined_room_id,
action=Membership.JOIN,
remote_room_hosts=[self.OTHER_SERVER_NAME],
)
)
# worker_store2 = worker_hs2.get_datastores().main
# assert isinstance(worker_store2._stream_id_gen, MultiWriterIdGenerator)
# actx = worker_store2._stream_id_gen.get_next()
events_db_dump = self.get_success(
self.store.db_pool.simple_select_list(
table="events",
keyvalues={},
retcols=[
"*",
],
desc="debug dump events",
)
)
# self.assertEqual(
# room_id_results.keys(),
# {
# room_id1,
# # room_id2 shouldn't show up because we left before the from/to range
# # and the join event during the range happened while worker2 was stuck.
# # This means that from the perspective of the master, where the
# # `stuck_activity_token` is generated, the stream position for worker2
# # wasn't advanced to the join yet. Looking at the `instance_map`, the
# # join technically comes after `stuck_activity_token``.
# #
# # room_id2,
# room_id3,
# },
# )
logger.info("events_db_dump: %s", events_db_dump)
current_state_delta_stream_db_dump = self.get_success(
self.store.db_pool.simple_select_list(
table="current_state_delta_stream",
keyvalues={},
retcols=[
"*",
],
desc="debug dump current_state_delta_stream",
)
)
logger.info(
"current_state_delta_stream_db_dump: %s", current_state_delta_stream_db_dump
)