diff --git a/README.rst b/README.rst index 655a2bf3be..1a5503572e 100644 --- a/README.rst +++ b/README.rst @@ -393,7 +393,12 @@ massive excess of outgoing federation requests (see `discussion indicate that your server is also issuing far more outgoing federation requests than can be accounted for by your users' activity, this is a likely cause. The misbehavior can be worked around by setting -``use_presence: false`` in the Synapse config file. +the following in the Synapse config file: + +.. code-block:: yaml + + presence: + enabled: false People can't accept room invitations from me -------------------------------------------- diff --git a/changelog.d/9491.feature b/changelog.d/9491.feature new file mode 100644 index 0000000000..8b56a95a44 --- /dev/null +++ b/changelog.d/9491.feature @@ -0,0 +1 @@ +Add a Synapse module for routing presence updates between users. diff --git a/docs/presence_router_module.md b/docs/presence_router_module.md new file mode 100644 index 0000000000..d6566d978d --- /dev/null +++ b/docs/presence_router_module.md @@ -0,0 +1,235 @@ +# Presence Router Module + +Synapse supports configuring a module that can specify additional users +(local or remote) to should receive certain presence updates from local +users. + +Note that routing presence via Application Service transactions is not +currently supported. + +The presence routing module is implemented as a Python class, which will +be imported by the running Synapse. + +## Python Presence Router Class + +The Python class is instantiated with two objects: + +* A configuration object of some type (see below). +* An instance of `synapse.module_api.ModuleApi`. + +It then implements methods related to presence routing. + +Note that one method of `ModuleApi` that may be useful is: + +```python +async def ModuleApi.send_local_online_presence_to(users: Iterable[str]) -> None +``` + +which can be given a list of local or remote MXIDs to broadcast known, online user +presence to (for those users that the receiving user is considered interested in). +It does not include state for users who are currently offline, and it can only be +called on workers that support sending federation. + +### Module structure + +Below is a list of possible methods that can be implemented, and whether they are +required. + +#### `parse_config` + +```python +def parse_config(config_dict: dict) -> Any +``` + +**Required.** A static method that is passed a dictionary of config options, and + should return a validated config object. This method is described further in + [Configuration](#configuration). + +#### `get_users_for_states` + +```python +async def get_users_for_states( + self, + state_updates: Iterable[UserPresenceState], +) -> Dict[str, Set[UserPresenceState]]: +``` + +**Required.** An asynchronous method that is passed an iterable of user presence +state. This method can determine whether a given presence update should be sent to certain +users. It does this by returning a dictionary with keys representing local or remote +Matrix User IDs, and values being a python set +of `synapse.handlers.presence.UserPresenceState` instances. + +Synapse will then attempt to send the specified presence updates to each user when +possible. + +#### `get_interested_users` + +```python +async def get_interested_users(self, user_id: str) -> Union[Set[str], str] +``` + +**Required.** An asynchronous method that is passed a single Matrix User ID. This +method is expected to return the users that the passed in user may be interested in the +presence of. Returned users may be local or remote. The presence routed as a result of +what this method returns is sent in addition to the updates already sent between users +that share a room together. Presence updates are deduplicated. + +This method should return a python set of Matrix User IDs, or the object +`synapse.events.presence_router.PresenceRouter.ALL_USERS` to indicate that the passed +user should receive presence information for *all* known users. + +For clarity, if the user `@alice:example.org` is passed to this method, and the Set +`{"@bob:example.com", "@charlie:somewhere.org"}` is returned, this signifies that Alice +should receive presence updates sent by Bob and Charlie, regardless of whether these +users share a room. + +### Example + +Below is an example implementation of a presence router class. + +```python +from typing import Dict, Iterable, Set, Union +from synapse.events.presence_router import PresenceRouter +from synapse.handlers.presence import UserPresenceState +from synapse.module_api import ModuleApi + +class PresenceRouterConfig: + def __init__(self): + # Config options with their defaults + # A list of users to always send all user presence updates to + self.always_send_to_users = [] # type: List[str] + + # A list of users to ignore presence updates for. Does not affect + # shared-room presence relationships + self.blacklisted_users = [] # type: List[str] + +class ExamplePresenceRouter: + """An example implementation of synapse.presence_router.PresenceRouter. + Supports routing all presence to a configured set of users, or a subset + of presence from certain users to members of certain rooms. + + Args: + config: A configuration object. + module_api: An instance of Synapse's ModuleApi. + """ + def __init__(self, config: PresenceRouterConfig, module_api: ModuleApi): + self._config = config + self._module_api = module_api + + @staticmethod + def parse_config(config_dict: dict) -> PresenceRouterConfig: + """Parse a configuration dictionary from the homeserver config, do + some validation and return a typed PresenceRouterConfig. + + Args: + config_dict: The configuration dictionary. + + Returns: + A validated config object. + """ + # Initialise a typed config object + config = PresenceRouterConfig() + always_send_to_users = config_dict.get("always_send_to_users") + blacklisted_users = config_dict.get("blacklisted_users") + + # Do some validation of config options... otherwise raise a + # synapse.config.ConfigError. + config.always_send_to_users = always_send_to_users + config.blacklisted_users = blacklisted_users + + return config + + async def get_users_for_states( + self, + state_updates: Iterable[UserPresenceState], + ) -> Dict[str, Set[UserPresenceState]]: + """Given an iterable of user presence updates, determine where each one + needs to go. Returned results will not affect presence updates that are + sent between users who share a room. + + Args: + state_updates: An iterable of user presence state updates. + + Returns: + A dictionary of user_id -> set of UserPresenceState that the user should + receive. + """ + destination_users = {} # type: Dict[str, Set[UserPresenceState] + + # Ignore any updates for blacklisted users + desired_updates = set() + for update in state_updates: + if update.state_key not in self._config.blacklisted_users: + desired_updates.add(update) + + # Send all presence updates to specific users + for user_id in self._config.always_send_to_users: + destination_users[user_id] = desired_updates + + return destination_users + + async def get_interested_users( + self, + user_id: str, + ) -> Union[Set[str], PresenceRouter.ALL_USERS]: + """ + Retrieve a list of users that `user_id` is interested in receiving the + presence of. This will be in addition to those they share a room with. + Optionally, the object PresenceRouter.ALL_USERS can be returned to indicate + that this user should receive all incoming local and remote presence updates. + + Note that this method will only be called for local users. + + Args: + user_id: A user requesting presence updates. + + Returns: + A set of user IDs to return additional presence updates for, or + PresenceRouter.ALL_USERS to return presence updates for all other users. + """ + if user_id in self._config.always_send_to_users: + return PresenceRouter.ALL_USERS + + return set() +``` + +#### A note on `get_users_for_states` and `get_interested_users` + +Both of these methods are effectively two different sides of the same coin. The logic +regarding which users should receive updates for other users should be the same +between them. + +`get_users_for_states` is called when presence updates come in from either federation +or local users, and is used to either direct local presence to remote users, or to +wake up the sync streams of local users to collect remote presence. + +In contrast, `get_interested_users` is used to determine the users that presence should +be fetched for when a local user is syncing. This presence is then retrieved, before +being fed through `get_users_for_states` once again, with only the syncing user's +routing information pulled from the resulting dictionary. + +Their routing logic should thus line up, else you may run into unintended behaviour. + +## Configuration + +Once you've crafted your module and installed it into the same Python environment as +Synapse, amend your homeserver config file with the following. + +```yaml +presence: + routing_module: + module: my_module.ExamplePresenceRouter + config: + # Any configuration options for your module. The below is an example. + # of setting options for ExamplePresenceRouter. + always_send_to_users: ["@presence_gobbler:example.org"] + blacklisted_users: + - "@alice:example.com" + - "@bob:example.com" + ... +``` + +The contents of `config` will be passed as a Python dictionary to the static +`parse_config` method of your class. The object returned by this method will +then be passed to the `__init__` method of your module as `config`. diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index b0bf987740..9182dcd987 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -82,9 +82,28 @@ pid_file: DATADIR/homeserver.pid # #soft_file_limit: 0 -# Set to false to disable presence tracking on this homeserver. +# Presence tracking allows users to see the state (e.g online/offline) +# of other local and remote users. # -#use_presence: false +presence: + # Uncomment to disable presence tracking on this homeserver. This option + # replaces the previous top-level 'use_presence' option. + # + #enabled: false + + # Presence routers are third-party modules that can specify additional logic + # to where presence updates from users are routed. + # + presence_router: + # The custom module's class. Uncomment to use a custom presence router module. + # + #module: "my_custom_router.PresenceRouter" + + # Configuration options of the custom module. Refer to your module's + # documentation for available options. + # + #config: + # example_option: 'something' # Whether to require authentication to retrieve profile data (avatars, # display names) of other users through the client API. Defaults to diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 3df2aa5c2b..d1c2079233 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -281,6 +281,7 @@ class GenericWorkerPresence(BasePresenceHandler): self.hs = hs self.is_mine_id = hs.is_mine_id + self.presence_router = hs.get_presence_router() self._presence_enabled = hs.config.use_presence # The number of ongoing syncs on this process, by user id. @@ -395,7 +396,7 @@ class GenericWorkerPresence(BasePresenceHandler): return _user_syncing() async def notify_from_replication(self, states, stream_id): - parties = await get_interested_parties(self.store, states) + parties = await get_interested_parties(self.store, self.presence_router, states) room_ids_to_states, users_to_states = parties self.notifier.on_new_event( diff --git a/synapse/config/server.py b/synapse/config/server.py index 5f8910b6e1..8decc9d10d 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -27,6 +27,7 @@ import yaml from netaddr import AddrFormatError, IPNetwork, IPSet from synapse.api.room_versions import KNOWN_ROOM_VERSIONS +from synapse.util.module_loader import load_module from synapse.util.stringutils import parse_and_validate_server_name from ._base import Config, ConfigError @@ -238,7 +239,20 @@ class ServerConfig(Config): self.public_baseurl = config.get("public_baseurl") # Whether to enable user presence. - self.use_presence = config.get("use_presence", True) + presence_config = config.get("presence") or {} + self.use_presence = presence_config.get("enabled") + if self.use_presence is None: + self.use_presence = config.get("use_presence", True) + + # Custom presence router module + self.presence_router_module_class = None + self.presence_router_config = None + presence_router_config = presence_config.get("presence_router") + if presence_router_config: + ( + self.presence_router_module_class, + self.presence_router_config, + ) = load_module(presence_router_config, ("presence", "presence_router")) # Whether to update the user directory or not. This should be set to # false only if we are updating the user directory in a worker @@ -834,9 +848,28 @@ class ServerConfig(Config): # #soft_file_limit: 0 - # Set to false to disable presence tracking on this homeserver. + # Presence tracking allows users to see the state (e.g online/offline) + # of other local and remote users. # - #use_presence: false + presence: + # Uncomment to disable presence tracking on this homeserver. This option + # replaces the previous top-level 'use_presence' option. + # + #enabled: false + + # Presence routers are third-party modules that can specify additional logic + # to where presence updates from users are routed. + # + presence_router: + # The custom module's class. Uncomment to use a custom presence router module. + # + #module: "my_custom_router.PresenceRouter" + + # Configuration options of the custom module. Refer to your module's + # documentation for available options. + # + #config: + # example_option: 'something' # Whether to require authentication to retrieve profile data (avatars, # display names) of other users through the client API. Defaults to diff --git a/synapse/events/presence_router.py b/synapse/events/presence_router.py new file mode 100644 index 0000000000..24cd389d80 --- /dev/null +++ b/synapse/events/presence_router.py @@ -0,0 +1,104 @@ +# -*- coding: utf-8 -*- +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import TYPE_CHECKING, Dict, Iterable, Set, Union + +from synapse.api.presence import UserPresenceState + +if TYPE_CHECKING: + from synapse.server import HomeServer + + +class PresenceRouter: + """ + A module that the homeserver will call upon to help route user presence updates to + additional destinations. If a custom presence router is configured, calls will be + passed to that instead. + """ + + ALL_USERS = "ALL" + + def __init__(self, hs: "HomeServer"): + self.custom_presence_router = None + + # Check whether a custom presence router module has been configured + if hs.config.presence_router_module_class: + # Initialise the module + self.custom_presence_router = hs.config.presence_router_module_class( + config=hs.config.presence_router_config, module_api=hs.get_module_api() + ) + + # Ensure the module has implemented the required methods + required_methods = ["get_users_for_states", "get_interested_users"] + for method_name in required_methods: + if not hasattr(self.custom_presence_router, method_name): + raise Exception( + "PresenceRouter module '%s' must implement all required methods: %s" + % ( + hs.config.presence_router_module_class.__name__, + ", ".join(required_methods), + ) + ) + + async def get_users_for_states( + self, + state_updates: Iterable[UserPresenceState], + ) -> Dict[str, Set[UserPresenceState]]: + """ + Given an iterable of user presence updates, determine where each one + needs to go. + + Args: + state_updates: An iterable of user presence state updates. + + Returns: + A dictionary of user_id -> set of UserPresenceState, indicating which + presence updates each user should receive. + """ + if self.custom_presence_router is not None: + # Ask the custom module + return await self.custom_presence_router.get_users_for_states( + state_updates=state_updates + ) + + # Don't include any extra destinations for presence updates + return {} + + async def get_interested_users(self, user_id: str) -> Union[Set[str], ALL_USERS]: + """ + Retrieve a list of users that `user_id` is interested in receiving the + presence of. This will be in addition to those they share a room with. + Optionally, the object PresenceRouter.ALL_USERS can be returned to indicate + that this user should receive all incoming local and remote presence updates. + + Note that this method will only be called for local users, but can return users + that are local or remote. + + Args: + user_id: A user requesting presence updates. + + Returns: + A set of user IDs to return presence updates for, or ALL_USERS to return all + known updates. + """ + if self.custom_presence_router is not None: + # Ask the custom module for interested users + return await self.custom_presence_router.get_interested_users( + user_id=user_id + ) + + # A custom presence router is not defined. + # Don't report any additional interested users + return set() diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 8babb1ebbe..98bfce22ff 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -44,6 +44,7 @@ from synapse.types import JsonDict, ReadReceipt, RoomStreamToken from synapse.util.metrics import Measure, measure_func if TYPE_CHECKING: + from synapse.events.presence_router import PresenceRouter from synapse.server import HomeServer logger = logging.getLogger(__name__) @@ -162,6 +163,7 @@ class FederationSender(AbstractFederationSender): self.clock = hs.get_clock() self.is_mine_id = hs.is_mine_id + self._presence_router = None # type: Optional[PresenceRouter] self._transaction_manager = TransactionManager(hs) self._instance_name = hs.get_instance_name() @@ -584,7 +586,22 @@ class FederationSender(AbstractFederationSender): """Given a list of states populate self.pending_presence_by_dest and poke to send a new transaction to each destination """ - hosts_and_states = await get_interested_remotes(self.store, states, self.state) + # We pull the presence router here instead of __init__ + # to prevent a dependency cycle: + # + # AuthHandler -> Notifier -> FederationSender + # -> PresenceRouter -> ModuleApi -> AuthHandler + if self._presence_router is None: + self._presence_router = self.hs.get_presence_router() + + assert self._presence_router is not None + + hosts_and_states = await get_interested_remotes( + self.store, + self._presence_router, + states, + self.state, + ) for destinations, states in hosts_and_states: for destination in destinations: diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index da92feacc9..c817f2952d 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -25,7 +25,17 @@ The methods that define policy are: import abc import logging from contextlib import contextmanager -from typing import TYPE_CHECKING, Dict, Iterable, List, Set, Tuple +from typing import ( + TYPE_CHECKING, + Dict, + FrozenSet, + Iterable, + List, + Optional, + Set, + Tuple, + Union, +) from prometheus_client import Counter from typing_extensions import ContextManager @@ -34,6 +44,7 @@ import synapse.metrics from synapse.api.constants import EventTypes, Membership, PresenceState from synapse.api.errors import SynapseError from synapse.api.presence import UserPresenceState +from synapse.events.presence_router import PresenceRouter from synapse.logging.context import run_in_background from synapse.logging.utils import log_function from synapse.metrics import LaterGauge @@ -42,7 +53,7 @@ from synapse.state import StateHandler from synapse.storage.databases.main import DataStore from synapse.types import Collection, JsonDict, UserID, get_domain_from_id from synapse.util.async_helpers import Linearizer -from synapse.util.caches.descriptors import cached +from synapse.util.caches.descriptors import _CacheContext, cached from synapse.util.metrics import Measure from synapse.util.wheel_timer import WheelTimer @@ -209,6 +220,7 @@ class PresenceHandler(BasePresenceHandler): self.notifier = hs.get_notifier() self.federation = hs.get_federation_sender() self.state = hs.get_state_handler() + self.presence_router = hs.get_presence_router() self._presence_enabled = hs.config.use_presence federation_registry = hs.get_federation_registry() @@ -653,7 +665,7 @@ class PresenceHandler(BasePresenceHandler): """ stream_id, max_token = await self.store.update_presence(states) - parties = await get_interested_parties(self.store, states) + parties = await get_interested_parties(self.store, self.presence_router, states) room_ids_to_states, users_to_states = parties self.notifier.on_new_event( @@ -1041,7 +1053,12 @@ class PresenceEventSource: # # Presence -> Notifier -> PresenceEventSource -> Presence # + # Same with get_module_api, get_presence_router + # + # AuthHandler -> Notifier -> PresenceEventSource -> ModuleApi -> AuthHandler self.get_presence_handler = hs.get_presence_handler + self.get_module_api = hs.get_module_api + self.get_presence_router = hs.get_presence_router self.clock = hs.get_clock() self.store = hs.get_datastore() self.state = hs.get_state_handler() @@ -1055,7 +1072,7 @@ class PresenceEventSource: include_offline=True, explicit_room_id=None, **kwargs - ): + ) -> Tuple[List[UserPresenceState], int]: # The process for getting presence events are: # 1. Get the rooms the user is in. # 2. Get the list of user in the rooms. @@ -1068,7 +1085,17 @@ class PresenceEventSource: # We don't try and limit the presence updates by the current token, as # sending down the rare duplicate is not a concern. + user_id = user.to_string() + stream_change_cache = self.store.presence_stream_cache + with Measure(self.clock, "presence.get_new_events"): + if user_id in self.get_module_api()._send_full_presence_to_local_users: + # This user has been specified by a module to receive all current, online + # user presence. Removing from_key and setting include_offline to false + # will do effectively this. + from_key = None + include_offline = False + if from_key is not None: from_key = int(from_key) @@ -1091,59 +1118,209 @@ class PresenceEventSource: # doesn't return. C.f. #5503. return [], max_token - presence = self.get_presence_handler() - stream_change_cache = self.store.presence_stream_cache - + # Figure out which other users this user should receive updates for users_interested_in = await self._get_interested_in(user, explicit_room_id) - user_ids_changed = set() # type: Collection[str] - changed = None - if from_key: - changed = stream_change_cache.get_all_entities_changed(from_key) + # We have a set of users that we're interested in the presence of. We want to + # cross-reference that with the users that have actually changed their presence. - if changed is not None and len(changed) < 500: - assert isinstance(user_ids_changed, set) + # Check whether this user should see all user updates - # For small deltas, its quicker to get all changes and then - # work out if we share a room or they're in our presence list - get_updates_counter.labels("stream").inc() - for other_user_id in changed: - if other_user_id in users_interested_in: - user_ids_changed.add(other_user_id) - else: - # Too many possible updates. Find all users we can see and check - # if any of them have changed. - get_updates_counter.labels("full").inc() + if users_interested_in == PresenceRouter.ALL_USERS: + # Provide presence state for all users + presence_updates = await self._filter_all_presence_updates_for_user( + user_id, include_offline, from_key + ) - if from_key: - user_ids_changed = stream_change_cache.get_entities_changed( - users_interested_in, from_key + # Remove the user from the list of users to receive all presence + if user_id in self.get_module_api()._send_full_presence_to_local_users: + self.get_module_api()._send_full_presence_to_local_users.remove( + user_id ) + + return presence_updates, max_token + + # Make mypy happy. users_interested_in should now be a set + assert not isinstance(users_interested_in, str) + + # The set of users that we're interested in and that have had a presence update. + # We'll actually pull the presence updates for these users at the end. + interested_and_updated_users = ( + set() + ) # type: Union[Set[str], FrozenSet[str]] + + if from_key: + # First get all users that have had a presence update + updated_users = stream_change_cache.get_all_entities_changed(from_key) + + # Cross-reference users we're interested in with those that have had updates. + # Use a slightly-optimised method for processing smaller sets of updates. + if updated_users is not None and len(updated_users) < 500: + # For small deltas, it's quicker to get all changes and then + # cross-reference with the users we're interested in + get_updates_counter.labels("stream").inc() + for other_user_id in updated_users: + if other_user_id in users_interested_in: + # mypy thinks this variable could be a FrozenSet as it's possibly set + # to one in the `get_entities_changed` call below, and `add()` is not + # method on a FrozenSet. That doesn't affect us here though, as + # `interested_and_updated_users` is clearly a set() above. + interested_and_updated_users.add(other_user_id) # type: ignore else: - user_ids_changed = users_interested_in + # Too many possible updates. Find all users we can see and check + # if any of them have changed. + get_updates_counter.labels("full").inc() - updates = await presence.current_state_for_users(user_ids_changed) + interested_and_updated_users = ( + stream_change_cache.get_entities_changed( + users_interested_in, from_key + ) + ) + else: + # No from_key has been specified. Return the presence for all users + # this user is interested in + interested_and_updated_users = users_interested_in - if include_offline: - return (list(updates.values()), max_token) - else: - return ( - [s for s in updates.values() if s.state != PresenceState.OFFLINE], - max_token, + # Retrieve the current presence state for each user + users_to_state = await self.get_presence_handler().current_state_for_users( + interested_and_updated_users ) + presence_updates = list(users_to_state.values()) + + # Remove the user from the list of users to receive all presence + if user_id in self.get_module_api()._send_full_presence_to_local_users: + self.get_module_api()._send_full_presence_to_local_users.remove(user_id) + + if not include_offline: + # Filter out offline presence states + presence_updates = self._filter_offline_presence_state(presence_updates) + + return presence_updates, max_token + + async def _filter_all_presence_updates_for_user( + self, + user_id: str, + include_offline: bool, + from_key: Optional[int] = None, + ) -> List[UserPresenceState]: + """ + Computes the presence updates a user should receive. + + First pulls presence updates from the database. Then consults PresenceRouter + for whether any updates should be excluded by user ID. + + Args: + user_id: The User ID of the user to compute presence updates for. + include_offline: Whether to include offline presence states from the results. + from_key: The minimum stream ID of updates to pull from the database + before filtering. + + Returns: + A list of presence states for the given user to receive. + """ + if from_key: + # Only return updates since the last sync + updated_users = self.store.presence_stream_cache.get_all_entities_changed( + from_key + ) + if not updated_users: + updated_users = [] + + # Get the actual presence update for each change + users_to_state = await self.get_presence_handler().current_state_for_users( + updated_users + ) + presence_updates = list(users_to_state.values()) + + if not include_offline: + # Filter out offline states + presence_updates = self._filter_offline_presence_state(presence_updates) + else: + users_to_state = await self.store.get_presence_for_all_users( + include_offline=include_offline + ) + + presence_updates = list(users_to_state.values()) + + # TODO: This feels wildly inefficient, and it's unfortunate we need to ask the + # module for information on a number of users when we then only take the info + # for a single user + + # Filter through the presence router + users_to_state_set = await self.get_presence_router().get_users_for_states( + presence_updates + ) + + # We only want the mapping for the syncing user + presence_updates = list(users_to_state_set[user_id]) + + # Return presence information for all users + return presence_updates + + def _filter_offline_presence_state( + self, presence_updates: Iterable[UserPresenceState] + ) -> List[UserPresenceState]: + """Given an iterable containing user presence updates, return a list with any offline + presence states removed. + + Args: + presence_updates: Presence states to filter + + Returns: + A new list with any offline presence states removed. + """ + return [ + update + for update in presence_updates + if update.state != PresenceState.OFFLINE + ] def get_current_key(self): return self.store.get_current_presence_token() @cached(num_args=2, cache_context=True) - async def _get_interested_in(self, user, explicit_room_id, cache_context): + async def _get_interested_in( + self, + user: UserID, + explicit_room_id: Optional[str] = None, + cache_context: Optional[_CacheContext] = None, + ) -> Union[Set[str], str]: """Returns the set of users that the given user should see presence - updates for + updates for. + + Args: + user: The user to retrieve presence updates for. + explicit_room_id: The users that are in the room will be returned. + + Returns: + A set of user IDs to return presence updates for, or "ALL" to return all + known updates. """ user_id = user.to_string() users_interested_in = set() users_interested_in.add(user_id) # So that we receive our own presence + # cache_context isn't likely to ever be None due to the @cached decorator, + # but we can't have a non-optional argument after the optional argument + # explicit_room_id either. Assert cache_context is not None so we can use it + # without mypy complaining. + assert cache_context + + # Check with the presence router whether we should poll additional users for + # their presence information + additional_users = await self.get_presence_router().get_interested_users( + user.to_string() + ) + if additional_users == PresenceRouter.ALL_USERS: + # If the module requested that this user see the presence updates of *all* + # users, then simply return that instead of calculating what rooms this + # user shares + return PresenceRouter.ALL_USERS + + # Add the additional users from the router + users_interested_in.update(additional_users) + + # Find the users who share a room with this user users_who_share_room = await self.store.get_users_who_share_room_with_user( user_id, on_invalidate=cache_context.invalidate ) @@ -1314,14 +1491,15 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now): async def get_interested_parties( - store: DataStore, states: List[UserPresenceState] + store: DataStore, presence_router: PresenceRouter, states: List[UserPresenceState] ) -> Tuple[Dict[str, List[UserPresenceState]], Dict[str, List[UserPresenceState]]]: """Given a list of states return which entities (rooms, users) are interested in the given states. Args: - store - states + store: The homeserver's data store. + presence_router: A module for augmenting the destinations for presence updates. + states: A list of incoming user presence updates. Returns: A 2-tuple of `(room_ids_to_states, users_to_states)`, @@ -1337,11 +1515,22 @@ async def get_interested_parties( # Always notify self users_to_states.setdefault(state.user_id, []).append(state) + # Ask a presence routing module for any additional parties if one + # is loaded. + router_users_to_states = await presence_router.get_users_for_states(states) + + # Update the dictionaries with additional destinations and state to send + for user_id, user_states in router_users_to_states.items(): + users_to_states.setdefault(user_id, []).extend(user_states) + return room_ids_to_states, users_to_states async def get_interested_remotes( - store: DataStore, states: List[UserPresenceState], state_handler: StateHandler + store: DataStore, + presence_router: PresenceRouter, + states: List[UserPresenceState], + state_handler: StateHandler, ) -> List[Tuple[Collection[str], List[UserPresenceState]]]: """Given a list of presence states figure out which remote servers should be sent which. @@ -1349,9 +1538,10 @@ async def get_interested_remotes( All the presence states should be for local users only. Args: - store - states - state_handler + store: The homeserver's data store. + presence_router: A module for augmenting the destinations for presence updates. + states: A list of incoming user presence updates. + state_handler: Returns: A list of 2-tuples of destinations and states, where for @@ -1363,7 +1553,9 @@ async def get_interested_remotes( # First we look up the rooms each user is in (as well as any explicit # subscriptions), then for each distinct room we look up the remote # hosts in those rooms. - room_ids_to_states, users_to_states = await get_interested_parties(store, states) + room_ids_to_states, users_to_states = await get_interested_parties( + store, presence_router, states + ) for room_id, states in room_ids_to_states.items(): hosts = await state_handler.get_current_hosts_in_room(room_id) diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 781e02fbbb..3ecd46c038 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -50,11 +50,20 @@ class ModuleApi: self._auth = hs.get_auth() self._auth_handler = auth_handler self._server_name = hs.hostname + self._presence_stream = hs.get_event_sources().sources["presence"] # We expose these as properties below in order to attach a helpful docstring. self._http_client = hs.get_simple_http_client() # type: SimpleHttpClient self._public_room_list_manager = PublicRoomListManager(hs) + # The next time these users sync, they will receive the current presence + # state of all local users. Users are added by send_local_online_presence_to, + # and removed after a successful sync. + # + # We make this a private variable to deter modules from accessing it directly, + # though other classes in Synapse will still do so. + self._send_full_presence_to_local_users = set() + @property def http_client(self): """Allows making outbound HTTP requests to remote resources. @@ -385,6 +394,47 @@ class ModuleApi: return event + async def send_local_online_presence_to(self, users: Iterable[str]) -> None: + """ + Forces the equivalent of a presence initial_sync for a set of local or remote + users. The users will receive presence for all currently online users that they + are considered interested in. + + Updates to remote users will be sent immediately, whereas local users will receive + them on their next sync attempt. + + Note that this method can only be run on the main or federation_sender worker + processes. + """ + if not self._hs.should_send_federation(): + raise Exception( + "send_local_online_presence_to can only be run " + "on processes that send federation", + ) + + for user in users: + if self._hs.is_mine_id(user): + # Modify SyncHandler._generate_sync_entry_for_presence to call + # presence_source.get_new_events with an empty `from_key` if + # that user's ID were in a list modified by ModuleApi somewhere. + # That user would then get all presence state on next incremental sync. + + # Force a presence initial_sync for this user next time + self._send_full_presence_to_local_users.add(user) + else: + # Retrieve presence state for currently online users that this user + # is considered interested in + presence_events, _ = await self._presence_stream.get_new_events( + UserID.from_string(user), from_key=None, include_offline=False + ) + + # Send to remote destinations + await make_deferred_yieldable( + # We pull the federation sender here as we can only do so on workers + # that support sending presence + self._hs.get_federation_sender().send_presence(presence_events) + ) + class PublicRoomListManager: """Contains methods for adding to, removing from and querying whether a room diff --git a/synapse/server.py b/synapse/server.py index e42f7b1a18..cfb55c230d 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -51,6 +51,7 @@ from synapse.crypto import context_factory from synapse.crypto.context_factory import RegularPolicyForHTTPS from synapse.crypto.keyring import Keyring from synapse.events.builder import EventBuilderFactory +from synapse.events.presence_router import PresenceRouter from synapse.events.spamcheck import SpamChecker from synapse.events.third_party_rules import ThirdPartyEventRules from synapse.events.utils import EventClientSerializer @@ -425,6 +426,10 @@ class HomeServer(metaclass=abc.ABCMeta): else: raise Exception("Workers cannot write typing") + @cache_in_self + def get_presence_router(self) -> PresenceRouter: + return PresenceRouter(self) + @cache_in_self def get_typing_handler(self) -> FollowerTypingHandler: if self.config.worker.writers.typing == self.get_instance_name(): diff --git a/tests/events/test_presence_router.py b/tests/events/test_presence_router.py new file mode 100644 index 0000000000..c6e547f11c --- /dev/null +++ b/tests/events/test_presence_router.py @@ -0,0 +1,386 @@ +# -*- coding: utf-8 -*- +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import Dict, Iterable, List, Optional, Set, Tuple, Union + +from mock import Mock + +import attr + +from synapse.api.constants import EduTypes +from synapse.events.presence_router import PresenceRouter +from synapse.federation.units import Transaction +from synapse.handlers.presence import UserPresenceState +from synapse.module_api import ModuleApi +from synapse.rest import admin +from synapse.rest.client.v1 import login, presence, room +from synapse.types import JsonDict, StreamToken, create_requester + +from tests.handlers.test_sync import generate_sync_config +from tests.unittest import FederatingHomeserverTestCase, TestCase, override_config + + +@attr.s +class PresenceRouterTestConfig: + users_who_should_receive_all_presence = attr.ib(type=List[str], default=[]) + + +class PresenceRouterTestModule: + def __init__(self, config: PresenceRouterTestConfig, module_api: ModuleApi): + self._config = config + self._module_api = module_api + + async def get_users_for_states( + self, state_updates: Iterable[UserPresenceState] + ) -> Dict[str, Set[UserPresenceState]]: + users_to_state = { + user_id: set(state_updates) + for user_id in self._config.users_who_should_receive_all_presence + } + return users_to_state + + async def get_interested_users( + self, user_id: str + ) -> Union[Set[str], PresenceRouter.ALL_USERS]: + if user_id in self._config.users_who_should_receive_all_presence: + return PresenceRouter.ALL_USERS + + return set() + + @staticmethod + def parse_config(config_dict: dict) -> PresenceRouterTestConfig: + """Parse a configuration dictionary from the homeserver config, do + some validation and return a typed PresenceRouterConfig. + + Args: + config_dict: The configuration dictionary. + + Returns: + A validated config object. + """ + # Initialise a typed config object + config = PresenceRouterTestConfig() + + config.users_who_should_receive_all_presence = config_dict.get( + "users_who_should_receive_all_presence" + ) + + return config + + +class PresenceRouterTestCase(FederatingHomeserverTestCase): + servlets = [ + admin.register_servlets, + login.register_servlets, + room.register_servlets, + presence.register_servlets, + ] + + def make_homeserver(self, reactor, clock): + return self.setup_test_homeserver( + federation_transport_client=Mock(spec=["send_transaction"]), + ) + + def prepare(self, reactor, clock, homeserver): + self.sync_handler = self.hs.get_sync_handler() + self.module_api = homeserver.get_module_api() + + @override_config( + { + "presence": { + "presence_router": { + "module": __name__ + ".PresenceRouterTestModule", + "config": { + "users_who_should_receive_all_presence": [ + "@presence_gobbler:test", + ] + }, + } + }, + "send_federation": True, + } + ) + def test_receiving_all_presence(self): + """Test that a user that does not share a room with another other can receive + presence for them, due to presence routing. + """ + # Create a user who should receive all presence of others + self.presence_receiving_user_id = self.register_user( + "presence_gobbler", "monkey" + ) + self.presence_receiving_user_tok = self.login("presence_gobbler", "monkey") + + # And two users who should not have any special routing + self.other_user_one_id = self.register_user("other_user_one", "monkey") + self.other_user_one_tok = self.login("other_user_one", "monkey") + self.other_user_two_id = self.register_user("other_user_two", "monkey") + self.other_user_two_tok = self.login("other_user_two", "monkey") + + # Put the other two users in a room with each other + room_id = self.helper.create_room_as( + self.other_user_one_id, tok=self.other_user_one_tok + ) + + self.helper.invite( + room_id, + self.other_user_one_id, + self.other_user_two_id, + tok=self.other_user_one_tok, + ) + self.helper.join(room_id, self.other_user_two_id, tok=self.other_user_two_tok) + # User one sends some presence + send_presence_update( + self, + self.other_user_one_id, + self.other_user_one_tok, + "online", + "boop", + ) + + # Check that the presence receiving user gets user one's presence when syncing + presence_updates, sync_token = sync_presence( + self, self.presence_receiving_user_id + ) + self.assertEqual(len(presence_updates), 1) + + presence_update = presence_updates[0] # type: UserPresenceState + self.assertEqual(presence_update.user_id, self.other_user_one_id) + self.assertEqual(presence_update.state, "online") + self.assertEqual(presence_update.status_msg, "boop") + + # Have all three users send presence + send_presence_update( + self, + self.other_user_one_id, + self.other_user_one_tok, + "online", + "user_one", + ) + send_presence_update( + self, + self.other_user_two_id, + self.other_user_two_tok, + "online", + "user_two", + ) + send_presence_update( + self, + self.presence_receiving_user_id, + self.presence_receiving_user_tok, + "online", + "presence_gobbler", + ) + + # Check that the presence receiving user gets everyone's presence + presence_updates, _ = sync_presence( + self, self.presence_receiving_user_id, sync_token + ) + self.assertEqual(len(presence_updates), 3) + + # But that User One only get itself and User Two's presence + presence_updates, _ = sync_presence(self, self.other_user_one_id) + self.assertEqual(len(presence_updates), 2) + + found = False + for update in presence_updates: + if update.user_id == self.other_user_two_id: + self.assertEqual(update.state, "online") + self.assertEqual(update.status_msg, "user_two") + found = True + + self.assertTrue(found) + + @override_config( + { + "presence": { + "presence_router": { + "module": __name__ + ".PresenceRouterTestModule", + "config": { + "users_who_should_receive_all_presence": [ + "@presence_gobbler1:test", + "@presence_gobbler2:test", + "@far_away_person:island", + ] + }, + } + }, + "send_federation": True, + } + ) + def test_send_local_online_presence_to_with_module(self): + """Tests that send_local_presence_to_users sends local online presence to a set + of specified local and remote users, with a custom PresenceRouter module enabled. + """ + # Create a user who will send presence updates + self.other_user_id = self.register_user("other_user", "monkey") + self.other_user_tok = self.login("other_user", "monkey") + + # And another two users that will also send out presence updates, as well as receive + # theirs and everyone else's + self.presence_receiving_user_one_id = self.register_user( + "presence_gobbler1", "monkey" + ) + self.presence_receiving_user_one_tok = self.login("presence_gobbler1", "monkey") + self.presence_receiving_user_two_id = self.register_user( + "presence_gobbler2", "monkey" + ) + self.presence_receiving_user_two_tok = self.login("presence_gobbler2", "monkey") + + # Have all three users send some presence updates + send_presence_update( + self, + self.other_user_id, + self.other_user_tok, + "online", + "I'm online!", + ) + send_presence_update( + self, + self.presence_receiving_user_one_id, + self.presence_receiving_user_one_tok, + "online", + "I'm also online!", + ) + send_presence_update( + self, + self.presence_receiving_user_two_id, + self.presence_receiving_user_two_tok, + "unavailable", + "I'm in a meeting!", + ) + + # Mark each presence-receiving user for receiving all user presence + self.get_success( + self.module_api.send_local_online_presence_to( + [ + self.presence_receiving_user_one_id, + self.presence_receiving_user_two_id, + ] + ) + ) + + # Perform a sync for each user + + # The other user should only receive their own presence + presence_updates, _ = sync_presence(self, self.other_user_id) + self.assertEqual(len(presence_updates), 1) + + presence_update = presence_updates[0] # type: UserPresenceState + self.assertEqual(presence_update.user_id, self.other_user_id) + self.assertEqual(presence_update.state, "online") + self.assertEqual(presence_update.status_msg, "I'm online!") + + # Whereas both presence receiving users should receive everyone's presence updates + presence_updates, _ = sync_presence(self, self.presence_receiving_user_one_id) + self.assertEqual(len(presence_updates), 3) + presence_updates, _ = sync_presence(self, self.presence_receiving_user_two_id) + self.assertEqual(len(presence_updates), 3) + + # Test that sending to a remote user works + remote_user_id = "@far_away_person:island" + + # Note that due to the remote user being in our module's + # users_who_should_receive_all_presence config, they would have + # received user presence updates already. + # + # Thus we reset the mock, and try sending all online local user + # presence again + self.hs.get_federation_transport_client().send_transaction.reset_mock() + + # Broadcast local user online presence + self.get_success( + self.module_api.send_local_online_presence_to([remote_user_id]) + ) + + # Check that the expected presence updates were sent + expected_users = [ + self.other_user_id, + self.presence_receiving_user_one_id, + self.presence_receiving_user_two_id, + ] + + calls = ( + self.hs.get_federation_transport_client().send_transaction.call_args_list + ) + for call in calls: + federation_transaction = call.args[0] # type: Transaction + + # Get the sent EDUs in this transaction + edus = federation_transaction.get_dict()["edus"] + + for edu in edus: + # Make sure we're only checking presence-type EDUs + if edu["edu_type"] != EduTypes.Presence: + continue + + # EDUs can contain multiple presence updates + for presence_update in edu["content"]["push"]: + # Check for presence updates that contain the user IDs we're after + expected_users.remove(presence_update["user_id"]) + + # Ensure that no offline states are being sent out + self.assertNotEqual(presence_update["presence"], "offline") + + self.assertEqual(len(expected_users), 0) + + +def send_presence_update( + testcase: TestCase, + user_id: str, + access_token: str, + presence_state: str, + status_message: Optional[str] = None, +) -> JsonDict: + # Build the presence body + body = {"presence": presence_state} + if status_message: + body["status_msg"] = status_message + + # Update the user's presence state + channel = testcase.make_request( + "PUT", "/presence/%s/status" % (user_id,), body, access_token=access_token + ) + testcase.assertEqual(channel.code, 200) + + return channel.json_body + + +def sync_presence( + testcase: TestCase, + user_id: str, + since_token: Optional[StreamToken] = None, +) -> Tuple[List[UserPresenceState], StreamToken]: + """Perform a sync request for the given user and return the user presence updates + they've received, as well as the next_batch token. + + This method assumes testcase.sync_handler points to the homeserver's sync handler. + + Args: + testcase: The testcase that is currently being run. + user_id: The ID of the user to generate a sync response for. + since_token: An optional token to indicate from at what point to sync from. + + Returns: + A tuple containing a list of presence updates, and the sync response's + next_batch token. + """ + requester = create_requester(user_id) + sync_config = generate_sync_config(requester.user.to_string()) + sync_result = testcase.get_success( + testcase.sync_handler.wait_for_sync_for_user( + requester, sync_config, since_token + ) + ) + + return sync_result.presence, sync_result.next_batch diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py index e62586142e..8e950f25c5 100644 --- a/tests/handlers/test_sync.py +++ b/tests/handlers/test_sync.py @@ -37,7 +37,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): def test_wait_for_sync_for_user_auth_blocking(self): user_id1 = "@user1:test" user_id2 = "@user2:test" - sync_config = self._generate_sync_config(user_id1) + sync_config = generate_sync_config(user_id1) requester = create_requester(user_id1) self.reactor.advance(100) # So we get not 0 time @@ -60,7 +60,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): self.auth_blocking._hs_disabled = False - sync_config = self._generate_sync_config(user_id2) + sync_config = generate_sync_config(user_id2) requester = create_requester(user_id2) e = self.get_failure( @@ -69,11 +69,12 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): ) self.assertEquals(e.value.errcode, Codes.RESOURCE_LIMIT_EXCEEDED) - def _generate_sync_config(self, user_id): - return SyncConfig( - user=UserID(user_id.split(":")[0][1:], user_id.split(":")[1]), - filter_collection=DEFAULT_FILTER_COLLECTION, - is_guest=False, - request_key="request_key", - device_id="device_id", - ) + +def generate_sync_config(user_id: str) -> SyncConfig: + return SyncConfig( + user=UserID(user_id.split(":")[0][1:], user_id.split(":")[1]), + filter_collection=DEFAULT_FILTER_COLLECTION, + is_guest=False, + request_key="request_key", + device_id="device_id", + ) diff --git a/tests/module_api/test_api.py b/tests/module_api/test_api.py index edacd1b566..1d1fceeecf 100644 --- a/tests/module_api/test_api.py +++ b/tests/module_api/test_api.py @@ -14,25 +14,37 @@ # limitations under the License. from mock import Mock +from synapse.api.constants import EduTypes from synapse.events import EventBase +from synapse.federation.units import Transaction +from synapse.handlers.presence import UserPresenceState from synapse.rest import admin -from synapse.rest.client.v1 import login, room +from synapse.rest.client.v1 import login, presence, room from synapse.types import create_requester -from tests.unittest import HomeserverTestCase +from tests.events.test_presence_router import send_presence_update, sync_presence +from tests.test_utils.event_injection import inject_member_event +from tests.unittest import FederatingHomeserverTestCase, override_config -class ModuleApiTestCase(HomeserverTestCase): +class ModuleApiTestCase(FederatingHomeserverTestCase): servlets = [ admin.register_servlets, login.register_servlets, room.register_servlets, + presence.register_servlets, ] def prepare(self, reactor, clock, homeserver): self.store = homeserver.get_datastore() self.module_api = homeserver.get_module_api() self.event_creation_handler = homeserver.get_event_creation_handler() + self.sync_handler = homeserver.get_sync_handler() + + def make_homeserver(self, reactor, clock): + return self.setup_test_homeserver( + federation_transport_client=Mock(spec=["send_transaction"]), + ) def test_can_register_user(self): """Tests that an external module can register a user""" @@ -205,3 +217,160 @@ class ModuleApiTestCase(HomeserverTestCase): ) ) self.assertFalse(is_in_public_rooms) + + # The ability to send federation is required by send_local_online_presence_to. + @override_config({"send_federation": True}) + def test_send_local_online_presence_to(self): + """Tests that send_local_presence_to_users sends local online presence to local users.""" + # Create a user who will send presence updates + self.presence_receiver_id = self.register_user("presence_receiver", "monkey") + self.presence_receiver_tok = self.login("presence_receiver", "monkey") + + # And another user that will send presence updates out + self.presence_sender_id = self.register_user("presence_sender", "monkey") + self.presence_sender_tok = self.login("presence_sender", "monkey") + + # Put them in a room together so they will receive each other's presence updates + room_id = self.helper.create_room_as( + self.presence_receiver_id, + tok=self.presence_receiver_tok, + ) + self.helper.join(room_id, self.presence_sender_id, tok=self.presence_sender_tok) + + # Presence sender comes online + send_presence_update( + self, + self.presence_sender_id, + self.presence_sender_tok, + "online", + "I'm online!", + ) + + # Presence receiver should have received it + presence_updates, sync_token = sync_presence(self, self.presence_receiver_id) + self.assertEqual(len(presence_updates), 1) + + presence_update = presence_updates[0] # type: UserPresenceState + self.assertEqual(presence_update.user_id, self.presence_sender_id) + self.assertEqual(presence_update.state, "online") + + # Syncing again should result in no presence updates + presence_updates, sync_token = sync_presence( + self, self.presence_receiver_id, sync_token + ) + self.assertEqual(len(presence_updates), 0) + + # Trigger sending local online presence + self.get_success( + self.module_api.send_local_online_presence_to( + [ + self.presence_receiver_id, + ] + ) + ) + + # Presence receiver should have received online presence again + presence_updates, sync_token = sync_presence( + self, self.presence_receiver_id, sync_token + ) + self.assertEqual(len(presence_updates), 1) + + presence_update = presence_updates[0] # type: UserPresenceState + self.assertEqual(presence_update.user_id, self.presence_sender_id) + self.assertEqual(presence_update.state, "online") + + # Presence sender goes offline + send_presence_update( + self, + self.presence_sender_id, + self.presence_sender_tok, + "offline", + "I slink back into the darkness.", + ) + + # Trigger sending local online presence + self.get_success( + self.module_api.send_local_online_presence_to( + [ + self.presence_receiver_id, + ] + ) + ) + + # Presence receiver should *not* have received offline state + presence_updates, sync_token = sync_presence( + self, self.presence_receiver_id, sync_token + ) + self.assertEqual(len(presence_updates), 0) + + @override_config({"send_federation": True}) + def test_send_local_online_presence_to_federation(self): + """Tests that send_local_presence_to_users sends local online presence to remote users.""" + # Create a user who will send presence updates + self.presence_sender_id = self.register_user("presence_sender", "monkey") + self.presence_sender_tok = self.login("presence_sender", "monkey") + + # And a room they're a part of + room_id = self.helper.create_room_as( + self.presence_sender_id, + tok=self.presence_sender_tok, + ) + + # Mark them as online + send_presence_update( + self, + self.presence_sender_id, + self.presence_sender_tok, + "online", + "I'm online!", + ) + + # Make up a remote user to send presence to + remote_user_id = "@far_away_person:island" + + # Create a join membership event for the remote user into the room. + # This allows presence information to flow from one user to the other. + self.get_success( + inject_member_event( + self.hs, + room_id, + sender=remote_user_id, + target=remote_user_id, + membership="join", + ) + ) + + # The remote user would have received the existing room members' presence + # when they joined the room. + # + # Thus we reset the mock, and try sending online local user + # presence again + self.hs.get_federation_transport_client().send_transaction.reset_mock() + + # Broadcast local user online presence + self.get_success( + self.module_api.send_local_online_presence_to([remote_user_id]) + ) + + # Check that a presence update was sent as part of a federation transaction + found_update = False + calls = ( + self.hs.get_federation_transport_client().send_transaction.call_args_list + ) + for call in calls: + federation_transaction = call.args[0] # type: Transaction + + # Get the sent EDUs in this transaction + edus = federation_transaction.get_dict()["edus"] + + for edu in edus: + # Make sure we're only checking presence-type EDUs + if edu["edu_type"] != EduTypes.Presence: + continue + + # EDUs can contain multiple presence updates + for presence_update in edu["content"]["push"]: + if presence_update["user_id"] == self.presence_sender_id: + found_update = True + + self.assertTrue(found_update)