Move stream_writers to their own field in the WorkerTemplate

This commit is contained in:
Olivier Wilkinson (reivilibre) 2024-01-17 14:39:57 +00:00
parent 2ff1de3b3c
commit 29541fd994

View file

@ -103,6 +103,8 @@ class WorkerTemplate:
shared_extra_conf: Callable[[str], Dict[str, Any]] = lambda _worker_name: {}
worker_extra_conf: str = ""
stream_writers: Set[str] = field(default_factory=set)
# True if and only if multiple of this worker type are allowed.
sharding_allowed: bool = True
@ -226,9 +228,7 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
),
"event_persister": WorkerTemplate(
listener_resources={"replication"},
shared_extra_conf=lambda worker_name: {
"stream_writers": {"events": [worker_name]}
},
stream_writers={"events"},
),
"background_worker": WorkerTemplate(
# This worker cannot be sharded. Therefore, there should only ever be one
@ -257,17 +257,13 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
"^/_matrix/client/(r0|v3|unstable)/.*/tags",
"^/_matrix/client/(r0|v3|unstable)/.*/account_data",
},
shared_extra_conf=lambda worker_name: {
"stream_writers": {"account_data": [worker_name]}
},
stream_writers={"account_data"},
sharding_allowed=False,
),
"presence": WorkerTemplate(
listener_resources={"client", "replication"},
endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"},
shared_extra_conf=lambda worker_name: {
"stream_writers": {"presence": [worker_name]}
},
stream_writers={"presence"},
sharding_allowed=False,
),
"receipts": WorkerTemplate(
@ -276,25 +272,19 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
"^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt",
"^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers",
},
shared_extra_conf=lambda worker_name: {
"stream_writers": {"receipts": [worker_name]}
},
stream_writers={"receipts"},
sharding_allowed=False,
),
"to_device": WorkerTemplate(
listener_resources={"client", "replication"},
endpoint_patterns={"^/_matrix/client/(r0|v3|unstable)/sendToDevice/"},
shared_extra_conf=lambda worker_name: {
"stream_writers": {"to_device": [worker_name]}
},
stream_writers={"to_device"},
sharding_allowed=False,
),
"typing": WorkerTemplate(
listener_resources={"client", "replication"},
endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"},
shared_extra_conf=lambda worker_name: {
"stream_writers": {"typing": [worker_name]}
},
stream_writers={"typing"},
sharding_allowed=False,
),
}
@ -447,6 +437,8 @@ def merge_worker_template_configs(
# (This is unused, but in principle sharding this hybrid worker type
# would be allowed if both constituent types are shardable)
sharding_allowed=left.sharding_allowed and right.sharding_allowed,
# include stream writers from both
stream_writers=left.stream_writers | right.stream_writers,
)
@ -462,7 +454,10 @@ def instantiate_worker_template(
Returns: worker configuration dictionary
"""
worker_config_dict = dataclasses.asdict(template)
worker_config_dict["shared_extra_conf"] = template.shared_extra_conf(worker_name)
stream_writers_dict = {
writer: worker_name for writer in template.stream_writers
}
worker_config_dict["shared_extra_conf"] = merged(template.shared_extra_conf(worker_name), stream_writers_dict)
worker_config_dict["endpoint_patterns"] = sorted(template.endpoint_patterns)
worker_config_dict["listener_resources"] = sorted(template.listener_resources)
return worker_config_dict