diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 2d6d49eed7..43adef4400 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -114,7 +114,7 @@ class ReplicationDataHandler: """ all_room_ids: Set[str] = set() if stream_name == DeviceListsStream.NAME: - if any(row.entity.startswith("@") and not row.is_signature for row in rows): + if any(not row.is_signature and not row.hosts_calculated for row in rows): prev_token = self.store.get_device_stream_token() all_room_ids = await self.store.get_all_device_list_changes( prev_token, token @@ -149,7 +149,7 @@ class ReplicationDataHandler: ) await self._pusher_pool.on_new_receipts({row.user_id for row in rows}) elif stream_name == ToDeviceStream.NAME: - entities = [row.entity for row in rows if row.entity.startswith("@")] + entities = [row.user_id for row in rows if not row.hosts_calculated] if entities: self.notifier.on_new_event( StreamKeyType.TO_DEVICE, token, users=entities @@ -433,11 +433,7 @@ class FederationSenderHandler: # The entities are either user IDs (starting with '@') whose devices # have changed, or remote servers that we need to tell about # changes. - hosts = { - row.entity - for row in rows - if not row.entity.startswith("@") and not row.is_signature - } + hosts = await self.store.get_destinations_for_device(token) await self.federation_sender.send_device_messages(hosts, immediate=False) elif stream_name == ToDeviceStream.NAME: diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 89ab14705b..390fa22ddd 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -207,22 +207,25 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): ) -> None: for row in rows: if row.is_signature: - self._user_signature_stream_cache.entity_has_changed(row.entity, token) + self._user_signature_stream_cache.entity_has_changed(row.user_id, token) continue # The entities are either user IDs (starting with '@') whose devices # have changed, or remote servers that we need to tell about # changes. - if row.entity.startswith("@"): - self._device_list_stream_cache.entity_has_changed(row.entity, token) - self.get_cached_devices_for_user.invalidate((row.entity,)) - self._get_cached_user_device.invalidate((row.entity,)) - self.get_device_list_last_stream_id_for_remote.invalidate((row.entity,)) + if not row.hosts_calculated: + self._device_list_stream_cache.entity_has_changed(row.user_id, token) + self.get_cached_devices_for_user.invalidate((row.user_id,)) + self._get_cached_user_device.invalidate((row.user_id,)) + self.get_device_list_last_stream_id_for_remote.invalidate( + (row.user_id,) + ) else: - self._device_list_federation_stream_cache.entity_has_changed( - row.entity, token - ) + # self._device_list_federation_stream_cache.entity_has_changed( + # row.entity, token + # ) + pass def device_lists_in_rooms_have_changed( self, room_ids: StrCollection, token: int @@ -364,18 +367,18 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): """ now_stream_id = self.get_device_stream_token() - has_changed = self._device_list_federation_stream_cache.has_entity_changed( - destination, int(from_stream_id) - ) - if not has_changed: - # debugging for https://github.com/matrix-org/synapse/issues/14251 - issue_8631_logger.debug( - "%s: no change between %i and %i", - destination, - from_stream_id, - now_stream_id, - ) - return now_stream_id, [] + # has_changed = self._device_list_federation_stream_cache.has_entity_changed( + # destination, int(from_stream_id) + # ) + # if not has_changed: + # # debugging for https://github.com/matrix-org/synapse/issues/14251 + # issue_8631_logger.debug( + # "%s: no change between %i and %i", + # destination, + # from_stream_id, + # now_stream_id, + # ) + # return now_stream_id, [] updates = await self.db_pool.runInteraction( "get_device_updates_by_remote", @@ -1577,6 +1580,14 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): get_device_list_changes_in_room_txn, ) + async def get_destinations_for_device(self, stream_id: int) -> StrCollection: + return await self.db_pool.simple_select_onecol( + table="device_lists_outbound_pokes", + keyvalues={"stream_id": stream_id}, + retcol="destination", + desc="get_destinations_for_device", + ) + class DeviceBackgroundUpdateStore(SQLBaseStore): def __init__( diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py index 38d8785faa..9e6c9561ae 100644 --- a/synapse/storage/databases/main/end_to_end_keys.py +++ b/synapse/storage/databases/main/end_to_end_keys.py @@ -123,9 +123,9 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker if stream_name == DeviceListsStream.NAME: for row in rows: assert isinstance(row, DeviceListsStream.DeviceListsStreamRow) - if row.entity.startswith("@"): + if not row.hosts_calculated: self._get_e2e_device_keys_for_federation_query_inner.invalidate( - (row.entity,) + (row.user_id,) ) super().process_replication_rows(stream_name, instance_name, token, rows)