Fixups to new push stream (#17038)

Follow on from #17037
This commit is contained in:
Erik Johnston 2024-03-28 16:29:23 +00:00 committed by GitHub
parent ea6bfae0fc
commit fd48fc4585
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 27 additions and 12 deletions

View file

@ -1 +1 @@
Add support for moving `/push_rules` off of main process. Add support for moving `/pushrules` off of main process.

View file

@ -0,0 +1 @@
Add support for moving `/pushrules` off of main process.

View file

@ -310,6 +310,13 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"shared_extra_conf": {}, "shared_extra_conf": {},
"worker_extra_conf": "", "worker_extra_conf": "",
}, },
"push_rules": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client", "replication"],
"endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/pushrules/"],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
} }
# Templates for sections that may be inserted multiple times in config files # Templates for sections that may be inserted multiple times in config files
@ -401,6 +408,7 @@ def add_worker_roles_to_shared_config(
"receipts", "receipts",
"to_device", "to_device",
"typing", "typing",
"push_rules",
] ]
# Worker-type specific sharding config. Now a single worker can fulfill multiple # Worker-type specific sharding config. Now a single worker can fulfill multiple

View file

@ -532,12 +532,12 @@ the stream writer for the `presence` stream:
^/_matrix/client/(api/v1|r0|v3|unstable)/presence/ ^/_matrix/client/(api/v1|r0|v3|unstable)/presence/
##### The `push` stream ##### The `push_rules` stream
The following endpoints should be routed directly to the worker configured as The following endpoints should be routed directly to the worker configured as
the stream writer for the `push` stream: the stream writer for the `push` stream:
^/_matrix/client/(api/v1|r0|v3|unstable)/push_rules/ ^/_matrix/client/(api/v1|r0|v3|unstable)/pushrules/
#### Restrict outbound federation traffic to a specific set of workers #### Restrict outbound federation traffic to a specific set of workers

View file

@ -156,7 +156,7 @@ class WriterLocations:
can only be a single instance. can only be a single instance.
presence: The instances that write to the presence stream. Currently presence: The instances that write to the presence stream. Currently
can only be a single instance. can only be a single instance.
push: The instances that write to the push stream. Currently push_rules: The instances that write to the push stream. Currently
can only be a single instance. can only be a single instance.
""" """
@ -184,7 +184,7 @@ class WriterLocations:
default=["master"], default=["master"],
converter=_instance_to_list_converter, converter=_instance_to_list_converter,
) )
push: List[str] = attr.ib( push_rules: List[str] = attr.ib(
default=["master"], default=["master"],
converter=_instance_to_list_converter, converter=_instance_to_list_converter,
) )
@ -347,7 +347,7 @@ class WorkerConfig(Config):
"account_data", "account_data",
"receipts", "receipts",
"presence", "presence",
"push", "push_rules",
): ):
instances = _instance_to_list_converter(getattr(self.writers, stream)) instances = _instance_to_list_converter(getattr(self.writers, stream))
for instance in instances: for instance in instances:
@ -385,7 +385,7 @@ class WorkerConfig(Config):
"Must only specify one instance to handle `presence` messages." "Must only specify one instance to handle `presence` messages."
) )
if len(self.writers.push) != 1: if len(self.writers.push_rules) != 1:
raise ConfigError( raise ConfigError(
"Must only specify one instance to handle `push` messages." "Must only specify one instance to handle `push` messages."
) )

View file

@ -182,8 +182,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
hs.config.server.forgotten_room_retention_period hs.config.server.forgotten_room_retention_period
) )
self._is_push_writer = hs.get_instance_name() in hs.config.worker.writers.push self._is_push_writer = (
self._push_writer = hs.config.worker.writers.push[0] hs.get_instance_name() in hs.config.worker.writers.push_rules
)
self._push_writer = hs.config.worker.writers.push_rules[0]
self._copy_push_client = ReplicationCopyPusherRestServlet.make_client(hs) self._copy_push_client = ReplicationCopyPusherRestServlet.make_client(hs)
def _on_user_joined_room(self, event_id: str, room_id: str) -> None: def _on_user_joined_room(self, event_id: str, room_id: str) -> None:

View file

@ -180,7 +180,7 @@ class ReplicationCommandHandler:
continue continue
if isinstance(stream, PushRulesStream): if isinstance(stream, PushRulesStream):
if hs.get_instance_name() in hs.config.worker.writers.push: if hs.get_instance_name() in hs.config.worker.writers.push_rules:
self._streams_to_replicate.append(stream) self._streams_to_replicate.append(stream)
continue continue

View file

@ -59,7 +59,9 @@ class PushRuleRestServlet(RestServlet):
self.auth = hs.get_auth() self.auth = hs.get_auth()
self.store = hs.get_datastores().main self.store = hs.get_datastores().main
self.notifier = hs.get_notifier() self.notifier = hs.get_notifier()
self._is_push_worker = hs.get_instance_name() in hs.config.worker.writers.push self._is_push_worker = (
hs.get_instance_name() in hs.config.worker.writers.push_rules
)
self._push_rules_handler = hs.get_push_rules_handler() self._push_rules_handler = hs.get_push_rules_handler()
self._push_rule_linearizer = Linearizer(name="push_rules") self._push_rule_linearizer = Linearizer(name="push_rules")

View file

@ -136,7 +136,9 @@ class PushRulesWorkerStore(
): ):
super().__init__(database, db_conn, hs) super().__init__(database, db_conn, hs)
self._is_push_writer = hs.get_instance_name() in hs.config.worker.writers.push self._is_push_writer = (
hs.get_instance_name() in hs.config.worker.writers.push_rules
)
# In the worker store this is an ID tracker which we overwrite in the non-worker # In the worker store this is an ID tracker which we overwrite in the non-worker
# class below that is used on the main process. # class below that is used on the main process.