mirror of
https://github.com/element-hq/synapse
synced 2024-06-30 11:23:34 +00:00
Compare commits
16 commits
51864fc7b1
...
4b87b2c336
Author | SHA1 | Date | |
---|---|---|---|
|
4b87b2c336 | ||
|
adeedb7b7c | ||
|
7c5fb13f7b | ||
|
f8d57ce656 | ||
|
13ed84c573 | ||
|
57c5347559 | ||
|
d05aa3026b | ||
|
8666802f99 | ||
|
939d5e4ce1 | ||
|
ce7b1d3a21 | ||
|
a45f1be28c | ||
|
657c68e470 | ||
|
590624fc98 | ||
|
bc42a73e99 | ||
|
d84b438fc6 | ||
|
2813522704 |
1
changelog.d/17333.misc
Normal file
1
changelog.d/17333.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Handle device lists notifications for large accounts more efficiently in worker mode.
|
52
poetry.lock
generated
52
poetry.lock
generated
|
@ -35,13 +35,13 @@ tests-no-zope = ["attrs[tests-mypy]", "cloudpickle", "hypothesis", "pympler", "p
|
|||
|
||||
[[package]]
|
||||
name = "authlib"
|
||||
version = "1.3.0"
|
||||
version = "1.3.1"
|
||||
description = "The ultimate Python library in building OAuth and OpenID Connect servers and clients."
|
||||
optional = true
|
||||
python-versions = ">=3.8"
|
||||
files = [
|
||||
{file = "Authlib-1.3.0-py2.py3-none-any.whl", hash = "sha256:9637e4de1fb498310a56900b3e2043a206b03cb11c05422014b0302cbc814be3"},
|
||||
{file = "Authlib-1.3.0.tar.gz", hash = "sha256:959ea62a5b7b5123c5059758296122b57cd2585ae2ed1c0622c21b371ffdae06"},
|
||||
{file = "Authlib-1.3.1-py2.py3-none-any.whl", hash = "sha256:d35800b973099bbadc49b42b256ecb80041ad56b7fe1216a362c7943c088f377"},
|
||||
{file = "authlib-1.3.1.tar.gz", hash = "sha256:7ae843f03c06c5c0debd63c9db91f9fda64fa62a42a77419fa15fbb7e7a58917"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
|
@ -2157,13 +2157,13 @@ rpds-py = ">=0.7.0"
|
|||
|
||||
[[package]]
|
||||
name = "requests"
|
||||
version = "2.31.0"
|
||||
version = "2.32.2"
|
||||
description = "Python HTTP for Humans."
|
||||
optional = false
|
||||
python-versions = ">=3.7"
|
||||
python-versions = ">=3.8"
|
||||
files = [
|
||||
{file = "requests-2.31.0-py3-none-any.whl", hash = "sha256:58cd2187c01e70e6e26505bca751777aa9f2ee0b7f4300988b709f44e013003f"},
|
||||
{file = "requests-2.31.0.tar.gz", hash = "sha256:942c5a758f98d790eaed1a29cb6eefc7ffb0d1cf7af05c3d2791656dbd6ad1e1"},
|
||||
{file = "requests-2.32.2-py3-none-any.whl", hash = "sha256:fc06670dd0ed212426dfeb94fc1b983d917c4f9847c863f313c9dfaaffb7c23c"},
|
||||
{file = "requests-2.32.2.tar.gz", hash = "sha256:dd951ff5ecf3e3b3aa26b40703ba77495dab41da839ae72ef3c8e5d8e2433289"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
|
@ -2598,22 +2598,22 @@ files = [
|
|||
|
||||
[[package]]
|
||||
name = "tornado"
|
||||
version = "6.4"
|
||||
version = "6.4.1"
|
||||
description = "Tornado is a Python web framework and asynchronous networking library, originally developed at FriendFeed."
|
||||
optional = true
|
||||
python-versions = ">= 3.8"
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
files = [
|
||||
{file = "tornado-6.4-cp38-abi3-macosx_10_9_universal2.whl", hash = "sha256:02ccefc7d8211e5a7f9e8bc3f9e5b0ad6262ba2fbb683a6443ecc804e5224ce0"},
|
||||
{file = "tornado-6.4-cp38-abi3-macosx_10_9_x86_64.whl", hash = "sha256:27787de946a9cffd63ce5814c33f734c627a87072ec7eed71f7fc4417bb16263"},
|
||||
{file = "tornado-6.4-cp38-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f7894c581ecdcf91666a0912f18ce5e757213999e183ebfc2c3fdbf4d5bd764e"},
|
||||
{file = "tornado-6.4-cp38-abi3-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:e43bc2e5370a6a8e413e1e1cd0c91bedc5bd62a74a532371042a18ef19e10579"},
|
||||
{file = "tornado-6.4-cp38-abi3-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f0251554cdd50b4b44362f73ad5ba7126fc5b2c2895cc62b14a1c2d7ea32f212"},
|
||||
{file = "tornado-6.4-cp38-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:fd03192e287fbd0899dd8f81c6fb9cbbc69194d2074b38f384cb6fa72b80e9c2"},
|
||||
{file = "tornado-6.4-cp38-abi3-musllinux_1_1_i686.whl", hash = "sha256:88b84956273fbd73420e6d4b8d5ccbe913c65d31351b4c004ae362eba06e1f78"},
|
||||
{file = "tornado-6.4-cp38-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:71ddfc23a0e03ef2df1c1397d859868d158c8276a0603b96cf86892bff58149f"},
|
||||
{file = "tornado-6.4-cp38-abi3-win32.whl", hash = "sha256:6f8a6c77900f5ae93d8b4ae1196472d0ccc2775cc1dfdc9e7727889145c45052"},
|
||||
{file = "tornado-6.4-cp38-abi3-win_amd64.whl", hash = "sha256:10aeaa8006333433da48dec9fe417877f8bcc21f48dda8d661ae79da357b2a63"},
|
||||
{file = "tornado-6.4.tar.gz", hash = "sha256:72291fa6e6bc84e626589f1c29d90a5a6d593ef5ae68052ee2ef000dfd273dee"},
|
||||
{file = "tornado-6.4.1-cp38-abi3-macosx_10_9_universal2.whl", hash = "sha256:163b0aafc8e23d8cdc3c9dfb24c5368af84a81e3364745ccb4427669bf84aec8"},
|
||||
{file = "tornado-6.4.1-cp38-abi3-macosx_10_9_x86_64.whl", hash = "sha256:6d5ce3437e18a2b66fbadb183c1d3364fb03f2be71299e7d10dbeeb69f4b2a14"},
|
||||
{file = "tornado-6.4.1-cp38-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e2e20b9113cd7293f164dc46fffb13535266e713cdb87bd2d15ddb336e96cfc4"},
|
||||
{file = "tornado-6.4.1-cp38-abi3-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:8ae50a504a740365267b2a8d1a90c9fbc86b780a39170feca9bcc1787ff80842"},
|
||||
{file = "tornado-6.4.1-cp38-abi3-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:613bf4ddf5c7a95509218b149b555621497a6cc0d46ac341b30bd9ec19eac7f3"},
|
||||
{file = "tornado-6.4.1-cp38-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:25486eb223babe3eed4b8aecbac33b37e3dd6d776bc730ca14e1bf93888b979f"},
|
||||
{file = "tornado-6.4.1-cp38-abi3-musllinux_1_2_i686.whl", hash = "sha256:454db8a7ecfcf2ff6042dde58404164d969b6f5d58b926da15e6b23817950fc4"},
|
||||
{file = "tornado-6.4.1-cp38-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:a02a08cc7a9314b006f653ce40483b9b3c12cda222d6a46d4ac63bb6c9057698"},
|
||||
{file = "tornado-6.4.1-cp38-abi3-win32.whl", hash = "sha256:d9a566c40b89757c9aa8e6f032bcdb8ca8795d7c1a9762910c722b1635c9de4d"},
|
||||
{file = "tornado-6.4.1-cp38-abi3-win_amd64.whl", hash = "sha256:b24b8982ed444378d7f21d563f4180a2de31ced9d8d84443907a0a64da2072e7"},
|
||||
{file = "tornado-6.4.1.tar.gz", hash = "sha256:92d3ab53183d8c50f8204a51e6f91d18a15d5ef261e84d452800d4ff6fc504e9"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -2939,18 +2939,18 @@ files = [
|
|||
|
||||
[[package]]
|
||||
name = "urllib3"
|
||||
version = "2.0.7"
|
||||
version = "2.2.2"
|
||||
description = "HTTP library with thread-safe connection pooling, file post, and more."
|
||||
optional = false
|
||||
python-versions = ">=3.7"
|
||||
python-versions = ">=3.8"
|
||||
files = [
|
||||
{file = "urllib3-2.0.7-py3-none-any.whl", hash = "sha256:fdb6d215c776278489906c2f8916e6e7d4f5a9b602ccbcfdf7f016fc8da0596e"},
|
||||
{file = "urllib3-2.0.7.tar.gz", hash = "sha256:c97dfde1f7bd43a71c8d2a58e369e9b2bf692d1334ea9f9cae55add7d0dd0f84"},
|
||||
{file = "urllib3-2.2.2-py3-none-any.whl", hash = "sha256:a448b2f64d686155468037e1ace9f2d2199776e17f0a46610480d311f73e3472"},
|
||||
{file = "urllib3-2.2.2.tar.gz", hash = "sha256:dd505485549a7a552833da5e6063639d0d177c04f23bc3864e41e5dc5f612168"},
|
||||
]
|
||||
|
||||
[package.extras]
|
||||
brotli = ["brotli (>=1.0.9)", "brotlicffi (>=0.8.0)"]
|
||||
secure = ["certifi", "cryptography (>=1.9)", "idna (>=2.0.0)", "pyopenssl (>=17.1.0)", "urllib3-secure-extra"]
|
||||
h2 = ["h2 (>=4,<5)"]
|
||||
socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"]
|
||||
zstd = ["zstandard (>=0.18.0)"]
|
||||
|
||||
|
|
|
@ -114,13 +114,19 @@ class ReplicationDataHandler:
|
|||
"""
|
||||
all_room_ids: Set[str] = set()
|
||||
if stream_name == DeviceListsStream.NAME:
|
||||
if any(row.entity.startswith("@") and not row.is_signature for row in rows):
|
||||
if any(not row.is_signature and not row.hosts_calculated for row in rows):
|
||||
prev_token = self.store.get_device_stream_token()
|
||||
all_room_ids = await self.store.get_all_device_list_changes(
|
||||
prev_token, token
|
||||
)
|
||||
self.store.device_lists_in_rooms_have_changed(all_room_ids, token)
|
||||
|
||||
# If we're sending federation we need to update the device lists
|
||||
# outbound pokes stream change cache with updated hosts.
|
||||
if self.send_handler and any(row.hosts_calculated for row in rows):
|
||||
hosts = await self.store.get_destinations_for_device(token)
|
||||
self.store.device_lists_outbound_pokes_have_changed(hosts, token)
|
||||
|
||||
self.store.process_replication_rows(stream_name, instance_name, token, rows)
|
||||
# NOTE: this must be called after process_replication_rows to ensure any
|
||||
# cache invalidations are first handled before any stream ID advances.
|
||||
|
@ -433,12 +439,11 @@ class FederationSenderHandler:
|
|||
# The entities are either user IDs (starting with '@') whose devices
|
||||
# have changed, or remote servers that we need to tell about
|
||||
# changes.
|
||||
hosts = {
|
||||
row.entity
|
||||
for row in rows
|
||||
if not row.entity.startswith("@") and not row.is_signature
|
||||
}
|
||||
await self.federation_sender.send_device_messages(hosts, immediate=False)
|
||||
if any(row.hosts_calculated for row in rows):
|
||||
hosts = await self.store.get_destinations_for_device(token)
|
||||
await self.federation_sender.send_device_messages(
|
||||
hosts, immediate=False
|
||||
)
|
||||
|
||||
elif stream_name == ToDeviceStream.NAME:
|
||||
# The to_device stream includes stuff to be pushed to both local
|
||||
|
|
|
@ -549,10 +549,14 @@ class DeviceListsStream(_StreamFromIdGen):
|
|||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class DeviceListsStreamRow:
|
||||
entity: str
|
||||
user_id: str
|
||||
# Indicates that a user has signed their own device with their user-signing key
|
||||
is_signature: bool
|
||||
|
||||
# Indicates if this is a notification that we've calculated the hosts we
|
||||
# need to send the update to.
|
||||
hosts_calculated: bool
|
||||
|
||||
NAME = "device_lists"
|
||||
ROW_TYPE = DeviceListsStreamRow
|
||||
|
||||
|
@ -594,13 +598,13 @@ class DeviceListsStream(_StreamFromIdGen):
|
|||
upper_limit_token = min(upper_limit_token, signatures_to_token)
|
||||
|
||||
device_updates = [
|
||||
(stream_id, (entity, False))
|
||||
for stream_id, (entity,) in device_updates
|
||||
(stream_id, (entity, False, hosts))
|
||||
for stream_id, (entity, hosts) in device_updates
|
||||
if stream_id <= upper_limit_token
|
||||
]
|
||||
|
||||
signatures_updates = [
|
||||
(stream_id, (entity, True))
|
||||
(stream_id, (entity, True, False))
|
||||
for stream_id, (entity,) in signatures_updates
|
||||
if stream_id <= upper_limit_token
|
||||
]
|
||||
|
|
|
@ -164,22 +164,24 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
|
|||
prefilled_cache=user_signature_stream_prefill,
|
||||
)
|
||||
|
||||
(
|
||||
device_list_federation_prefill,
|
||||
device_list_federation_list_id,
|
||||
) = self.db_pool.get_cache_dict(
|
||||
db_conn,
|
||||
"device_lists_outbound_pokes",
|
||||
entity_column="destination",
|
||||
stream_column="stream_id",
|
||||
max_value=device_list_max,
|
||||
limit=10000,
|
||||
)
|
||||
self._device_list_federation_stream_cache = StreamChangeCache(
|
||||
"DeviceListFederationStreamChangeCache",
|
||||
device_list_federation_list_id,
|
||||
prefilled_cache=device_list_federation_prefill,
|
||||
)
|
||||
self._device_list_federation_stream_cache = None
|
||||
if hs.should_send_federation():
|
||||
(
|
||||
device_list_federation_prefill,
|
||||
device_list_federation_list_id,
|
||||
) = self.db_pool.get_cache_dict(
|
||||
db_conn,
|
||||
"device_lists_outbound_pokes",
|
||||
entity_column="destination",
|
||||
stream_column="stream_id",
|
||||
max_value=device_list_max,
|
||||
limit=10000,
|
||||
)
|
||||
self._device_list_federation_stream_cache = StreamChangeCache(
|
||||
"DeviceListFederationStreamChangeCache",
|
||||
device_list_federation_list_id,
|
||||
prefilled_cache=device_list_federation_prefill,
|
||||
)
|
||||
|
||||
if hs.config.worker.run_background_tasks:
|
||||
self._clock.looping_call(
|
||||
|
@ -207,23 +209,30 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
|
|||
) -> None:
|
||||
for row in rows:
|
||||
if row.is_signature:
|
||||
self._user_signature_stream_cache.entity_has_changed(row.entity, token)
|
||||
self._user_signature_stream_cache.entity_has_changed(row.user_id, token)
|
||||
continue
|
||||
|
||||
# The entities are either user IDs (starting with '@') whose devices
|
||||
# have changed, or remote servers that we need to tell about
|
||||
# changes.
|
||||
if row.entity.startswith("@"):
|
||||
self._device_list_stream_cache.entity_has_changed(row.entity, token)
|
||||
self.get_cached_devices_for_user.invalidate((row.entity,))
|
||||
self._get_cached_user_device.invalidate((row.entity,))
|
||||
self.get_device_list_last_stream_id_for_remote.invalidate((row.entity,))
|
||||
|
||||
else:
|
||||
self._device_list_federation_stream_cache.entity_has_changed(
|
||||
row.entity, token
|
||||
if not row.hosts_calculated:
|
||||
self._device_list_stream_cache.entity_has_changed(row.user_id, token)
|
||||
self.get_cached_devices_for_user.invalidate((row.user_id,))
|
||||
self._get_cached_user_device.invalidate((row.user_id,))
|
||||
self.get_device_list_last_stream_id_for_remote.invalidate(
|
||||
(row.user_id,)
|
||||
)
|
||||
|
||||
def device_lists_outbound_pokes_have_changed(
|
||||
self, destinations: StrCollection, token: int
|
||||
) -> None:
|
||||
assert self._device_list_federation_stream_cache is not None
|
||||
|
||||
for destination in destinations:
|
||||
self._device_list_federation_stream_cache.entity_has_changed(
|
||||
destination, token
|
||||
)
|
||||
|
||||
def device_lists_in_rooms_have_changed(
|
||||
self, room_ids: StrCollection, token: int
|
||||
) -> None:
|
||||
|
@ -363,6 +372,11 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
|
|||
EDU contents.
|
||||
"""
|
||||
now_stream_id = self.get_device_stream_token()
|
||||
if from_stream_id == now_stream_id:
|
||||
return now_stream_id, []
|
||||
|
||||
if self._device_list_federation_stream_cache is None:
|
||||
raise Exception("Func can only be used on federation senders")
|
||||
|
||||
has_changed = self._device_list_federation_stream_cache.has_entity_changed(
|
||||
destination, int(from_stream_id)
|
||||
|
@ -1018,10 +1032,10 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
|
|||
# This query Does The Right Thing where it'll correctly apply the
|
||||
# bounds to the inner queries.
|
||||
sql = """
|
||||
SELECT stream_id, entity FROM (
|
||||
SELECT stream_id, user_id AS entity FROM device_lists_stream
|
||||
SELECT stream_id, user_id, hosts FROM (
|
||||
SELECT stream_id, user_id, false AS hosts FROM device_lists_stream
|
||||
UNION ALL
|
||||
SELECT stream_id, destination AS entity FROM device_lists_outbound_pokes
|
||||
SELECT DISTINCT stream_id, user_id, true AS hosts FROM device_lists_outbound_pokes
|
||||
) AS e
|
||||
WHERE ? < stream_id AND stream_id <= ?
|
||||
ORDER BY stream_id ASC
|
||||
|
@ -1577,6 +1591,14 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
|
|||
get_device_list_changes_in_room_txn,
|
||||
)
|
||||
|
||||
async def get_destinations_for_device(self, stream_id: int) -> StrCollection:
|
||||
return await self.db_pool.simple_select_onecol(
|
||||
table="device_lists_outbound_pokes",
|
||||
keyvalues={"stream_id": stream_id},
|
||||
retcol="destination",
|
||||
desc="get_destinations_for_device",
|
||||
)
|
||||
|
||||
|
||||
class DeviceBackgroundUpdateStore(SQLBaseStore):
|
||||
def __init__(
|
||||
|
@ -2112,12 +2134,13 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
|
|||
stream_ids: List[int],
|
||||
context: Optional[Dict[str, str]],
|
||||
) -> None:
|
||||
for host in hosts:
|
||||
txn.call_after(
|
||||
self._device_list_federation_stream_cache.entity_has_changed,
|
||||
host,
|
||||
stream_ids[-1],
|
||||
)
|
||||
if self._device_list_federation_stream_cache:
|
||||
for host in hosts:
|
||||
txn.call_after(
|
||||
self._device_list_federation_stream_cache.entity_has_changed,
|
||||
host,
|
||||
stream_ids[-1],
|
||||
)
|
||||
|
||||
now = self._clock.time_msec()
|
||||
stream_id_iterator = iter(stream_ids)
|
||||
|
|
|
@ -123,9 +123,9 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
|
|||
if stream_name == DeviceListsStream.NAME:
|
||||
for row in rows:
|
||||
assert isinstance(row, DeviceListsStream.DeviceListsStreamRow)
|
||||
if row.entity.startswith("@"):
|
||||
if not row.hosts_calculated:
|
||||
self._get_e2e_device_keys_for_federation_query_inner.invalidate(
|
||||
(row.entity,)
|
||||
(row.user_id,)
|
||||
)
|
||||
|
||||
super().process_replication_rows(stream_name, instance_name, token, rows)
|
||||
|
|
|
@ -36,6 +36,14 @@ class DeviceStoreTestCase(HomeserverTestCase):
|
|||
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
|
||||
self.store = hs.get_datastores().main
|
||||
|
||||
def default_config(self) -> JsonDict:
|
||||
config = super().default_config()
|
||||
|
||||
# We 'enable' federation otherwise `get_device_updates_by_remote` will
|
||||
# throw an exception.
|
||||
config["federation_sender_instances"] = ["master"]
|
||||
return config
|
||||
|
||||
def add_device_change(self, user_id: str, device_ids: List[str], host: str) -> None:
|
||||
"""Add a device list change for the given device to
|
||||
`device_lists_outbound_pokes` table.
|
||||
|
|
Loading…
Reference in a new issue