mirror of
https://github.com/element-hq/synapse
synced 2024-10-02 09:12:43 +00:00
Stream change cache
This commit is contained in:
parent
a45f1be28c
commit
ce7b1d3a21
2 changed files with 55 additions and 39 deletions
|
@ -121,6 +121,12 @@ class ReplicationDataHandler:
|
||||||
)
|
)
|
||||||
self.store.device_lists_in_rooms_have_changed(all_room_ids, token)
|
self.store.device_lists_in_rooms_have_changed(all_room_ids, token)
|
||||||
|
|
||||||
|
# If we're sending federation we need to update the device lists
|
||||||
|
# outbound pokes stream change cache with updated hosts.
|
||||||
|
if self.send_handler and any(row.hosts_calculated for row in rows):
|
||||||
|
hosts = await self.store.get_destinations_for_device(token)
|
||||||
|
self.store.device_lists_outbound_pokes_have_changed(hosts, token)
|
||||||
|
|
||||||
self.store.process_replication_rows(stream_name, instance_name, token, rows)
|
self.store.process_replication_rows(stream_name, instance_name, token, rows)
|
||||||
# NOTE: this must be called after process_replication_rows to ensure any
|
# NOTE: this must be called after process_replication_rows to ensure any
|
||||||
# cache invalidations are first handled before any stream ID advances.
|
# cache invalidations are first handled before any stream ID advances.
|
||||||
|
|
|
@ -164,6 +164,8 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
|
||||||
prefilled_cache=user_signature_stream_prefill,
|
prefilled_cache=user_signature_stream_prefill,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
self._device_list_federation_stream_cache = None
|
||||||
|
if hs.get_federation_sender() is not None:
|
||||||
(
|
(
|
||||||
device_list_federation_prefill,
|
device_list_federation_prefill,
|
||||||
device_list_federation_list_id,
|
device_list_federation_list_id,
|
||||||
|
@ -221,11 +223,15 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
|
||||||
(row.user_id,)
|
(row.user_id,)
|
||||||
)
|
)
|
||||||
|
|
||||||
else:
|
def device_lists_outbound_pokes_have_changed(
|
||||||
# self._device_list_federation_stream_cache.entity_has_changed(
|
self, destinations: StrCollection, token: int
|
||||||
# row.entity, token
|
) -> None:
|
||||||
# )
|
assert self._device_list_federation_stream_cache is not None
|
||||||
pass
|
|
||||||
|
for destination in destinations:
|
||||||
|
self._device_list_federation_stream_cache.entity_has_changed(
|
||||||
|
destination, token
|
||||||
|
)
|
||||||
|
|
||||||
def device_lists_in_rooms_have_changed(
|
def device_lists_in_rooms_have_changed(
|
||||||
self, room_ids: StrCollection, token: int
|
self, room_ids: StrCollection, token: int
|
||||||
|
@ -369,18 +375,21 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
|
||||||
if from_stream_id == now_stream_id:
|
if from_stream_id == now_stream_id:
|
||||||
return now_stream_id, []
|
return now_stream_id, []
|
||||||
|
|
||||||
# has_changed = self._device_list_federation_stream_cache.has_entity_changed(
|
if self._device_list_federation_stream_cache is None:
|
||||||
# destination, int(from_stream_id)
|
raise Exception("Func can only be used on federation senders")
|
||||||
# )
|
|
||||||
# if not has_changed:
|
has_changed = self._device_list_federation_stream_cache.has_entity_changed(
|
||||||
# # debugging for https://github.com/matrix-org/synapse/issues/14251
|
destination, int(from_stream_id)
|
||||||
# issue_8631_logger.debug(
|
)
|
||||||
# "%s: no change between %i and %i",
|
if not has_changed:
|
||||||
# destination,
|
# debugging for https://github.com/matrix-org/synapse/issues/14251
|
||||||
# from_stream_id,
|
issue_8631_logger.debug(
|
||||||
# now_stream_id,
|
"%s: no change between %i and %i",
|
||||||
# )
|
destination,
|
||||||
# return now_stream_id, []
|
from_stream_id,
|
||||||
|
now_stream_id,
|
||||||
|
)
|
||||||
|
return now_stream_id, []
|
||||||
|
|
||||||
updates = await self.db_pool.runInteraction(
|
updates = await self.db_pool.runInteraction(
|
||||||
"get_device_updates_by_remote",
|
"get_device_updates_by_remote",
|
||||||
|
@ -2125,6 +2134,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
|
||||||
stream_ids: List[int],
|
stream_ids: List[int],
|
||||||
context: Optional[Dict[str, str]],
|
context: Optional[Dict[str, str]],
|
||||||
) -> None:
|
) -> None:
|
||||||
|
if self._device_list_federation_stream_cache:
|
||||||
for host in hosts:
|
for host in hosts:
|
||||||
txn.call_after(
|
txn.call_after(
|
||||||
self._device_list_federation_stream_cache.entity_has_changed,
|
self._device_list_federation_stream_cache.entity_has_changed,
|
||||||
|
|
Loading…
Reference in a new issue