mirror of
https://github.com/element-hq/synapse
synced 2024-09-29 15:52:43 +00:00
Reduce pauses on large device list changes (#17192)
For large accounts waking up all the relevant notifier streams can cause pauses of the reactor.
This commit is contained in:
parent
0b91ccce47
commit
ebe77381b0
2 changed files with 11 additions and 3 deletions
1
changelog.d/17192.misc
Normal file
1
changelog.d/17192.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Improve performance by fixing a reactor pause.
|
|
@ -55,6 +55,7 @@ from synapse.replication.tcp.streams.partial_state import (
|
||||||
)
|
)
|
||||||
from synapse.types import PersistedEventPosition, ReadReceipt, StreamKeyType, UserID
|
from synapse.types import PersistedEventPosition, ReadReceipt, StreamKeyType, UserID
|
||||||
from synapse.util.async_helpers import Linearizer, timeout_deferred
|
from synapse.util.async_helpers import Linearizer, timeout_deferred
|
||||||
|
from synapse.util.iterutils import batch_iter
|
||||||
from synapse.util.metrics import Measure
|
from synapse.util.metrics import Measure
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
|
@ -150,9 +151,15 @@ class ReplicationDataHandler:
|
||||||
if row.entity.startswith("@") and not row.is_signature:
|
if row.entity.startswith("@") and not row.is_signature:
|
||||||
room_ids = await self.store.get_rooms_for_user(row.entity)
|
room_ids = await self.store.get_rooms_for_user(row.entity)
|
||||||
all_room_ids.update(room_ids)
|
all_room_ids.update(room_ids)
|
||||||
self.notifier.on_new_event(
|
|
||||||
StreamKeyType.DEVICE_LIST, token, rooms=all_room_ids
|
# `all_room_ids` can be large, so let's wake up those streams in batches
|
||||||
)
|
for batched_room_ids in batch_iter(all_room_ids, 100):
|
||||||
|
self.notifier.on_new_event(
|
||||||
|
StreamKeyType.DEVICE_LIST, token, rooms=batched_room_ids
|
||||||
|
)
|
||||||
|
|
||||||
|
# Yield to reactor so that we don't block.
|
||||||
|
await self._clock.sleep(0)
|
||||||
elif stream_name == PushersStream.NAME:
|
elif stream_name == PushersStream.NAME:
|
||||||
for row in rows:
|
for row in rows:
|
||||||
if row.deleted:
|
if row.deleted:
|
||||||
|
|
Loading…
Reference in a new issue