Use new device_list_changes_in_room table when getting device list changes (#13045)

This commit is contained in:
Erik Johnston 2022-06-17 11:42:03 +01:00 committed by GitHub
parent c6d6176411
commit 5099b5ecc7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 117 additions and 31 deletions

View file

@ -0,0 +1 @@
Speed up fetching of device list changes in `/sync` and `/keys/changes`.

View file

@ -123,6 +123,43 @@ class DeviceWorkerHandler:
return device return device
async def get_device_changes_in_shared_rooms(
self, user_id: str, room_ids: Collection[str], from_token: StreamToken
) -> Collection[str]:
"""Get the set of users whose devices have changed who share a room with
the given user.
"""
changed_users = await self.store.get_device_list_changes_in_rooms(
room_ids, from_token.device_list_key
)
if changed_users is not None:
# We also check if the given user has changed their device. If
# they're in no rooms then the above query won't include them.
changed = await self.store.get_users_whose_devices_changed(
from_token.device_list_key, [user_id]
)
changed_users.update(changed)
return changed_users
# If the DB returned None then the `from_token` is too old, so we fall
# back on looking for device updates for all users.
users_who_share_room = await self.store.get_users_who_share_room_with_user(
user_id
)
tracked_users = set(users_who_share_room)
# Always tell the user about their own devices
tracked_users.add(user_id)
changed = await self.store.get_users_whose_devices_changed(
from_token.device_list_key, tracked_users
)
return changed
@trace @trace
@measure_func("device.get_user_ids_changed") @measure_func("device.get_user_ids_changed")
async def get_user_ids_changed( async def get_user_ids_changed(
@ -138,19 +175,8 @@ class DeviceWorkerHandler:
room_ids = await self.store.get_rooms_for_user(user_id) room_ids = await self.store.get_rooms_for_user(user_id)
# First we check if any devices have changed for users that we share changed = await self.get_device_changes_in_shared_rooms(
# rooms with. user_id, room_ids, from_token
users_who_share_room = await self.store.get_users_who_share_room_with_user(
user_id
)
tracked_users = set(users_who_share_room)
# Always tell the user about their own devices
tracked_users.add(user_id)
changed = await self.store.get_users_whose_devices_changed(
from_token.device_list_key, tracked_users
) )
# Then work out if any users have since joined # Then work out if any users have since joined
@ -237,10 +263,19 @@ class DeviceWorkerHandler:
break break
if possibly_changed or possibly_left: if possibly_changed or possibly_left:
# Take the intersection of the users whose devices may have changed possibly_joined = possibly_changed
# and those that actually still share a room with the user possibly_left = possibly_changed | possibly_left
possibly_joined = possibly_changed & users_who_share_room
possibly_left = (possibly_changed | possibly_left) - users_who_share_room # Double check if we still share rooms with the given user.
users_rooms = await self.store.get_rooms_for_users_with_stream_ordering(
possibly_left
)
for changed_user_id, entries in users_rooms.items():
if any(e.room_id in room_ids for e in entries):
possibly_left.discard(changed_user_id)
else:
possibly_joined.discard(changed_user_id)
else: else:
possibly_joined = set() possibly_joined = set()
possibly_left = set() possibly_left = set()

View file

@ -240,6 +240,7 @@ class SyncHandler:
self.auth_blocking = hs.get_auth_blocking() self.auth_blocking = hs.get_auth_blocking()
self._storage_controllers = hs.get_storage_controllers() self._storage_controllers = hs.get_storage_controllers()
self._state_storage_controller = self._storage_controllers.state self._state_storage_controller = self._storage_controllers.state
self._device_handler = hs.get_device_handler()
# TODO: flush cache entries on subsequent sync request. # TODO: flush cache entries on subsequent sync request.
# Once we get the next /sync request (ie, one with the same access token # Once we get the next /sync request (ie, one with the same access token
@ -1268,21 +1269,11 @@ class SyncHandler:
): ):
users_that_have_changed.add(changed_user_id) users_that_have_changed.add(changed_user_id)
else: else:
users_who_share_room = (
await self.store.get_users_who_share_room_with_user(user_id)
)
# Always tell the user about their own devices. We check as the user
# ID is almost certainly already included (unless they're not in any
# rooms) and taking a copy of the set is relatively expensive.
if user_id not in users_who_share_room:
users_who_share_room = set(users_who_share_room)
users_who_share_room.add(user_id)
tracked_users = users_who_share_room
users_that_have_changed = ( users_that_have_changed = (
await self.store.get_users_whose_devices_changed( await self._device_handler.get_device_changes_in_shared_rooms(
since_token.device_list_key, tracked_users user_id,
sync_result_builder.joined_room_ids,
from_token=since_token,
) )
) )

View file

@ -1208,6 +1208,65 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore):
return devices return devices
@cached()
async def _get_min_device_lists_changes_in_room(self) -> int:
"""Returns the minimum stream ID that we have entries for
`device_lists_changes_in_room`
"""
return await self.db_pool.simple_select_one_onecol(
table="device_lists_changes_in_room",
keyvalues={},
retcol="COALESCE(MIN(stream_id), 0)",
desc="get_min_device_lists_changes_in_room",
)
async def get_device_list_changes_in_rooms(
self, room_ids: Collection[str], from_id: int
) -> Optional[Set[str]]:
"""Return the set of users whose devices have changed in the given rooms
since the given stream ID.
Returns None if the given stream ID is too old.
"""
if not room_ids:
return set()
min_stream_id = await self._get_min_device_lists_changes_in_room()
if min_stream_id > from_id:
return None
sql = """
SELECT DISTINCT user_id FROM device_lists_changes_in_room
WHERE {clause} AND stream_id >= ?
"""
def _get_device_list_changes_in_rooms_txn(
txn: LoggingTransaction,
clause,
args,
) -> Set[str]:
txn.execute(sql.format(clause=clause), args)
return {user_id for user_id, in txn}
changes = set()
for chunk in batch_iter(room_ids, 1000):
clause, args = make_in_list_sql_clause(
self.database_engine, "room_id", chunk
)
args.append(from_id)
changes |= await self.db_pool.runInteraction(
"get_device_list_changes_in_rooms",
_get_device_list_changes_in_rooms_txn,
clause,
args,
)
return changes
class DeviceBackgroundUpdateStore(SQLBaseStore): class DeviceBackgroundUpdateStore(SQLBaseStore):
def __init__( def __init__(