diff --git a/changelog.d/7146.misc b/changelog.d/7146.misc new file mode 100644 index 0000000000..facde06959 --- /dev/null +++ b/changelog.d/7146.misc @@ -0,0 +1 @@ +Run replication streamers on workers. diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 2a56fe0bd5..d125327f08 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -960,17 +960,22 @@ def start(config_options): synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts - ss = GenericWorkerServer( + hs = GenericWorkerServer( config.server_name, config=config, version_string="Synapse/" + get_version_string(synapse), ) - setup_logging(ss, config, use_worker_options=True) + setup_logging(hs, config, use_worker_options=True) + + hs.setup() + + # Ensure the replication streamer is always started in case we write to any + # streams. Will no-op if no streams can be written to by this worker. + hs.get_replication_streamer() - ss.setup() reactor.addSystemEventTrigger( - "before", "startup", _base.start, ss, config.worker_listeners + "before", "startup", _base.start, hs, config.worker_listeners ) _base.start_worker_reactor("synapse-generic-worker", config) diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index b2d6baa2a2..33d2f589ac 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -17,9 +17,7 @@ import logging import random -from typing import Dict - -from six import itervalues +from typing import Dict, List from prometheus_client import Counter @@ -71,29 +69,28 @@ class ReplicationStreamer(object): def __init__(self, hs): self.store = hs.get_datastore() - self.presence_handler = hs.get_presence_handler() self.clock = hs.get_clock() self.notifier = hs.get_notifier() - self._server_notices_sender = hs.get_server_notices_sender() self._replication_torture_level = hs.config.replication_torture_level - # List of streams that clients can subscribe to. - # We only support federation stream if federation sending hase been - # disabled on the master. - self.streams = [ - stream(hs) - for stream in itervalues(STREAMS_MAP) - if stream != FederationStream or not hs.config.send_federation - ] + # Work out list of streams that this instance is the source of. + self.streams = [] # type: List[Stream] + if hs.config.worker_app is None: + for stream in STREAMS_MAP.values(): + if stream == FederationStream and hs.config.send_federation: + # We only support federation stream if federation sending + # hase been disabled on the master. + continue + + self.streams.append(stream(hs)) self.streams_by_name = {stream.NAME: stream for stream in self.streams} - self.federation_sender = None - if not hs.config.send_federation: - self.federation_sender = hs.get_federation_sender() - - self.notifier.add_replication_callback(self.on_notifier_poke) + # Only bother registering the notifier callback if we have streams to + # publish. + if self.streams: + self.notifier.add_replication_callback(self.on_notifier_poke) # Keeps track of whether we are currently checking for updates self.is_looping = False