From b5facbac0f2d5f6f0e83d7cac43f8de02ce6742f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 21 May 2024 16:48:20 +0100 Subject: [PATCH 1/6] Improve perf of sync device lists (#17216) Re-introduces #17191, and includes #17197 and #17214 The basic idea is to stop calling `get_rooms_for_user` everywhere, and instead use the table `device_lists_changes_in_room`. Commits reviewable one-by-one. --- changelog.d/17216.misc | 1 + synapse/handlers/device.py | 22 +++++- synapse/handlers/sync.py | 38 ++-------- synapse/replication/tcp/client.py | 15 ++-- synapse/storage/databases/main/devices.py | 89 +++++++++++++++++------ 5 files changed, 103 insertions(+), 62 deletions(-) create mode 100644 changelog.d/17216.misc diff --git a/changelog.d/17216.misc b/changelog.d/17216.misc new file mode 100644 index 0000000000..bd55eeaa33 --- /dev/null +++ b/changelog.d/17216.misc @@ -0,0 +1 @@ +Improve performance of calculating device lists changes in `/sync`. diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 67953a3ed9..55842e7c7b 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -159,20 +159,32 @@ class DeviceWorkerHandler: @cancellable async def get_device_changes_in_shared_rooms( - self, user_id: str, room_ids: StrCollection, from_token: StreamToken + self, + user_id: str, + room_ids: StrCollection, + from_token: StreamToken, + now_token: Optional[StreamToken] = None, ) -> Set[str]: """Get the set of users whose devices have changed who share a room with the given user. """ + now_device_lists_key = self.store.get_device_stream_token() + if now_token: + now_device_lists_key = now_token.device_list_key + changed_users = await self.store.get_device_list_changes_in_rooms( - room_ids, from_token.device_list_key + room_ids, + from_token.device_list_key, + now_device_lists_key, ) if changed_users is not None: # We also check if the given user has changed their device. If # they're in no rooms then the above query won't include them. changed = await self.store.get_users_whose_devices_changed( - from_token.device_list_key, [user_id] + from_token.device_list_key, + [user_id], + to_key=now_device_lists_key, ) changed_users.update(changed) return changed_users @@ -190,7 +202,9 @@ class DeviceWorkerHandler: tracked_users.add(user_id) changed = await self.store.get_users_whose_devices_changed( - from_token.device_list_key, tracked_users + from_token.device_list_key, + tracked_users, + to_key=now_device_lists_key, ) return changed diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index d3d40e8682..b7917a99d6 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1886,38 +1886,14 @@ class SyncHandler: # Step 1a, check for changes in devices of users we share a room # with - # - # We do this in two different ways depending on what we have cached. - # If we already have a list of all the user that have changed since - # the last sync then it's likely more efficient to compare the rooms - # they're in with the rooms the syncing user is in. - # - # If we don't have that info cached then we get all the users that - # share a room with our user and check if those users have changed. - cache_result = self.store.get_cached_device_list_changes( - since_token.device_list_key - ) - if cache_result.hit: - changed_users = cache_result.entities - - result = await self.store.get_rooms_for_users(changed_users) - - for changed_user_id, entries in result.items(): - # Check if the changed user shares any rooms with the user, - # or if the changed user is the syncing user (as we always - # want to include device list updates of their own devices). - if user_id == changed_user_id or any( - rid in joined_room_ids for rid in entries - ): - users_that_have_changed.add(changed_user_id) - else: - users_that_have_changed = ( - await self._device_handler.get_device_changes_in_shared_rooms( - user_id, - sync_result_builder.joined_room_ids, - from_token=since_token, - ) + users_that_have_changed = ( + await self._device_handler.get_device_changes_in_shared_rooms( + user_id, + sync_result_builder.joined_room_ids, + from_token=since_token, + now_token=sync_result_builder.now_token, ) + ) # Step 1b, check for newly joined rooms for room_id in newly_joined_rooms: diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 5e5387fdcb..2d6d49eed7 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -112,6 +112,15 @@ class ReplicationDataHandler: token: stream token for this batch of rows rows: a list of Stream.ROW_TYPE objects as returned by Stream.parse_row. """ + all_room_ids: Set[str] = set() + if stream_name == DeviceListsStream.NAME: + if any(row.entity.startswith("@") and not row.is_signature for row in rows): + prev_token = self.store.get_device_stream_token() + all_room_ids = await self.store.get_all_device_list_changes( + prev_token, token + ) + self.store.device_lists_in_rooms_have_changed(all_room_ids, token) + self.store.process_replication_rows(stream_name, instance_name, token, rows) # NOTE: this must be called after process_replication_rows to ensure any # cache invalidations are first handled before any stream ID advances. @@ -146,12 +155,6 @@ class ReplicationDataHandler: StreamKeyType.TO_DEVICE, token, users=entities ) elif stream_name == DeviceListsStream.NAME: - all_room_ids: Set[str] = set() - for row in rows: - if row.entity.startswith("@") and not row.is_signature: - room_ids = await self.store.get_rooms_for_user(row.entity) - all_room_ids.update(room_ids) - # `all_room_ids` can be large, so let's wake up those streams in batches for batched_room_ids in batch_iter(all_room_ids, 100): self.notifier.on_new_event( diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 8dbcb3f5a0..f4410b5c02 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -70,10 +70,7 @@ from synapse.types import ( from synapse.util import json_decoder, json_encoder from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.lrucache import LruCache -from synapse.util.caches.stream_change_cache import ( - AllEntitiesChangedResult, - StreamChangeCache, -) +from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.cancellation import cancellable from synapse.util.iterutils import batch_iter from synapse.util.stringutils import shortstr @@ -132,6 +129,20 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): prefilled_cache=device_list_prefill, ) + device_list_room_prefill, min_device_list_room_id = self.db_pool.get_cache_dict( + db_conn, + "device_lists_changes_in_room", + entity_column="room_id", + stream_column="stream_id", + max_value=device_list_max, + limit=10000, + ) + self._device_list_room_stream_cache = StreamChangeCache( + "DeviceListRoomStreamChangeCache", + min_device_list_room_id, + prefilled_cache=device_list_room_prefill, + ) + ( user_signature_stream_prefill, user_signature_stream_list_id, @@ -209,6 +220,13 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): row.entity, token ) + def device_lists_in_rooms_have_changed( + self, room_ids: StrCollection, token: int + ) -> None: + "Record that device lists have changed in rooms" + for room_id in room_ids: + self._device_list_room_stream_cache.entity_has_changed(room_id, token) + def get_device_stream_token(self) -> int: return self._device_list_id_gen.get_current_token() @@ -832,16 +850,6 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): ) return {device[0]: db_to_json(device[1]) for device in devices} - def get_cached_device_list_changes( - self, - from_key: int, - ) -> AllEntitiesChangedResult: - """Get set of users whose devices have changed since `from_key`, or None - if that information is not in our cache. - """ - - return self._device_list_stream_cache.get_all_entities_changed(from_key) - @cancellable async def get_all_devices_changed( self, @@ -1457,7 +1465,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): @cancellable async def get_device_list_changes_in_rooms( - self, room_ids: Collection[str], from_id: int + self, room_ids: Collection[str], from_id: int, to_id: int ) -> Optional[Set[str]]: """Return the set of users whose devices have changed in the given rooms since the given stream ID. @@ -1473,9 +1481,15 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): if min_stream_id > from_id: return None + changed_room_ids = self._device_list_room_stream_cache.get_entities_changed( + room_ids, from_id + ) + if not changed_room_ids: + return set() + sql = """ SELECT DISTINCT user_id FROM device_lists_changes_in_room - WHERE {clause} AND stream_id >= ? + WHERE {clause} AND stream_id > ? AND stream_id <= ? """ def _get_device_list_changes_in_rooms_txn( @@ -1487,11 +1501,12 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): return {user_id for user_id, in txn} changes = set() - for chunk in batch_iter(room_ids, 1000): + for chunk in batch_iter(changed_room_ids, 1000): clause, args = make_in_list_sql_clause( self.database_engine, "room_id", chunk ) args.append(from_id) + args.append(to_id) changes |= await self.db_pool.runInteraction( "get_device_list_changes_in_rooms", @@ -1502,6 +1517,34 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): return changes + async def get_all_device_list_changes(self, from_id: int, to_id: int) -> Set[str]: + """Return the set of rooms where devices have changed since the given + stream ID. + + Will raise an exception if the given stream ID is too old. + """ + + min_stream_id = await self._get_min_device_lists_changes_in_room() + + if min_stream_id > from_id: + raise Exception("stream ID is too old") + + sql = """ + SELECT DISTINCT room_id FROM device_lists_changes_in_room + WHERE stream_id > ? AND stream_id <= ? + """ + + def _get_all_device_list_changes_txn( + txn: LoggingTransaction, + ) -> Set[str]: + txn.execute(sql, (from_id, to_id)) + return {room_id for room_id, in txn} + + return await self.db_pool.runInteraction( + "get_all_device_list_changes", + _get_all_device_list_changes_txn, + ) + async def get_device_list_changes_in_room( self, room_id: str, min_stream_id: int ) -> Collection[Tuple[str, str]]: @@ -1962,8 +2005,8 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): async def add_device_change_to_streams( self, user_id: str, - device_ids: Collection[str], - room_ids: Collection[str], + device_ids: StrCollection, + room_ids: StrCollection, ) -> Optional[int]: """Persist that a user's devices have been updated, and which hosts (if any) should be poked. @@ -2122,8 +2165,8 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): self, txn: LoggingTransaction, user_id: str, - device_ids: Iterable[str], - room_ids: Collection[str], + device_ids: StrCollection, + room_ids: StrCollection, stream_ids: List[int], context: Dict[str, str], ) -> None: @@ -2161,6 +2204,10 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): ], ) + txn.call_after( + self.device_lists_in_rooms_have_changed, room_ids, max(stream_ids) + ) + async def get_uncoverted_outbound_room_pokes( self, start_stream_id: int, start_room_id: str, limit: int = 10 ) -> List[Tuple[str, str, str, int, Optional[Dict[str, str]]]]: From 6a9a641fb86b04587840bcb6b76af9a0acef9b54 Mon Sep 17 00:00:00 2001 From: devonh Date: Tue, 21 May 2024 20:09:17 +0000 Subject: [PATCH 2/6] Bring auto-accept invite logic into Synapse (#17147) This PR ports the logic from the [synapse_auto_accept_invite](https://github.com/matrix-org/synapse-auto-accept-invite) module into synapse. I went with the naive approach of injecting the "module" next to where third party modules are currently loaded. If there is a better/preferred way to handle this, I'm all ears. It wasn't obvious to me if there was a better location to add this logic that would cleanly apply to all incoming invite events. Relies on https://github.com/element-hq/synapse/pull/17166 to fix linter errors. --- changelog.d/17147.feature | 1 + .../configuration/config_documentation.md | 29 + synapse/app/_base.py | 6 + synapse/config/_base.pyi | 2 + synapse/config/auto_accept_invites.py | 43 ++ synapse/config/homeserver.py | 2 + synapse/events/auto_accept_invites.py | 196 ++++++ synapse/handlers/sso.py | 2 +- tests/events/test_auto_accept_invites.py | 657 ++++++++++++++++++ tests/rest/client/utils.py | 2 + tests/server.py | 6 + 11 files changed, 945 insertions(+), 1 deletion(-) create mode 100644 changelog.d/17147.feature create mode 100644 synapse/config/auto_accept_invites.py create mode 100644 synapse/events/auto_accept_invites.py create mode 100644 tests/events/test_auto_accept_invites.py diff --git a/changelog.d/17147.feature b/changelog.d/17147.feature new file mode 100644 index 0000000000..7c2cdb6bdf --- /dev/null +++ b/changelog.d/17147.feature @@ -0,0 +1 @@ +Add the ability to auto-accept invites on the behalf of users. See the [`auto_accept_invites`](https://element-hq.github.io/synapse/latest/usage/configuration/config_documentation.html#auto-accept-invites) config option for details. diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md index e04fdfdfb0..2c917d1f8e 100644 --- a/docs/usage/configuration/config_documentation.md +++ b/docs/usage/configuration/config_documentation.md @@ -4595,3 +4595,32 @@ background_updates: min_batch_size: 10 default_batch_size: 50 ``` +--- +## Auto Accept Invites +Configuration settings related to automatically accepting invites. + +--- +### `auto_accept_invites` + +Automatically accepting invites controls whether users are presented with an invite request or if they +are instead automatically joined to a room when receiving an invite. Set the `enabled` sub-option to true to +enable auto-accepting invites. Defaults to false. +This setting has the following sub-options: +* `enabled`: Whether to run the auto-accept invites logic. Defaults to false. +* `only_for_direct_messages`: Whether invites should be automatically accepted for all room types, or only + for direct messages. Defaults to false. +* `only_from_local_users`: Whether to only automatically accept invites from users on this homeserver. Defaults to false. +* `worker_to_run_on`: Which worker to run this module on. This must match the "worker_name". + +NOTE: Care should be taken not to enable this setting if the `synapse_auto_accept_invite` module is enabled and installed. +The two modules will compete to perform the same task and may result in undesired behaviour. For example, multiple join +events could be generated from a single invite. + +Example configuration: +```yaml +auto_accept_invites: + enabled: true + only_for_direct_messages: true + only_from_local_users: true + worker_to_run_on: "worker_1" +``` diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 3182608f73..67e0df1459 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -68,6 +68,7 @@ from synapse.config._base import format_config_error from synapse.config.homeserver import HomeServerConfig from synapse.config.server import ListenerConfig, ManholeConfig, TCPListenerConfig from synapse.crypto import context_factory +from synapse.events.auto_accept_invites import InviteAutoAccepter from synapse.events.presence_router import load_legacy_presence_router from synapse.handlers.auth import load_legacy_password_auth_providers from synapse.http.site import SynapseSite @@ -582,6 +583,11 @@ async def start(hs: "HomeServer") -> None: m = module(config, module_api) logger.info("Loaded module %s", m) + if hs.config.auto_accept_invites.enabled: + # Start the local auto_accept_invites module. + m = InviteAutoAccepter(hs.config.auto_accept_invites, module_api) + logger.info("Loaded local module %s", m) + load_legacy_spam_checkers(hs) load_legacy_third_party_event_rules(hs) load_legacy_presence_router(hs) diff --git a/synapse/config/_base.pyi b/synapse/config/_base.pyi index fc51aed234..d9cb0da38b 100644 --- a/synapse/config/_base.pyi +++ b/synapse/config/_base.pyi @@ -23,6 +23,7 @@ from synapse.config import ( # noqa: F401 api, appservice, auth, + auto_accept_invites, background_updates, cache, captcha, @@ -120,6 +121,7 @@ class RootConfig: federation: federation.FederationConfig retention: retention.RetentionConfig background_updates: background_updates.BackgroundUpdateConfig + auto_accept_invites: auto_accept_invites.AutoAcceptInvitesConfig config_classes: List[Type["Config"]] = ... config_files: List[str] diff --git a/synapse/config/auto_accept_invites.py b/synapse/config/auto_accept_invites.py new file mode 100644 index 0000000000..d90e13a510 --- /dev/null +++ b/synapse/config/auto_accept_invites.py @@ -0,0 +1,43 @@ +# +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright (C) 2024 New Vector, Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# . +# +# Originally licensed under the Apache License, Version 2.0: +# . +# +# [This file includes modifications made by New Vector Limited] +# +# +from typing import Any + +from synapse.types import JsonDict + +from ._base import Config + + +class AutoAcceptInvitesConfig(Config): + section = "auto_accept_invites" + + def read_config(self, config: JsonDict, **kwargs: Any) -> None: + auto_accept_invites_config = config.get("auto_accept_invites") or {} + + self.enabled = auto_accept_invites_config.get("enabled", False) + + self.accept_invites_only_for_direct_messages = auto_accept_invites_config.get( + "only_for_direct_messages", False + ) + + self.accept_invites_only_from_local_users = auto_accept_invites_config.get( + "only_from_local_users", False + ) + + self.worker_to_run_on = auto_accept_invites_config.get("worker_to_run_on") diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py index 72e93ed04f..e36c0bd6ae 100644 --- a/synapse/config/homeserver.py +++ b/synapse/config/homeserver.py @@ -23,6 +23,7 @@ from .account_validity import AccountValidityConfig from .api import ApiConfig from .appservice import AppServiceConfig from .auth import AuthConfig +from .auto_accept_invites import AutoAcceptInvitesConfig from .background_updates import BackgroundUpdateConfig from .cache import CacheConfig from .captcha import CaptchaConfig @@ -105,4 +106,5 @@ class HomeServerConfig(RootConfig): RedisConfig, ExperimentalConfig, BackgroundUpdateConfig, + AutoAcceptInvitesConfig, ] diff --git a/synapse/events/auto_accept_invites.py b/synapse/events/auto_accept_invites.py new file mode 100644 index 0000000000..d88ec51d9d --- /dev/null +++ b/synapse/events/auto_accept_invites.py @@ -0,0 +1,196 @@ +# +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright 2021 The Matrix.org Foundation C.I.C +# Copyright (C) 2024 New Vector, Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# . +# +# Originally licensed under the Apache License, Version 2.0: +# . +# +# [This file includes modifications made by New Vector Limited] +# +# +import logging +from http import HTTPStatus +from typing import Any, Dict, Tuple + +from synapse.api.constants import AccountDataTypes, EventTypes, Membership +from synapse.api.errors import SynapseError +from synapse.config.auto_accept_invites import AutoAcceptInvitesConfig +from synapse.module_api import EventBase, ModuleApi, run_as_background_process + +logger = logging.getLogger(__name__) + + +class InviteAutoAccepter: + def __init__(self, config: AutoAcceptInvitesConfig, api: ModuleApi): + # Keep a reference to the Module API. + self._api = api + self._config = config + + if not self._config.enabled: + return + + should_run_on_this_worker = config.worker_to_run_on == self._api.worker_name + + if not should_run_on_this_worker: + logger.info( + "Not accepting invites on this worker (configured: %r, here: %r)", + config.worker_to_run_on, + self._api.worker_name, + ) + return + + logger.info( + "Accepting invites on this worker (here: %r)", self._api.worker_name + ) + + # Register the callback. + self._api.register_third_party_rules_callbacks( + on_new_event=self.on_new_event, + ) + + async def on_new_event(self, event: EventBase, *args: Any) -> None: + """Listens for new events, and if the event is an invite for a local user then + automatically accepts it. + + Args: + event: The incoming event. + """ + # Check if the event is an invite for a local user. + is_invite_for_local_user = ( + event.type == EventTypes.Member + and event.is_state() + and event.membership == Membership.INVITE + and self._api.is_mine(event.state_key) + ) + + # Only accept invites for direct messages if the configuration mandates it. + is_direct_message = event.content.get("is_direct", False) + is_allowed_by_direct_message_rules = ( + not self._config.accept_invites_only_for_direct_messages + or is_direct_message is True + ) + + # Only accept invites from remote users if the configuration mandates it. + is_from_local_user = self._api.is_mine(event.sender) + is_allowed_by_local_user_rules = ( + not self._config.accept_invites_only_from_local_users + or is_from_local_user is True + ) + + if ( + is_invite_for_local_user + and is_allowed_by_direct_message_rules + and is_allowed_by_local_user_rules + ): + # Make the user join the room. We run this as a background process to circumvent a race condition + # that occurs when responding to invites over federation (see https://github.com/matrix-org/synapse-auto-accept-invite/issues/12) + run_as_background_process( + "retry_make_join", + self._retry_make_join, + event.state_key, + event.state_key, + event.room_id, + "join", + bg_start_span=False, + ) + + if is_direct_message: + # Mark this room as a direct message! + await self._mark_room_as_direct_message( + event.state_key, event.sender, event.room_id + ) + + async def _mark_room_as_direct_message( + self, user_id: str, dm_user_id: str, room_id: str + ) -> None: + """ + Marks a room (`room_id`) as a direct message with the counterparty `dm_user_id` + from the perspective of the user `user_id`. + + Args: + user_id: the user for whom the membership is changing + dm_user_id: the user performing the membership change + room_id: room id of the room the user is invited to + """ + + # This is a dict of User IDs to tuples of Room IDs + # (get_global will return a frozendict of tuples as it freezes the data, + # but we should accept either frozen or unfrozen variants.) + # Be careful: we convert the outer frozendict into a dict here, + # but the contents of the dict are still frozen (tuples in lieu of lists, + # etc.) + dm_map: Dict[str, Tuple[str, ...]] = dict( + await self._api.account_data_manager.get_global( + user_id, AccountDataTypes.DIRECT + ) + or {} + ) + + if dm_user_id not in dm_map: + dm_map[dm_user_id] = (room_id,) + else: + dm_rooms_for_user = dm_map[dm_user_id] + assert isinstance(dm_rooms_for_user, (tuple, list)) + + dm_map[dm_user_id] = tuple(dm_rooms_for_user) + (room_id,) + + await self._api.account_data_manager.put_global( + user_id, AccountDataTypes.DIRECT, dm_map + ) + + async def _retry_make_join( + self, sender: str, target: str, room_id: str, new_membership: str + ) -> None: + """ + A function to retry sending the `make_join` request with an increasing backoff. This is + implemented to work around a race condition when receiving invites over federation. + + Args: + sender: the user performing the membership change + target: the user for whom the membership is changing + room_id: room id of the room to join to + new_membership: the type of membership event (in this case will be "join") + """ + + sleep = 0 + retries = 0 + join_event = None + + while retries < 5: + try: + await self._api.sleep(sleep) + join_event = await self._api.update_room_membership( + sender=sender, + target=target, + room_id=room_id, + new_membership=new_membership, + ) + except SynapseError as e: + if e.code == HTTPStatus.FORBIDDEN: + logger.debug( + f"Update_room_membership was forbidden. This can sometimes be expected for remote invites. Exception: {e}" + ) + else: + logger.warn( + f"Update_room_membership raised the following unexpected (SynapseError) exception: {e}" + ) + except Exception as e: + logger.warn( + f"Update_room_membership raised the following unexpected exception: {e}" + ) + + sleep = 2**retries + retries += 1 + + if join_event is not None: + break diff --git a/synapse/handlers/sso.py b/synapse/handlers/sso.py index f275d4f35a..ee74289b6c 100644 --- a/synapse/handlers/sso.py +++ b/synapse/handlers/sso.py @@ -817,7 +817,7 @@ class SsoHandler: server_name = profile["avatar_url"].split("/")[-2] media_id = profile["avatar_url"].split("/")[-1] if self._is_mine_server_name(server_name): - media = await self._media_repo.store.get_local_media(media_id) + media = await self._media_repo.store.get_local_media(media_id) # type: ignore[has-type] if media is not None and upload_name == media.upload_name: logger.info("skipping saving the user avatar") return True diff --git a/tests/events/test_auto_accept_invites.py b/tests/events/test_auto_accept_invites.py new file mode 100644 index 0000000000..7fb4d4fa90 --- /dev/null +++ b/tests/events/test_auto_accept_invites.py @@ -0,0 +1,657 @@ +# +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright 2021 The Matrix.org Foundation C.I.C +# Copyright (C) 2024 New Vector, Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# . +# +# Originally licensed under the Apache License, Version 2.0: +# . +# +# [This file includes modifications made by New Vector Limited] +# +# +import asyncio +from asyncio import Future +from http import HTTPStatus +from typing import Any, Awaitable, Dict, List, Optional, Tuple, TypeVar, cast +from unittest.mock import Mock + +import attr +from parameterized import parameterized + +from twisted.test.proto_helpers import MemoryReactor + +from synapse.api.constants import EventTypes +from synapse.api.errors import SynapseError +from synapse.config.auto_accept_invites import AutoAcceptInvitesConfig +from synapse.events.auto_accept_invites import InviteAutoAccepter +from synapse.federation.federation_base import event_from_pdu_json +from synapse.handlers.sync import JoinedSyncResult, SyncRequestKey, SyncVersion +from synapse.module_api import ModuleApi +from synapse.rest import admin +from synapse.rest.client import login, room +from synapse.server import HomeServer +from synapse.types import StreamToken, create_requester +from synapse.util import Clock + +from tests.handlers.test_sync import generate_sync_config +from tests.unittest import ( + FederatingHomeserverTestCase, + HomeserverTestCase, + TestCase, + override_config, +) + + +class AutoAcceptInvitesTestCase(FederatingHomeserverTestCase): + """ + Integration test cases for auto-accepting invites. + """ + + servlets = [ + admin.register_servlets, + login.register_servlets, + room.register_servlets, + ] + + def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: + hs = self.setup_test_homeserver() + self.handler = hs.get_federation_handler() + self.store = hs.get_datastores().main + return hs + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.sync_handler = self.hs.get_sync_handler() + self.module_api = hs.get_module_api() + + @parameterized.expand( + [ + [False], + [True], + ] + ) + @override_config( + { + "auto_accept_invites": { + "enabled": True, + }, + } + ) + def test_auto_accept_invites(self, direct_room: bool) -> None: + """Test that a user automatically joins a room when invited, if the + module is enabled. + """ + # A local user who sends an invite + inviting_user_id = self.register_user("inviter", "pass") + inviting_user_tok = self.login("inviter", "pass") + + # A local user who receives an invite + invited_user_id = self.register_user("invitee", "pass") + self.login("invitee", "pass") + + # Create a room and send an invite to the other user + room_id = self.helper.create_room_as( + inviting_user_id, + is_public=False, + tok=inviting_user_tok, + ) + + self.helper.invite( + room_id, + inviting_user_id, + invited_user_id, + tok=inviting_user_tok, + extra_data={"is_direct": direct_room}, + ) + + # Check that the invite receiving user has automatically joined the room when syncing + join_updates, _ = sync_join(self, invited_user_id) + self.assertEqual(len(join_updates), 1) + + join_update: JoinedSyncResult = join_updates[0] + self.assertEqual(join_update.room_id, room_id) + + @override_config( + { + "auto_accept_invites": { + "enabled": False, + }, + } + ) + def test_module_not_enabled(self) -> None: + """Test that a user does not automatically join a room when invited, + if the module is not enabled. + """ + # A local user who sends an invite + inviting_user_id = self.register_user("inviter", "pass") + inviting_user_tok = self.login("inviter", "pass") + + # A local user who receives an invite + invited_user_id = self.register_user("invitee", "pass") + self.login("invitee", "pass") + + # Create a room and send an invite to the other user + room_id = self.helper.create_room_as( + inviting_user_id, is_public=False, tok=inviting_user_tok + ) + + self.helper.invite( + room_id, + inviting_user_id, + invited_user_id, + tok=inviting_user_tok, + ) + + # Check that the invite receiving user has not automatically joined the room when syncing + join_updates, _ = sync_join(self, invited_user_id) + self.assertEqual(len(join_updates), 0) + + @override_config( + { + "auto_accept_invites": { + "enabled": True, + }, + } + ) + def test_invite_from_remote_user(self) -> None: + """Test that an invite from a remote user results in the invited user + automatically joining the room. + """ + # A remote user who sends the invite + remote_server = "otherserver" + remote_user = "@otheruser:" + remote_server + + # A local user who creates the room + creator_user_id = self.register_user("creator", "pass") + creator_user_tok = self.login("creator", "pass") + + # A local user who receives an invite + invited_user_id = self.register_user("invitee", "pass") + self.login("invitee", "pass") + + room_id = self.helper.create_room_as( + room_creator=creator_user_id, tok=creator_user_tok + ) + room_version = self.get_success(self.store.get_room_version(room_id)) + + invite_event = event_from_pdu_json( + { + "type": EventTypes.Member, + "content": {"membership": "invite"}, + "room_id": room_id, + "sender": remote_user, + "state_key": invited_user_id, + "depth": 32, + "prev_events": [], + "auth_events": [], + "origin_server_ts": self.clock.time_msec(), + }, + room_version, + ) + self.get_success( + self.handler.on_invite_request( + remote_server, + invite_event, + invite_event.room_version, + ) + ) + + # Check that the invite receiving user has automatically joined the room when syncing + join_updates, _ = sync_join(self, invited_user_id) + self.assertEqual(len(join_updates), 1) + + join_update: JoinedSyncResult = join_updates[0] + self.assertEqual(join_update.room_id, room_id) + + @parameterized.expand( + [ + [False, False], + [True, True], + ] + ) + @override_config( + { + "auto_accept_invites": { + "enabled": True, + "only_for_direct_messages": True, + }, + } + ) + def test_accept_invite_direct_message( + self, + direct_room: bool, + expect_auto_join: bool, + ) -> None: + """Tests that, if the module is configured to only accept DM invites, invites to DM rooms are still + automatically accepted. Otherwise they are rejected. + """ + # A local user who sends an invite + inviting_user_id = self.register_user("inviter", "pass") + inviting_user_tok = self.login("inviter", "pass") + + # A local user who receives an invite + invited_user_id = self.register_user("invitee", "pass") + self.login("invitee", "pass") + + # Create a room and send an invite to the other user + room_id = self.helper.create_room_as( + inviting_user_id, + is_public=False, + tok=inviting_user_tok, + ) + + self.helper.invite( + room_id, + inviting_user_id, + invited_user_id, + tok=inviting_user_tok, + extra_data={"is_direct": direct_room}, + ) + + if expect_auto_join: + # Check that the invite receiving user has automatically joined the room when syncing + join_updates, _ = sync_join(self, invited_user_id) + self.assertEqual(len(join_updates), 1) + + join_update: JoinedSyncResult = join_updates[0] + self.assertEqual(join_update.room_id, room_id) + else: + # Check that the invite receiving user has not automatically joined the room when syncing + join_updates, _ = sync_join(self, invited_user_id) + self.assertEqual(len(join_updates), 0) + + @parameterized.expand( + [ + [False, True], + [True, False], + ] + ) + @override_config( + { + "auto_accept_invites": { + "enabled": True, + "only_from_local_users": True, + }, + } + ) + def test_accept_invite_local_user( + self, remote_inviter: bool, expect_auto_join: bool + ) -> None: + """Tests that, if the module is configured to only accept invites from local users, invites + from local users are still automatically accepted. Otherwise they are rejected. + """ + # A local user who sends an invite + creator_user_id = self.register_user("inviter", "pass") + creator_user_tok = self.login("inviter", "pass") + + # A local user who receives an invite + invited_user_id = self.register_user("invitee", "pass") + self.login("invitee", "pass") + + # Create a room and send an invite to the other user + room_id = self.helper.create_room_as( + creator_user_id, is_public=False, tok=creator_user_tok + ) + + if remote_inviter: + room_version = self.get_success(self.store.get_room_version(room_id)) + + # A remote user who sends the invite + remote_server = "otherserver" + remote_user = "@otheruser:" + remote_server + + invite_event = event_from_pdu_json( + { + "type": EventTypes.Member, + "content": {"membership": "invite"}, + "room_id": room_id, + "sender": remote_user, + "state_key": invited_user_id, + "depth": 32, + "prev_events": [], + "auth_events": [], + "origin_server_ts": self.clock.time_msec(), + }, + room_version, + ) + self.get_success( + self.handler.on_invite_request( + remote_server, + invite_event, + invite_event.room_version, + ) + ) + else: + self.helper.invite( + room_id, + creator_user_id, + invited_user_id, + tok=creator_user_tok, + ) + + if expect_auto_join: + # Check that the invite receiving user has automatically joined the room when syncing + join_updates, _ = sync_join(self, invited_user_id) + self.assertEqual(len(join_updates), 1) + + join_update: JoinedSyncResult = join_updates[0] + self.assertEqual(join_update.room_id, room_id) + else: + # Check that the invite receiving user has not automatically joined the room when syncing + join_updates, _ = sync_join(self, invited_user_id) + self.assertEqual(len(join_updates), 0) + + +_request_key = 0 + + +def generate_request_key() -> SyncRequestKey: + global _request_key + _request_key += 1 + return ("request_key", _request_key) + + +def sync_join( + testcase: HomeserverTestCase, + user_id: str, + since_token: Optional[StreamToken] = None, +) -> Tuple[List[JoinedSyncResult], StreamToken]: + """Perform a sync request for the given user and return the user join updates + they've received, as well as the next_batch token. + + This method assumes testcase.sync_handler points to the homeserver's sync handler. + + Args: + testcase: The testcase that is currently being run. + user_id: The ID of the user to generate a sync response for. + since_token: An optional token to indicate from at what point to sync from. + + Returns: + A tuple containing a list of join updates, and the sync response's + next_batch token. + """ + requester = create_requester(user_id) + sync_config = generate_sync_config(requester.user.to_string()) + sync_result = testcase.get_success( + testcase.hs.get_sync_handler().wait_for_sync_for_user( + requester, + sync_config, + SyncVersion.SYNC_V2, + generate_request_key(), + since_token, + ) + ) + + return sync_result.joined, sync_result.next_batch + + +class InviteAutoAccepterInternalTestCase(TestCase): + """ + Test cases which exercise the internals of the InviteAutoAccepter. + """ + + def setUp(self) -> None: + self.module = create_module() + self.user_id = "@peter:test" + self.invitee = "@lesley:test" + self.remote_invitee = "@thomas:remote" + + # We know our module API is a mock, but mypy doesn't. + self.mocked_update_membership: Mock = self.module._api.update_room_membership # type: ignore[assignment] + + async def test_accept_invite_with_failures(self) -> None: + """Tests that receiving an invite for a local user makes the module attempt to + make the invitee join the room. This test verifies that it works if the call to + update membership returns exceptions before successfully completing and returning an event. + """ + invite = MockEvent( + sender="@inviter:test", + state_key="@invitee:test", + type="m.room.member", + content={"membership": "invite"}, + ) + + join_event = MockEvent( + sender="someone", + state_key="someone", + type="m.room.member", + content={"membership": "join"}, + ) + # the first two calls raise an exception while the third call is successful + self.mocked_update_membership.side_effect = [ + SynapseError(HTTPStatus.FORBIDDEN, "Forbidden"), + SynapseError(HTTPStatus.FORBIDDEN, "Forbidden"), + make_awaitable(join_event), + ] + + # Stop mypy from complaining that we give on_new_event a MockEvent rather than an + # EventBase. + await self.module.on_new_event(event=invite) # type: ignore[arg-type] + + await self.retry_assertions( + self.mocked_update_membership, + 3, + sender=invite.state_key, + target=invite.state_key, + room_id=invite.room_id, + new_membership="join", + ) + + async def test_accept_invite_failures(self) -> None: + """Tests that receiving an invite for a local user makes the module attempt to + make the invitee join the room. This test verifies that if the update_membership call + fails consistently, _retry_make_join will break the loop after the set number of retries and + execution will continue. + """ + invite = MockEvent( + sender=self.user_id, + state_key=self.invitee, + type="m.room.member", + content={"membership": "invite"}, + ) + self.mocked_update_membership.side_effect = SynapseError( + HTTPStatus.FORBIDDEN, "Forbidden" + ) + + # Stop mypy from complaining that we give on_new_event a MockEvent rather than an + # EventBase. + await self.module.on_new_event(event=invite) # type: ignore[arg-type] + + await self.retry_assertions( + self.mocked_update_membership, + 5, + sender=invite.state_key, + target=invite.state_key, + room_id=invite.room_id, + new_membership="join", + ) + + async def test_not_state(self) -> None: + """Tests that receiving an invite that's not a state event does nothing.""" + invite = MockEvent( + sender=self.user_id, type="m.room.member", content={"membership": "invite"} + ) + + # Stop mypy from complaining that we give on_new_event a MockEvent rather than an + # EventBase. + await self.module.on_new_event(event=invite) # type: ignore[arg-type] + + self.mocked_update_membership.assert_not_called() + + async def test_not_invite(self) -> None: + """Tests that receiving a membership update that's not an invite does nothing.""" + invite = MockEvent( + sender=self.user_id, + state_key=self.user_id, + type="m.room.member", + content={"membership": "join"}, + ) + + # Stop mypy from complaining that we give on_new_event a MockEvent rather than an + # EventBase. + await self.module.on_new_event(event=invite) # type: ignore[arg-type] + + self.mocked_update_membership.assert_not_called() + + async def test_not_membership(self) -> None: + """Tests that receiving a state event that's not a membership update does + nothing. + """ + invite = MockEvent( + sender=self.user_id, + state_key=self.user_id, + type="org.matrix.test", + content={"foo": "bar"}, + ) + + # Stop mypy from complaining that we give on_new_event a MockEvent rather than an + # EventBase. + await self.module.on_new_event(event=invite) # type: ignore[arg-type] + + self.mocked_update_membership.assert_not_called() + + def test_config_parse(self) -> None: + """Tests that a correct configuration parses.""" + config = { + "auto_accept_invites": { + "enabled": True, + "only_for_direct_messages": True, + "only_from_local_users": True, + } + } + parsed_config = AutoAcceptInvitesConfig() + parsed_config.read_config(config) + + self.assertTrue(parsed_config.enabled) + self.assertTrue(parsed_config.accept_invites_only_for_direct_messages) + self.assertTrue(parsed_config.accept_invites_only_from_local_users) + + def test_runs_on_only_one_worker(self) -> None: + """ + Tests that the module only runs on the specified worker. + """ + # By default, we run on the main process... + main_module = create_module( + config_override={"auto_accept_invites": {"enabled": True}}, worker_name=None + ) + cast( + Mock, main_module._api.register_third_party_rules_callbacks + ).assert_called_once() + + # ...and not on other workers (like synchrotrons)... + sync_module = create_module(worker_name="synchrotron42") + cast( + Mock, sync_module._api.register_third_party_rules_callbacks + ).assert_not_called() + + # ...unless we configured them to be the designated worker. + specified_module = create_module( + config_override={ + "auto_accept_invites": { + "enabled": True, + "worker_to_run_on": "account_data1", + } + }, + worker_name="account_data1", + ) + cast( + Mock, specified_module._api.register_third_party_rules_callbacks + ).assert_called_once() + + async def retry_assertions( + self, mock: Mock, call_count: int, **kwargs: Any + ) -> None: + """ + This is a hacky way to ensure that the assertions are not called before the other coroutine + has a chance to call `update_room_membership`. It catches the exception caused by a failure, + and sleeps the thread before retrying, up until 5 tries. + + Args: + call_count: the number of times the mock should have been called + mock: the mocked function we want to assert on + kwargs: keyword arguments to assert that the mock was called with + """ + + i = 0 + while i < 5: + try: + # Check that the mocked method is called the expected amount of times and with the right + # arguments to attempt to make the user join the room. + mock.assert_called_with(**kwargs) + self.assertEqual(call_count, mock.call_count) + break + except AssertionError as e: + i += 1 + if i == 5: + # we've used up the tries, force the test to fail as we've already caught the exception + self.fail(e) + await asyncio.sleep(1) + + +@attr.s(auto_attribs=True) +class MockEvent: + """Mocks an event. Only exposes properties the module uses.""" + + sender: str + type: str + content: Dict[str, Any] + room_id: str = "!someroom" + state_key: Optional[str] = None + + def is_state(self) -> bool: + """Checks if the event is a state event by checking if it has a state key.""" + return self.state_key is not None + + @property + def membership(self) -> str: + """Extracts the membership from the event. Should only be called on an event + that's a membership event, and will raise a KeyError otherwise. + """ + membership: str = self.content["membership"] + return membership + + +T = TypeVar("T") +TV = TypeVar("TV") + + +async def make_awaitable(value: T) -> T: + return value + + +def make_multiple_awaitable(result: TV) -> Awaitable[TV]: + """ + Makes an awaitable, suitable for mocking an `async` function. + This uses Futures as they can be awaited multiple times so can be returned + to multiple callers. + """ + future: Future[TV] = Future() + future.set_result(result) + return future + + +def create_module( + config_override: Optional[Dict[str, Any]] = None, worker_name: Optional[str] = None +) -> InviteAutoAccepter: + # Create a mock based on the ModuleApi spec, but override some mocked functions + # because some capabilities are needed for running the tests. + module_api = Mock(spec=ModuleApi) + module_api.is_mine.side_effect = lambda a: a.split(":")[1] == "test" + module_api.worker_name = worker_name + module_api.sleep.return_value = make_multiple_awaitable(None) + + if config_override is None: + config_override = {} + + config = AutoAcceptInvitesConfig() + config.read_config(config_override) + + return InviteAutoAccepter(config, module_api) diff --git a/tests/rest/client/utils.py b/tests/rest/client/utils.py index fe00afe198..7362bde7ab 100644 --- a/tests/rest/client/utils.py +++ b/tests/rest/client/utils.py @@ -170,6 +170,7 @@ class RestHelper: targ: Optional[str] = None, expect_code: int = HTTPStatus.OK, tok: Optional[str] = None, + extra_data: Optional[dict] = None, ) -> JsonDict: return self.change_membership( room=room, @@ -178,6 +179,7 @@ class RestHelper: tok=tok, membership=Membership.INVITE, expect_code=expect_code, + extra_data=extra_data, ) def join( diff --git a/tests/server.py b/tests/server.py index 434be3d22c..f3a917f835 100644 --- a/tests/server.py +++ b/tests/server.py @@ -85,6 +85,7 @@ from twisted.web.server import Request, Site from synapse.config.database import DatabaseConnectionConfig from synapse.config.homeserver import HomeServerConfig +from synapse.events.auto_accept_invites import InviteAutoAccepter from synapse.events.presence_router import load_legacy_presence_router from synapse.handlers.auth import load_legacy_password_auth_providers from synapse.http.site import SynapseRequest @@ -1156,6 +1157,11 @@ def setup_test_homeserver( for module, module_config in hs.config.modules.loaded_modules: module(config=module_config, api=module_api) + if hs.config.auto_accept_invites.enabled: + # Start the local auto_accept_invites module. + m = InviteAutoAccepter(hs.config.auto_accept_invites, module_api) + logger.info("Loaded local module %s", m) + load_legacy_spam_checkers(hs) load_legacy_third_party_event_rules(hs) load_legacy_presence_router(hs) From a547b49773b504deddee4db4ec4fb07971cd2fea Mon Sep 17 00:00:00 2001 From: Yadd Date: Wed, 22 May 2024 16:29:31 +0400 Subject: [PATCH 3/6] Update Lemonldap-NG OIDC config (#17204) Update OIDC documentation: by default Matrix doesn't query userinfo endpoint, then claims should be put on id_token. --- changelog.d/17204.doc | 1 + docs/openid.md | 2 ++ 2 files changed, 3 insertions(+) create mode 100644 changelog.d/17204.doc diff --git a/changelog.d/17204.doc b/changelog.d/17204.doc new file mode 100644 index 0000000000..5a5a8f5107 --- /dev/null +++ b/changelog.d/17204.doc @@ -0,0 +1 @@ +Update OIDC documentation: by default Matrix doesn't query userinfo endpoint, then claims should be put on id_token. diff --git a/docs/openid.md b/docs/openid.md index 9773a7de52..7a10b1615b 100644 --- a/docs/openid.md +++ b/docs/openid.md @@ -525,6 +525,8 @@ oidc_providers: (`Options > Security > ID Token signature algorithm` and `Options > Security > Access Token signature algorithm`) - Scopes: OpenID, Email and Profile +- Force claims into `id_token` + (`Options > Advanced > Force claims to be returned in ID Token`) - Allowed redirection addresses for login (`Options > Basic > Allowed redirection addresses for login` ) : `[synapse public baseurl]/_synapse/client/oidc/callback` From b71d2774388c90a68d71dd8d805556c8f62c92a1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 22 May 2024 13:55:18 +0100 Subject: [PATCH 4/6] Reduce work of calculating outbound device pokes (#17211) --- changelog.d/17211.misc | 1 + synapse/handlers/device.py | 7 +++++++ synapse/storage/databases/main/devices.py | 24 +++++++++++++++++++++++ 3 files changed, 32 insertions(+) create mode 100644 changelog.d/17211.misc diff --git a/changelog.d/17211.misc b/changelog.d/17211.misc new file mode 100644 index 0000000000..144db03a40 --- /dev/null +++ b/changelog.d/17211.misc @@ -0,0 +1 @@ +Reduce work of calculating outbound device lists updates. diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 55842e7c7b..0432d97109 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -906,6 +906,13 @@ class DeviceHandler(DeviceWorkerHandler): context=opentracing_context, ) + await self.store.mark_redundant_device_lists_pokes( + user_id=user_id, + device_id=device_id, + room_id=room_id, + converted_upto_stream_id=stream_id, + ) + # Notify replication that we've updated the device list stream. self.notifier.notify_replication() diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index f4410b5c02..48384e238c 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -2161,6 +2161,30 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): }, ) + async def mark_redundant_device_lists_pokes( + self, + user_id: str, + device_id: str, + room_id: str, + converted_upto_stream_id: int, + ) -> None: + """If we've calculated the outbound pokes for a given room/device list + update, mark any subsequent changes as already converted""" + + sql = """ + UPDATE device_lists_changes_in_room + SET converted_to_destinations = true + WHERE stream_id > ? AND user_id = ? AND device_id = ? + AND room_id = ? AND NOT converted_to_destinations + """ + + def mark_redundant_device_lists_pokes_txn(txn: LoggingTransaction) -> None: + txn.execute(sql, (converted_upto_stream_id, user_id, device_id, room_id)) + + return await self.db_pool.runInteraction( + "mark_redundant_device_lists_pokes", mark_redundant_device_lists_pokes_txn + ) + def _add_device_outbound_room_poke_txn( self, txn: LoggingTransaction, From 7ef00b76280c9d8f5ad60e42cc384b316569d15f Mon Sep 17 00:00:00 2001 From: reivilibre Date: Wed, 22 May 2024 14:12:58 +0100 Subject: [PATCH 5/6] Add logging to tasks managed by the task scheduler, showing CPU and database usage. (#17219) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The log format is the same as the request log format, except: - fields that are specific to HTTP requests have been removed - the task's params are included at the end of the log line. These log lines are emitted: - when the task function finishes — both completion and failure (and I suppose it is possible for a task to become schedulable again?) - every 5 minutes whilst it is running Closes #17217. --------- Signed-off-by: Olivier 'reivilibre --- changelog.d/17219.feature | 1 + synapse/util/task_scheduler.py | 69 +++++++++++++++++++++++++++++++++- 2 files changed, 68 insertions(+), 2 deletions(-) create mode 100644 changelog.d/17219.feature diff --git a/changelog.d/17219.feature b/changelog.d/17219.feature new file mode 100644 index 0000000000..f8277a89d8 --- /dev/null +++ b/changelog.d/17219.feature @@ -0,0 +1 @@ +Add logging to tasks managed by the task scheduler, showing CPU and database usage. \ No newline at end of file diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 01d05c9ed6..448960b297 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -24,7 +24,12 @@ from typing import TYPE_CHECKING, Awaitable, Callable, Dict, List, Optional, Set from twisted.python.failure import Failure -from synapse.logging.context import nested_logging_context +from synapse.logging.context import ( + ContextResourceUsage, + LoggingContext, + nested_logging_context, + set_current_context, +) from synapse.metrics import LaterGauge from synapse.metrics.background_process_metrics import ( run_as_background_process, @@ -81,6 +86,8 @@ class TaskScheduler: MAX_CONCURRENT_RUNNING_TASKS = 5 # Time from the last task update after which we will log a warning LAST_UPDATE_BEFORE_WARNING_MS = 24 * 60 * 60 * 1000 # 24hrs + # Report a running task's status and usage every so often. + OCCASIONAL_REPORT_INTERVAL_MS = 5 * 60 * 1000 # 5 minutes def __init__(self, hs: "HomeServer"): self._hs = hs @@ -346,6 +353,33 @@ class TaskScheduler: assert task.id not in self._running_tasks await self._store.delete_scheduled_task(task.id) + @staticmethod + def _log_task_usage( + state: str, task: ScheduledTask, usage: ContextResourceUsage, active_time: float + ) -> None: + """ + Log a line describing the state and usage of a task. + The log line is inspired by / a copy of the request log line format, + but with irrelevant fields removed. + + active_time: Time that the task has been running for, in seconds. + """ + + logger.info( + "Task %s: %.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)" + " [%d dbevts] %r, %r", + state, + active_time, + usage.ru_utime, + usage.ru_stime, + usage.db_sched_duration_sec, + usage.db_txn_duration_sec, + int(usage.db_txn_count), + usage.evt_db_fetch_count, + task.resource_id, + task.params, + ) + async def _launch_task(self, task: ScheduledTask) -> None: """Launch a scheduled task now. @@ -360,8 +394,32 @@ class TaskScheduler: ) function = self._actions[task.action] + def _occasional_report( + task_log_context: LoggingContext, start_time: float + ) -> None: + """ + Helper to log a 'Task continuing' line every so often. + """ + + current_time = self._clock.time() + calling_context = set_current_context(task_log_context) + try: + usage = task_log_context.get_resource_usage() + TaskScheduler._log_task_usage( + "continuing", task, usage, current_time - start_time + ) + finally: + set_current_context(calling_context) + async def wrapper() -> None: - with nested_logging_context(task.id): + with nested_logging_context(task.id) as log_context: + start_time = self._clock.time() + occasional_status_call = self._clock.looping_call( + _occasional_report, + TaskScheduler.OCCASIONAL_REPORT_INTERVAL_MS, + log_context, + start_time, + ) try: (status, result, error) = await function(task) except Exception: @@ -383,6 +441,13 @@ class TaskScheduler: ) self._running_tasks.remove(task.id) + current_time = self._clock.time() + usage = log_context.get_resource_usage() + TaskScheduler._log_task_usage( + status.value, task, usage, current_time - start_time + ) + occasional_status_call.stop() + # Try launch a new task since we've finished with this one. self._clock.call_later(0.1, self._launch_scheduled_tasks) From 7e2412265da43552b26dedfa72909afd704d1500 Mon Sep 17 00:00:00 2001 From: reivilibre Date: Wed, 22 May 2024 14:22:33 +0100 Subject: [PATCH 6/6] Log exceptions when failing to auto-join new user according to the `auto_join_rooms` option. (#17176) Would have been useful for tracking down #16878. Signed-off-by: Olivier 'reivilibre --- changelog.d/17176.misc | 1 + synapse/handlers/register.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) create mode 100644 changelog.d/17176.misc diff --git a/changelog.d/17176.misc b/changelog.d/17176.misc new file mode 100644 index 0000000000..cc9f2a5202 --- /dev/null +++ b/changelog.d/17176.misc @@ -0,0 +1 @@ +Log exceptions when failing to auto-join new user according to the `auto_join_rooms` option. \ No newline at end of file diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index e48e70db04..c200e29569 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -590,7 +590,7 @@ class RegistrationHandler: # moving away from bare excepts is a good thing to do. logger.error("Failed to join new user to %r: %r", r, e) except Exception as e: - logger.error("Failed to join new user to %r: %r", r, e) + logger.error("Failed to join new user to %r: %r", r, e, exc_info=True) async def _auto_join_rooms(self, user_id: str) -> None: """Automatically joins users to auto join rooms - creating the room in the first place