From 2367c5568c01bc65aacc955b76ba707918b37f1e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 25 Jan 2017 14:27:27 +0000 Subject: [PATCH 01/19] Add basic implementation of local device list changes --- synapse/federation/transaction_queue.py | 24 ++- synapse/handlers/device.py | 65 +++++-- synapse/handlers/e2e_keys.py | 1 + synapse/handlers/sync.py | 13 ++ synapse/rest/client/v2_alpha/sync.py | 6 +- synapse/storage/__init__.py | 11 ++ synapse/storage/_base.py | 6 + synapse/storage/devices.py | 169 +++++++++++++++++- synapse/storage/end_to_end_keys.py | 23 ++- .../schema/delta/40/device_list_streams.sql | 56 ++++++ synapse/streams/events.py | 4 + synapse/types.py | 2 + tests/handlers/test_typing.py | 3 + tests/rest/client/v1/test_rooms.py | 4 +- 14 files changed, 348 insertions(+), 39 deletions(-) create mode 100644 synapse/storage/schema/delta/40/device_list_streams.sql diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 6b3a7abb9e..65c6673a87 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -100,6 +100,7 @@ class TransactionQueue(object): self.pending_failures_by_dest = {} self.last_device_stream_id_by_dest = {} + self.last_device_list_stream_id_by_dest = {} # HACK to get unique tx id self._next_txn_id = int(self.clock.time_msec()) @@ -356,7 +357,7 @@ class TransactionQueue(object): success = yield self._send_new_transaction( destination, pending_pdus, pending_edus, pending_failures, device_stream_id, - should_delete_from_device_stream=bool(device_message_edus), + includes_device_messages=bool(device_message_edus), limiter=limiter, ) if not success: @@ -373,6 +374,8 @@ class TransactionQueue(object): @defer.inlineCallbacks def _get_new_device_messages(self, destination): + # TODO: Send appropriate device list messages + last_device_stream_id = self.last_device_stream_id_by_dest.get(destination, 0) to_device_stream_id = self.store.get_to_device_stream_token() contents, stream_id = yield self.store.get_new_device_msgs_for_remote( @@ -387,13 +390,27 @@ class TransactionQueue(object): ) for content in contents ] + + last_device_list = self.last_device_list_stream_id_by_dest.get(destination, 0) + now_stream_id, results = yield self.store.get_devices_by_remote( + destination, last_device_list + ) + edus.extend( + Edu( + origin=self.server_name, + destination=destination, + edu_type="m.device_list_update", + content=content, + ) + for content in results + ) defer.returnValue((edus, stream_id)) @measure_func("_send_new_transaction") @defer.inlineCallbacks def _send_new_transaction(self, destination, pending_pdus, pending_edus, pending_failures, device_stream_id, - should_delete_from_device_stream, limiter): + includes_device_messages, limiter): # Sort based on the order field pending_pdus.sort(key=lambda t: t[1]) @@ -506,7 +523,8 @@ class TransactionQueue(object): success = False else: # Remove the acknowledged device messages from the database - if should_delete_from_device_stream: + # Only bother if we actually sent some device messages + if includes_device_messages: yield self.store.delete_device_msgs_for_remote( destination, device_stream_id ) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index aa68755936..d92780b642 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -15,6 +15,7 @@ from synapse.api import errors from synapse.util import stringutils +from synapse.types import get_domain_from_id from twisted.internet import defer from ._base import BaseHandler @@ -27,6 +28,8 @@ class DeviceHandler(BaseHandler): def __init__(self, hs): super(DeviceHandler, self).__init__(hs) + self.state = hs.get_state_handler() + @defer.inlineCallbacks def check_device_registered(self, user_id, device_id, initial_device_display_name=None): @@ -45,29 +48,29 @@ class DeviceHandler(BaseHandler): str: device id (generated if none was supplied) """ if device_id is not None: - yield self.store.store_device( + new_device = yield self.store.store_device( user_id=user_id, device_id=device_id, initial_device_display_name=initial_device_display_name, - ignore_if_known=True, ) + if new_device: + yield self.notify_device_update(user_id, device_id) defer.returnValue(device_id) # if the device id is not specified, we'll autogen one, but loop a few # times in case of a clash. attempts = 0 while attempts < 5: - try: - device_id = stringutils.random_string(10).upper() - yield self.store.store_device( - user_id=user_id, - device_id=device_id, - initial_device_display_name=initial_device_display_name, - ignore_if_known=False, - ) + device_id = stringutils.random_string(10).upper() + new_device = yield self.store.store_device( + user_id=user_id, + device_id=device_id, + initial_device_display_name=initial_device_display_name, + ) + if new_device: + yield self.notify_device_update(user_id, device_id) defer.returnValue(device_id) - except errors.StoreError: - attempts += 1 + attempts += 1 raise errors.StoreError(500, "Couldn't generate a device ID.") @@ -147,6 +150,8 @@ class DeviceHandler(BaseHandler): user_id=user_id, device_id=device_id ) + yield self.notify_device_update(user_id, device_id) + @defer.inlineCallbacks def update_device(self, user_id, device_id, content): """ Update the given device @@ -166,12 +171,48 @@ class DeviceHandler(BaseHandler): device_id, new_display_name=content.get("display_name") ) + yield self.notify_device_update(user_id, device_id) except errors.StoreError, e: if e.code == 404: raise errors.NotFoundError() else: raise + @defer.inlineCallbacks + def notify_device_update(self, user_id, device_id): + rooms = yield self.store.get_rooms_for_user(user_id) + room_ids = [r.room_id for r in rooms] + + hosts = set() + for room_id in room_ids: + users = yield self.state.get_current_user_in_room(room_id) + hosts.update(get_domain_from_id(u) for u in users) + hosts.discard(self.server_name) + + position = yield self.store.add_device_change_to_streams( + user_id, device_id, list(hosts) + ) + + yield self.notifier.on_new_event( + "device_list_key", position, rooms=room_ids, + ) + + for host in hosts: + self.federation.send_device_messages(host) + + @defer.inlineCallbacks + def get_device_list_changes(self, user_id, room_ids, from_key): + room_ids = frozenset(room_ids) + + user_ids_changed = set() + changed = yield self.store.get_user_whose_devices_changed(from_key) + for other_user_id in changed: + other_rooms = yield self.store.get_rooms_for_user(other_user_id) + if room_ids.intersection(e.room_id for e in other_rooms): + user_ids_changed.add(other_user_id) + + defer.returnValue(user_ids_changed) + def _update_device_from_client_ips(device, client_ips): ip = client_ips.get((device["user_id"], device["device_id"]), {}) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index b63a660c06..38c2a2d39e 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -259,6 +259,7 @@ class E2eKeysHandler(object): user_id, device_id, time_now, encode_canonical_json(device_keys) ) + yield self.device_handler.notify_device_update(user_id, device_id) one_time_keys = keys.get("one_time_keys", None) if one_time_keys: diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index c880f61685..06bf626367 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -115,6 +115,7 @@ class SyncResult(collections.namedtuple("SyncResult", [ "invited", # InvitedSyncResult for each invited room. "archived", # ArchivedSyncResult for each archived room. "to_device", # List of direct messages for the device. + "device_lists", # List of user_ids whose devices have chanegd ])): __slots__ = [] @@ -143,6 +144,7 @@ class SyncHandler(object): self.clock = hs.get_clock() self.response_cache = ResponseCache(hs) self.state = hs.get_state_handler() + self.device_handler = hs.get_device_handler() def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0, full_state=False): @@ -544,6 +546,16 @@ class SyncHandler(object): yield self._generate_sync_entry_for_to_device(sync_result_builder) + if since_token and since_token.device_list_key: + user_id = sync_config.user.to_string() + rooms = yield self.store.get_rooms_for_user(user_id) + joined_room_ids = set(r.room_id for r in rooms) + device_lists = yield self.device_handler.get_device_list_changes( + user_id, joined_room_ids, since_token.device_list_key + ) + else: + device_lists = [] + defer.returnValue(SyncResult( presence=sync_result_builder.presence, account_data=sync_result_builder.account_data, @@ -551,6 +563,7 @@ class SyncHandler(object): invited=sync_result_builder.invited, archived=sync_result_builder.archived, to_device=sync_result_builder.to_device, + device_lists=device_lists, next_batch=sync_result_builder.now_token, )) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 7199ec883a..b3d8001638 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -170,12 +170,16 @@ class SyncRestServlet(RestServlet): ) archived = self.encode_archived( - sync_result.archived, time_now, requester.access_token_id, filter.event_fields + sync_result.archived, time_now, requester.access_token_id, + filter.event_fields, ) response_content = { "account_data": {"events": sync_result.account_data}, "to_device": {"events": sync_result.to_device}, + "device_lists": { + "changed": list(sync_result.device_lists), + }, "presence": self.encode_presence( sync_result.presence, time_now ), diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index e8495f1eb9..b9968debe5 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -116,6 +116,9 @@ class DataStore(RoomMemberStore, RoomStore, self._public_room_id_gen = StreamIdGenerator( db_conn, "public_room_list_stream", "stream_id" ) + self._device_list_id_gen = StreamIdGenerator( + db_conn, "device_lists_stream", "stream_id", + ) self._transaction_id_gen = IdGenerator(db_conn, "sent_transactions", "id") self._state_groups_id_gen = IdGenerator(db_conn, "state_groups", "id") @@ -210,6 +213,14 @@ class DataStore(RoomMemberStore, RoomStore, prefilled_cache=device_outbox_prefill, ) + device_list_max = self._device_list_id_gen.get_current_token() + self._device_list_stream_cache = StreamChangeCache( + "DeviceListStreamChangeCache", device_list_max, + ) + self._device_list_federation_stream_cache = StreamChangeCache( + "DeviceListFederationStreamChangeCache", device_list_max, + ) + cur = LoggingTransaction( db_conn.cursor(), name="_find_stream_orderings_for_times_txn", diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 963ef999d5..05374682fd 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -387,6 +387,10 @@ class SQLBaseStore(object): Args: table : string giving the table name values : dict of new column names and values for them + + Returns: + bool: Whether the row was inserted or not. Only useful when + `or_ignore` is True """ try: yield self.runInteraction( @@ -398,6 +402,8 @@ class SQLBaseStore(object): # a cursor after we receive an error from the db. if not or_ignore: raise + defer.returnValue(False) + defer.returnValue(True) @staticmethod def _simple_insert_txn(txn, table, values): diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 17920d4480..b594f501f9 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +import ujson as json from twisted.internet import defer @@ -33,17 +34,13 @@ class DeviceStore(SQLBaseStore): user_id (str): id of user associated with the device device_id (str): id of device initial_device_display_name (str): initial displayname of the - device - ignore_if_known (bool): ignore integrity errors which mean the - device is already known + device. Ignored if device exists. Returns: - defer.Deferred - Raises: - StoreError: if ignore_if_known is False and the device was already - known + defer.Deferred: boolean whether the device was inserted or an + existing device existed with that ID. """ try: - yield self._simple_insert( + inserted = yield self._simple_insert( "devices", values={ "user_id": user_id, @@ -51,8 +48,9 @@ class DeviceStore(SQLBaseStore): "display_name": initial_device_display_name }, desc="store_device", - or_ignore=ignore_if_known, + or_ignore=True, ) + defer.returnValue(inserted) except Exception as e: logger.error("store_device with device_id=%s(%r) user_id=%s(%r)" " display_name=%s(%r) failed: %s", @@ -139,3 +137,156 @@ class DeviceStore(SQLBaseStore): ) defer.returnValue({d["device_id"]: d for d in devices}) + + def get_devices_by_remote(self, destination, from_stream_id): + now_stream_id = self._device_list_id_gen.get_current_token() + + has_changed = self._device_list_stream_cache.has_entity_changed( + destination, int(from_stream_id) + ) + if not has_changed: + defer.returnValue((now_stream_id, [])) + + return self.runInteraction( + "get_devices_by_remote", self._get_devices_by_remote_txn, + destination, from_stream_id, now_stream_id, + ) + + def _get_devices_by_remote_txn(self, txn, destination, from_stream_id, + now_stream_id): + sql = """ + SELECT user_id, device_id, max(stream_id) FROM device_lists_outbound_pokes + WHERE destination = ? AND stream_id > ? AND stream_id <= ? AND sent = ? + GROUP BY user_id, device_id + """ + txn.execute( + sql, (destination, from_stream_id, now_stream_id, False) + ) + rows = txn.fetchall() + + if not rows: + return now_stream_id, [] + + # maps (user_id, device_id) -> stream_id + query_map = {(r[0], r[1]): r[2] for r in rows} + devices = self._get_e2e_device_keys_txn( + txn, query_map.keys(), include_all_devices=True + ) + + prev_sent_id_sql = """ + SELECT coalesce(max(stream_id), 0) as stream_id + FROM device_lists_outbound_pokes + WHERE destination = ? AND user_id = ? AND sent = ? + """ + + results = [] + for user_id, user_devices in devices.iteritems(): + txn.execute(prev_sent_id_sql, (destination, user_id, True)) + rows = txn.fetchall() + prev_id = rows[0][0] + for device_id, result in user_devices.iteritems(): + stream_id = query_map[(user_id, device_id)] + result = { + "user_id": user_id, + "device_id": device_id, + "prev_id": prev_id, + "stream_id": stream_id, + } + + prev_id = stream_id + + key_json = result.get("key_json", None) + if key_json: + result["keys"] = json.loads(key_json) + device_display_name = result.get("device_display_name", None) + if device_display_name: + result["device_display_name"] = device_display_name + + results.setdefault(user_id, {})[device_id] = result + + return now_stream_id, results + + def mark_as_sent_devices_by_remote(self, destination, stream_id): + return self.runInteraction( + "mark_as_sent_devices_by_remote", self._mark_as_sent_devices_by_remote_txn, + destination, stream_id, + ) + + @defer.inlineCallbacks + def get_user_whose_devices_changed(self, from_key): + from_key = int(from_key) + changed = self._device_list_stream_cache.get_all_entities_changed(from_key) + if changed is not None: + defer.returnValue(set(changed)) + + sql = """ + SELECT user_id FROM device_lists_stream WHERE stream_id > ? + """ + rows = yield self._execute("get_user_whose_devices_changed", None, sql, from_key) + defer.returnValue(set(row["user_id"] for row in rows)) + + def _mark_as_sent_devices_by_remote_txn(self, txn, destination, stream_id): + sql = """ + DELETE FROM device_lists_outbound_pokes + WHERE destination = ? AND stream_id < ( + SELECT coalesce(max(stream_id), 0) FROM device_lists_outbound_pokes + WHERE destination = ? AND stream_id <= ? + ) + """ + txn.execute(sql, (destination, destination, stream_id,)) + + sql = """ + UPDATE device_lists_outbound_pokes SET sent = ? + WHERE destination = ? AND stream_id <= ? + """ + txn.execute(sql, (destination, True,)) + + @defer.inlineCallbacks + def add_device_change_to_streams(self, user_id, device_id, hosts): + # device_lists_stream + # device_lists_outbound_pokes + with self._device_list_id_gen.get_next() as stream_id: + yield self.runInteraction( + "add_device_change_to_streams", self._add_device_change_txn, + user_id, device_id, hosts, stream_id, + ) + defer.returnValue(stream_id) + + def _add_device_change_txn(self, txn, user_id, device_id, hosts, stream_id): + txn.call_after( + self._device_list_stream_cache.entity_has_changed, + user_id, stream_id, + ) + for host in hosts: + txn.call_after( + self._device_list_federation_stream_cache.entity_has_changed, + host, stream_id, + ) + + self._simple_insert_txn( + txn, + table="device_lists_stream", + values={ + "stream_id": stream_id, + "user_id": user_id, + "device_id": device_id, + } + ) + + self._simple_insert_many_txn( + txn, + table="device_lists_outbound_pokes", + values=[ + { + "destination": destination, + "stream_id": stream_id, + "user_id": user_id, + "device_id": device_id, + "sent": False, + } + for destination in hosts + ] + ) + + def get_device_stream_token(self): + return self._device_list_id_gen.get_current_token() diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index 385d607056..f82943a7a8 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -12,9 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import collections - -import twisted.internet.defer +from twisted.internet import defer from ._base import SQLBaseStore @@ -33,7 +31,7 @@ class EndToEndKeyStore(SQLBaseStore): } ) - def get_e2e_device_keys(self, query_list): + def get_e2e_device_keys(self, query_list, include_all_devices=False): """Fetch a list of device keys. Args: query_list(list): List of pairs of user_ids and device_ids. @@ -45,10 +43,11 @@ class EndToEndKeyStore(SQLBaseStore): return {} return self.runInteraction( - "get_e2e_device_keys", self._get_e2e_device_keys_txn, query_list + "get_e2e_device_keys", self._get_e2e_device_keys_txn, + query_list, include_all_devices, ) - def _get_e2e_device_keys_txn(self, txn, query_list): + def _get_e2e_device_keys_txn(self, txn, query_list, include_all_devices): query_clauses = [] query_params = [] @@ -63,23 +62,23 @@ class EndToEndKeyStore(SQLBaseStore): query_clauses.append(query_clause) sql = ( - "SELECT k.user_id, k.device_id, " + "SELECT user_id, device_id, " " d.display_name AS device_display_name, " " k.key_json" " FROM e2e_device_keys_json k" - " LEFT JOIN devices d ON d.user_id = k.user_id" - " AND d.device_id = k.device_id" + " %s JOIN devices d USING (user_id, device_id)" " WHERE %s" ) % ( + "FULL OUTER" if include_all_devices else "LEFT", " OR ".join("(" + q + ")" for q in query_clauses) ) txn.execute(sql, query_params) rows = self.cursor_to_dict(txn) - result = collections.defaultdict(dict) + result = {} for row in rows: - result[row["user_id"]][row["device_id"]] = row + result.setdefault(row["user_id"], {})[row["device_id"]] = row return result @@ -152,7 +151,7 @@ class EndToEndKeyStore(SQLBaseStore): "claim_e2e_one_time_keys", _claim_e2e_one_time_keys ) - @twisted.internet.defer.inlineCallbacks + @defer.inlineCallbacks def delete_e2e_keys_by_device(self, user_id, device_id): yield self._simple_delete( table="e2e_device_keys_json", diff --git a/synapse/storage/schema/delta/40/device_list_streams.sql b/synapse/storage/schema/delta/40/device_list_streams.sql new file mode 100644 index 0000000000..61cac63bbb --- /dev/null +++ b/synapse/storage/schema/delta/40/device_list_streams.sql @@ -0,0 +1,56 @@ +/* Copyright 2017 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE device_list_streams_remote ( + list_id TEXT NOT NULL, + origin TEXT NOT NULL, + user_id TEXT NOT NULL, + is_full BOOLEAN NOT NULL, + ts BIGINT NOT NULL +); + +CREATE INDEX device_list_streams_remote_id_origin ON device_list_streams_remote( + origin, list_id, user_id +); + + +CREATE TABLE device_lists_remote_cache ( + user_id TEXT NOT NULL, + device_id TEXT NOT NULL, + content TEXT NOT NULL +); + +CREATE INDEX device_lists_remote_cache_id ON device_lists_remote_cache(user_id, device_id); + + +CREATE TABLE device_lists_stream ( + stream_id BIGINT NOT NULL, + user_id TEXT NOT NULL, + device_id TEXT NOT NULL +); + +CREATE INDEX device_lists_stream_id ON device_lists_stream(stream_id, user_id); + + +CREATE TABLE device_lists_outbound_pokes ( + destination TEXT NOT NULL, + stream_id BIGINT NOT NULL, + user_id TEXT NOT NULL, + device_id TEXT NOT NULL, + sent BOOLEAN NOT NULL +); + +CREATE INDEX device_lists_outbound_pokes_id ON device_lists_outbound_pokes(destination, stream_id); +CREATE INDEX device_lists_outbound_pokes_user ON device_lists_outbound_pokes(destination, user_id); diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 4d44c3d4ca..91a59b0bae 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -44,6 +44,7 @@ class EventSources(object): def get_current_token(self): push_rules_key, _ = self.store.get_push_rules_stream_token() to_device_key = self.store.get_to_device_stream_token() + device_list_key = self.store.get_device_stream_token() token = StreamToken( room_key=( @@ -63,6 +64,7 @@ class EventSources(object): ), push_rules_key=push_rules_key, to_device_key=to_device_key, + device_list_key=device_list_key, ) defer.returnValue(token) @@ -70,6 +72,7 @@ class EventSources(object): def get_current_token_for_room(self, room_id): push_rules_key, _ = self.store.get_push_rules_stream_token() to_device_key = self.store.get_to_device_stream_token() + device_list_key = self.store.get_device_stream_token() token = StreamToken( room_key=( @@ -89,5 +92,6 @@ class EventSources(object): ), push_rules_key=push_rules_key, to_device_key=to_device_key, + device_list_key=device_list_key, ) defer.returnValue(token) diff --git a/synapse/types.py b/synapse/types.py index 3a3ab21d17..9666f9d73f 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -158,6 +158,7 @@ class StreamToken( "account_data_key", "push_rules_key", "to_device_key", + "device_list_key", )) ): _SEPARATOR = "_" @@ -195,6 +196,7 @@ class StreamToken( or (int(other.account_data_key) < int(self.account_data_key)) or (int(other.push_rules_key) < int(self.push_rules_key)) or (int(other.to_device_key) < int(self.to_device_key)) + or (int(other.device_list_key) < int(self.device_list_key)) ) def copy_and_advance(self, key, new_value): diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index c718d1f98f..f88d2be7c5 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -75,6 +75,7 @@ class TypingNotificationsTestCase(unittest.TestCase): "get_received_txn_response", "set_received_txn_response", "get_destination_retry_timings", + "get_devices_by_remote", ]), state_handler=self.state_handler, handlers=None, @@ -99,6 +100,8 @@ class TypingNotificationsTestCase(unittest.TestCase): defer.succeed(retry_timings_res) ) + self.datastore.get_devices_by_remote.return_value = (0, []) + def get_received_txn_response(*args): return defer.succeed(None) self.datastore.get_received_txn_response = get_received_txn_response diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py index 6bce352c5f..d746ea8568 100644 --- a/tests/rest/client/v1/test_rooms.py +++ b/tests/rest/client/v1/test_rooms.py @@ -1032,7 +1032,7 @@ class RoomMessageListTestCase(RestTestCase): @defer.inlineCallbacks def test_topo_token_is_accepted(self): - token = "t1-0_0_0_0_0_0_0" + token = "t1-0_0_0_0_0_0_0_0" (code, response) = yield self.mock_resource.trigger_get( "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token)) @@ -1044,7 +1044,7 @@ class RoomMessageListTestCase(RestTestCase): @defer.inlineCallbacks def test_stream_token_is_accepted_for_fwd_pagianation(self): - token = "s0_0_0_0_0_0_0" + token = "s0_0_0_0_0_0_0_0" (code, response) = yield self.mock_resource.trigger_get( "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token)) From 51e9fe36e46331ac611cec1d4cb425c1bc98721c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 25 Jan 2017 16:55:21 +0000 Subject: [PATCH 02/19] Fix up sending of m.device_list_update edus --- synapse/federation/transaction_queue.py | 123 ++++++++++++------------ synapse/handlers/device.py | 1 + synapse/storage/devices.py | 40 ++++---- 3 files changed, 83 insertions(+), 81 deletions(-) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 65c6673a87..d18f6b6cfd 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -306,62 +306,74 @@ class TransactionQueue(object): yield run_on_reactor() while True: - pending_pdus = self.pending_pdus_by_dest.pop(destination, []) - pending_edus = self.pending_edus_by_dest.pop(destination, []) - pending_presence = self.pending_presence_by_dest.pop(destination, {}) - pending_failures = self.pending_failures_by_dest.pop(destination, []) + pending_pdus = self.pending_pdus_by_dest.pop(destination, []) + pending_edus = self.pending_edus_by_dest.pop(destination, []) + pending_presence = self.pending_presence_by_dest.pop(destination, {}) + pending_failures = self.pending_failures_by_dest.pop(destination, []) - pending_edus.extend( - self.pending_edus_keyed_by_dest.pop(destination, {}).values() + pending_edus.extend( + self.pending_edus_keyed_by_dest.pop(destination, {}).values() + ) + + limiter = yield get_retry_limiter( + destination, + self.clock, + self.store, + ) + + device_message_edus, device_stream_id, dev_list_id = ( + yield self._get_new_device_messages(destination) + ) + + pending_edus.extend(device_message_edus) + if pending_presence: + pending_edus.append( + Edu( + origin=self.server_name, + destination=destination, + edu_type="m.presence", + content={ + "push": [ + format_user_presence_state( + presence, self.clock.time_msec() + ) + for presence in pending_presence.values() + ] + }, + ) ) - limiter = yield get_retry_limiter( - destination, - self.clock, - self.store, - ) + if pending_pdus: + logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d", + destination, len(pending_pdus)) - device_message_edus, device_stream_id = ( - yield self._get_new_device_messages(destination) + if not pending_pdus and not pending_edus and not pending_failures: + logger.debug("TX [%s] Nothing to send", destination) + self.last_device_stream_id_by_dest[destination] = ( + device_stream_id ) + return - pending_edus.extend(device_message_edus) - if pending_presence: - pending_edus.append( - Edu( - origin=self.server_name, - destination=destination, - edu_type="m.presence", - content={ - "push": [ - format_user_presence_state( - presence, self.clock.time_msec() - ) - for presence in pending_presence.values() - ] - }, - ) + success = yield self._send_new_transaction( + destination, pending_pdus, pending_edus, pending_failures, + limiter=limiter, + ) + if success: + # Remove the acknowledged device messages from the database + # Only bother if we actually sent some device messages + if device_message_edus: + yield self.store.delete_device_msgs_for_remote( + destination, device_stream_id + ) + logger.info("Marking as sent %r %r", destination, dev_list_id) + yield self.store.mark_as_sent_devices_by_remote( + destination, dev_list_id ) - if pending_pdus: - logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d", - destination, len(pending_pdus)) - - if not pending_pdus and not pending_edus and not pending_failures: - logger.debug("TX [%s] Nothing to send", destination) - self.last_device_stream_id_by_dest[destination] = ( - device_stream_id - ) - return - - success = yield self._send_new_transaction( - destination, pending_pdus, pending_edus, pending_failures, - device_stream_id, - includes_device_messages=bool(device_message_edus), - limiter=limiter, - ) - if not success: - break + self.last_device_stream_id_by_dest[destination] = device_stream_id + self.last_device_list_stream_id_by_dest[destination] = dev_list_id + else: + break except NotRetryingDestination: logger.debug( "TX [%s] not ready for retry yet - " @@ -374,8 +386,6 @@ class TransactionQueue(object): @defer.inlineCallbacks def _get_new_device_messages(self, destination): - # TODO: Send appropriate device list messages - last_device_stream_id = self.last_device_stream_id_by_dest.get(destination, 0) to_device_stream_id = self.store.get_to_device_stream_token() contents, stream_id = yield self.store.get_new_device_msgs_for_remote( @@ -404,13 +414,12 @@ class TransactionQueue(object): ) for content in results ) - defer.returnValue((edus, stream_id)) + defer.returnValue((edus, stream_id, now_stream_id)) @measure_func("_send_new_transaction") @defer.inlineCallbacks def _send_new_transaction(self, destination, pending_pdus, pending_edus, - pending_failures, device_stream_id, - includes_device_messages, limiter): + pending_failures, limiter): # Sort based on the order field pending_pdus.sort(key=lambda t: t[1]) @@ -521,14 +530,6 @@ class TransactionQueue(object): "Failed to send event %s to %s", p.event_id, destination ) success = False - else: - # Remove the acknowledged device messages from the database - # Only bother if we actually sent some device messages - if includes_device_messages: - yield self.store.delete_device_msgs_for_remote( - destination, device_stream_id - ) - self.last_device_stream_id_by_dest[destination] = device_stream_id except RuntimeError as e: # We capture this here as there as nothing actually listens # for this finishing functions deferred. diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index d92780b642..ba4c48d590 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -29,6 +29,7 @@ class DeviceHandler(BaseHandler): super(DeviceHandler, self).__init__(hs) self.state = hs.get_state_handler() + self.federation = hs.get_federation_sender() @defer.inlineCallbacks def check_device_registered(self, user_id, device_id, diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index b594f501f9..9628e2ff75 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -141,11 +141,11 @@ class DeviceStore(SQLBaseStore): def get_devices_by_remote(self, destination, from_stream_id): now_stream_id = self._device_list_id_gen.get_current_token() - has_changed = self._device_list_stream_cache.has_entity_changed( + has_changed = self._device_list_federation_stream_cache.has_entity_changed( destination, int(from_stream_id) ) if not has_changed: - defer.returnValue((now_stream_id, [])) + return (now_stream_id, []) return self.runInteraction( "get_devices_by_remote", self._get_devices_by_remote_txn, @@ -165,7 +165,7 @@ class DeviceStore(SQLBaseStore): rows = txn.fetchall() if not rows: - return now_stream_id, [] + return (now_stream_id, []) # maps (user_id, device_id) -> stream_id query_map = {(r[0], r[1]): r[2] for r in rows} @@ -189,7 +189,7 @@ class DeviceStore(SQLBaseStore): result = { "user_id": user_id, "device_id": device_id, - "prev_id": prev_id, + "prev_id": [prev_id] if prev_id else [], "stream_id": stream_id, } @@ -202,9 +202,9 @@ class DeviceStore(SQLBaseStore): if device_display_name: result["device_display_name"] = device_display_name - results.setdefault(user_id, {})[device_id] = result + results.append(result) - return now_stream_id, results + return (now_stream_id, results) def mark_as_sent_devices_by_remote(self, destination, stream_id): return self.runInteraction( @@ -212,19 +212,6 @@ class DeviceStore(SQLBaseStore): destination, stream_id, ) - @defer.inlineCallbacks - def get_user_whose_devices_changed(self, from_key): - from_key = int(from_key) - changed = self._device_list_stream_cache.get_all_entities_changed(from_key) - if changed is not None: - defer.returnValue(set(changed)) - - sql = """ - SELECT user_id FROM device_lists_stream WHERE stream_id > ? - """ - rows = yield self._execute("get_user_whose_devices_changed", None, sql, from_key) - defer.returnValue(set(row["user_id"] for row in rows)) - def _mark_as_sent_devices_by_remote_txn(self, txn, destination, stream_id): sql = """ DELETE FROM device_lists_outbound_pokes @@ -239,7 +226,20 @@ class DeviceStore(SQLBaseStore): UPDATE device_lists_outbound_pokes SET sent = ? WHERE destination = ? AND stream_id <= ? """ - txn.execute(sql, (destination, True,)) + txn.execute(sql, (True, destination, stream_id,)) + + @defer.inlineCallbacks + def get_user_whose_devices_changed(self, from_key): + from_key = int(from_key) + changed = self._device_list_stream_cache.get_all_entities_changed(from_key) + if changed is not None: + defer.returnValue(set(changed)) + + sql = """ + SELECT user_id FROM device_lists_stream WHERE stream_id > ? + """ + rows = yield self._execute("get_user_whose_devices_changed", None, sql, from_key) + defer.returnValue(set(row["user_id"] for row in rows)) @defer.inlineCallbacks def add_device_change_to_streams(self, user_id, device_id, hosts): From c974116f197d211ba9b42159fe61cfd5957411b5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 26 Jan 2017 16:06:54 +0000 Subject: [PATCH 03/19] Implement device key caching over federation --- synapse/federation/federation_client.py | 10 + synapse/federation/federation_server.py | 3 + synapse/federation/transport/client.py | 26 +++ synapse/federation/transport/server.py | 8 + synapse/handlers/device.py | 85 ++++++-- synapse/handlers/e2e_keys.py | 40 +++- synapse/storage/devices.py | 201 ++++++++++++++++-- synapse/storage/end_to_end_keys.py | 4 +- .../schema/delta/40/device_list_streams.sql | 20 +- tests/handlers/test_device.py | 18 +- tests/handlers/test_directory.py | 1 + tests/handlers/test_profile.py | 1 + tests/storage/test_appservice.py | 21 +- 13 files changed, 381 insertions(+), 57 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index c9175bb33d..b5bcfd705a 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -126,6 +126,16 @@ class FederationClient(FederationBase): destination, content, timeout ) + @log_function + def query_user_devices(self, destination, user_id, timeout=30000): + """Query the device keys for a list of user ids hosted on a remote + server. + """ + sent_queries_counter.inc("user_devices") + return self.transport_layer.query_user_devices( + destination, user_id, timeout + ) + @log_function def claim_client_keys(self, destination, content, timeout): """Claims one-time keys for a device hosted on a remote server. diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 862ccbef5d..e922b7ff4a 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -416,6 +416,9 @@ class FederationServer(FederationBase): def on_query_client_keys(self, origin, content): return self.on_query_request("client_keys", content) + def on_query_user_devices(self, origin, user_id): + return self.on_query_request("user_devices", user_id) + @defer.inlineCallbacks @log_function def on_claim_client_keys(self, origin, content): diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 915af34409..f49e8a2cc4 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -346,6 +346,32 @@ class TransportLayerClient(object): ) defer.returnValue(content) + @defer.inlineCallbacks + @log_function + def query_user_devices(self, destination, user_id, timeout): + """Query the devices for a user id hosted on a remote server. + + Response: + { + "stream_id": "...", + "devices": [ { ... } ] + } + + Args: + destination(str): The server to query. + query_content(dict): The user ids to query. + Returns: + A dict containg the device keys. + """ + path = PREFIX + "/user/devices/" + user_id + + content = yield self.client.get_json( + destination=destination, + path=path, + timeout=timeout, + ) + defer.returnValue(content) + @defer.inlineCallbacks @log_function def claim_client_keys(self, destination, query_content, timeout): diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 159dbd1747..c840da834c 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -409,6 +409,13 @@ class FederationClientKeysQueryServlet(BaseFederationServlet): return self.handler.on_query_client_keys(origin, content) +class FederationUserDevicesQueryServlet(BaseFederationServlet): + PATH = "/user/devices/(?P[^/]*)" + + def on_GET(self, origin, content, query, user_id): + return self.handler.on_query_user_devices(origin, user_id) + + class FederationClientKeysClaimServlet(BaseFederationServlet): PATH = "/user/keys/claim" @@ -613,6 +620,7 @@ SERVLET_CLASSES = ( FederationGetMissingEventsServlet, FederationEventAuthServlet, FederationClientKeysQueryServlet, + FederationUserDevicesQueryServlet, FederationClientKeysClaimServlet, FederationThirdPartyInviteExchangeServlet, On3pidBindServlet, diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index ba4c48d590..2d66b3721a 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -15,6 +15,7 @@ from synapse.api import errors from synapse.util import stringutils +from synapse.util.async import Linearizer from synapse.types import get_domain_from_id from twisted.internet import defer from ._base import BaseHandler @@ -28,8 +29,18 @@ class DeviceHandler(BaseHandler): def __init__(self, hs): super(DeviceHandler, self).__init__(hs) + self.hs = hs self.state = hs.get_state_handler() - self.federation = hs.get_federation_sender() + self.federation_sender = hs.get_federation_sender() + self.federation = hs.get_replication_layer() + self._remote_edue_linearizer = Linearizer(name="remote_device_list") + + self.federation.register_edu_handler( + "m.device_list_update", self._incoming_device_list_update, + ) + self.federation.register_query_handler( + "user_devices", self.on_federation_query_user_devices, + ) @defer.inlineCallbacks def check_device_registered(self, user_id, device_id, @@ -55,7 +66,7 @@ class DeviceHandler(BaseHandler): initial_device_display_name=initial_device_display_name, ) if new_device: - yield self.notify_device_update(user_id, device_id) + yield self.notify_device_update(user_id, [device_id]) defer.returnValue(device_id) # if the device id is not specified, we'll autogen one, but loop a few @@ -69,7 +80,7 @@ class DeviceHandler(BaseHandler): initial_device_display_name=initial_device_display_name, ) if new_device: - yield self.notify_device_update(user_id, device_id) + yield self.notify_device_update(user_id, [device_id]) defer.returnValue(device_id) attempts += 1 @@ -151,7 +162,7 @@ class DeviceHandler(BaseHandler): user_id=user_id, device_id=device_id ) - yield self.notify_device_update(user_id, device_id) + yield self.notify_device_update(user_id, [device_id]) @defer.inlineCallbacks def update_device(self, user_id, device_id, content): @@ -172,7 +183,7 @@ class DeviceHandler(BaseHandler): device_id, new_display_name=content.get("display_name") ) - yield self.notify_device_update(user_id, device_id) + yield self.notify_device_update(user_id, [device_id]) except errors.StoreError, e: if e.code == 404: raise errors.NotFoundError() @@ -180,26 +191,28 @@ class DeviceHandler(BaseHandler): raise @defer.inlineCallbacks - def notify_device_update(self, user_id, device_id): + def notify_device_update(self, user_id, device_ids): rooms = yield self.store.get_rooms_for_user(user_id) room_ids = [r.room_id for r in rooms] hosts = set() - for room_id in room_ids: - users = yield self.state.get_current_user_in_room(room_id) - hosts.update(get_domain_from_id(u) for u in users) - hosts.discard(self.server_name) + if self.hs.is_mine_id(user_id): + for room_id in room_ids: + users = yield self.state.get_current_user_in_room(room_id) + hosts.update(get_domain_from_id(u) for u in users) + hosts.discard(self.server_name) position = yield self.store.add_device_change_to_streams( - user_id, device_id, list(hosts) + user_id, device_ids, list(hosts) ) yield self.notifier.on_new_event( "device_list_key", position, rooms=room_ids, ) + logger.info("Sending device list update notif to: %r", hosts) for host in hosts: - self.federation.send_device_messages(host) + self.federation_sender.send_device_messages(host) @defer.inlineCallbacks def get_device_list_changes(self, user_id, room_ids, from_key): @@ -214,6 +227,54 @@ class DeviceHandler(BaseHandler): defer.returnValue(user_ids_changed) + @defer.inlineCallbacks + def _incoming_device_list_update(self, origin, edu_content): + user_id = edu_content["user_id"] + device_id = edu_content["device_id"] + stream_id = edu_content["stream_id"] + prev_ids = edu_content.get("prev_id", []) + + if get_domain_from_id(user_id) != origin: + # TODO: Raise? + return + + logger.info("Got edu: %r", edu_content) + + with (yield self._remote_edue_linearizer.queue(user_id)): + resync = True + if len(prev_ids) == 1: + extremity = yield self.store.get_device_list_remote_extremity(user_id) + logger.info("Extrem: %r, prev_ids: %r", extremity, prev_ids) + if str(extremity) == str(prev_ids[0]): + resync = False + + if resync: + result = yield self.federation.query_user_devices(origin, user_id) + stream_id = result["stream_id"] + devices = result["devices"] + yield self.store.update_remote_device_list_cache( + user_id, devices, stream_id, + ) + device_ids = [device["device_id"] for device in devices] + yield self.notify_device_update(user_id, device_ids) + else: + content = dict(edu_content) + for key in ("user_id", "device_id", "stream_id", "prev_ids"): + content.pop(key, None) + yield self.store.update_remote_device_list_cache_entry( + user_id, device_id, content, stream_id, + ) + yield self.notify_device_update(user_id, [device_id]) + + @defer.inlineCallbacks + def on_federation_query_user_devices(self, user_id): + stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id) + defer.returnValue({ + "user_id": user_id, + "stream_id": stream_id, + "devices": devices, + }) + def _update_device_from_client_ips(device, client_ips): ip = client_ips.get((device["user_id"], device["device_id"]), {}) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 38c2a2d39e..832998a6d3 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -73,8 +73,7 @@ class E2eKeysHandler(object): if self.is_mine_id(user_id): local_query[user_id] = device_ids else: - domain = get_domain_from_id(user_id) - remote_queries.setdefault(domain, {})[user_id] = device_ids + remote_queries[user_id] = device_ids # do the queries failures = {} @@ -85,9 +84,40 @@ class E2eKeysHandler(object): if user_id in local_query: results[user_id] = keys + remote_queries_not_in_cache = {} + if remote_queries: + query_list = [] + for user_id, device_ids in remote_queries.iteritems(): + if device_ids: + query_list.extend((user_id, device_id) for device_id in device_ids) + else: + query_list.append((user_id, None)) + + user_ids_not_in_cache, remote_results = ( + yield self.store.get_user_devices_from_cache( + query_list + ) + ) + for user_id, devices in remote_results.iteritems(): + user_devices = results.setdefault(user_id, {}) + for device_id, device in devices.iteritems(): + keys = device.get("keys", None) + device_display_name = device.get("device_display_name", None) + if keys: + result = dict(keys) + unsigned = result.setdefault("unsigned", {}) + if device_display_name: + unsigned["device_display_name"] = device_display_name + user_devices[device_id] = result + + for user_id in user_ids_not_in_cache: + domain = get_domain_from_id(user_id) + r = remote_queries_not_in_cache.setdefault(domain, {}) + r[user_id] = remote_queries[user_id] + @defer.inlineCallbacks def do_remote_query(destination): - destination_query = remote_queries[destination] + destination_query = remote_queries_not_in_cache[destination] try: limiter = yield get_retry_limiter( destination, self.clock, self.store @@ -119,7 +149,7 @@ class E2eKeysHandler(object): yield preserve_context_over_deferred(defer.gatherResults([ preserve_fn(do_remote_query)(destination) - for destination in remote_queries + for destination in remote_queries_not_in_cache ])) defer.returnValue({ @@ -259,7 +289,7 @@ class E2eKeysHandler(object): user_id, device_id, time_now, encode_canonical_json(device_keys) ) - yield self.device_handler.notify_device_update(user_id, device_id) + yield self.device_handler.notify_device_update(user_id, [device_id]) one_time_keys = keys.get("one_time_keys", None) if one_time_keys: diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 9628e2ff75..8ee3119db2 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -138,6 +138,89 @@ class DeviceStore(SQLBaseStore): defer.returnValue({d["device_id"]: d for d in devices}) + def get_device_list_remote_extremity(self, user_id): + return self._simple_select_one_onecol( + table="device_lists_remote_extremeties", + keyvalues={"user_id": user_id}, + retcol="stream_id", + desc="get_device_list_remote_extremity", + allow_none=True, + ) + + def update_remote_device_list_cache_entry(self, user_id, device_id, content, + stream_id): + return self.runInteraction( + "update_remote_device_list_cache_entry", + self._update_remote_device_list_cache_entry_txn, + user_id, device_id, content, stream_id, + ) + + def _update_remote_device_list_cache_entry_txn(self, txn, user_id, device_id, + content, stream_id): + self._simple_upsert_txn( + txn, + table="device_lists_remote_cache", + keyvalues={ + "user_id": user_id, + "device_id": device_id, + }, + values={ + "content": json.dumps(content), + } + ) + + self._simple_upsert_txn( + txn, + table="device_lists_remote_extremeties", + keyvalues={ + "user_id": user_id, + }, + values={ + "stream_id": stream_id, + } + ) + + def update_remote_device_list_cache(self, user_id, devices, stream_id): + return self.runInteraction( + "update_remote_device_list_cache", + self._update_remote_device_list_cache_txn, + user_id, devices, stream_id, + ) + + def _update_remote_device_list_cache_txn(self, txn, user_id, devices, + stream_id): + self._simple_delete_txn( + txn, + table="device_lists_remote_cache", + keyvalues={ + "user_id": user_id, + }, + ) + + self._simple_insert_many_txn( + txn, + table="device_lists_remote_cache", + values=[ + { + "user_id": user_id, + "device_id": content["device_id"], + "content": json.dumps(content), + } + for content in devices + ] + ) + + self._simple_upsert_txn( + txn, + table="device_lists_remote_extremeties", + keyvalues={ + "user_id": user_id, + }, + values={ + "stream_id": stream_id, + } + ) + def get_devices_by_remote(self, destination, from_stream_id): now_stream_id = self._device_list_id_gen.get_current_token() @@ -184,7 +267,7 @@ class DeviceStore(SQLBaseStore): txn.execute(prev_sent_id_sql, (destination, user_id, True)) rows = txn.fetchall() prev_id = rows[0][0] - for device_id, result in user_devices.iteritems(): + for device_id, device in user_devices.iteritems(): stream_id = query_map[(user_id, device_id)] result = { "user_id": user_id, @@ -195,10 +278,10 @@ class DeviceStore(SQLBaseStore): prev_id = stream_id - key_json = result.get("key_json", None) + key_json = device.get("key_json", None) if key_json: result["keys"] = json.loads(key_json) - device_display_name = result.get("device_display_name", None) + device_display_name = device.get("device_display_name", None) if device_display_name: result["device_display_name"] = device_display_name @@ -206,6 +289,96 @@ class DeviceStore(SQLBaseStore): return (now_stream_id, results) + def get_user_devices_from_cache(self, query_list): + return self.runInteraction( + "get_user_devices_from_cache", self._get_user_devices_from_cache_txn, + query_list, + ) + + def _get_user_devices_from_cache_txn(self, txn, query_list): + user_ids = {user_id for user_id, _ in query_list} + + user_ids_in_cache = set() + for user_id in user_ids: + stream_ids = self._simple_select_onecol_txn( + txn, + table="device_lists_remote_extremeties", + keyvalues={ + "user_id": user_id, + }, + retcol="stream_id", + ) + if stream_ids: + user_ids_in_cache.add(user_id) + + user_ids_not_in_cache = user_ids - user_ids_in_cache + + results = {} + for user_id, device_id in query_list: + if user_id not in user_ids_in_cache: + continue + + if device_id: + content = self._simple_select_one_onecol_txn( + txn, + table="device_lists_remote_cache", + keyvalues={ + "user_id": user_id, + "device_id": device_id, + }, + retcol="content", + ) + results.setdefault(user_id, {})[device_id] = json.loads(content) + else: + devices = self._simple_select_list_txn( + txn, + table="device_lists_remote_cache", + keyvalues={ + "user_id": user_id, + }, + retcols=("device_id", "content"), + ) + results[user_id] = { + device["device_id"]: json.loads(device["content"]) + for device in devices + } + user_ids_in_cache.discard(user_id) + + return user_ids_not_in_cache, results + + def get_devices_with_keys_by_user(self, user_id): + return self.runInteraction( + "get_devices_with_keys_by_user", + self._get_devices_with_keys_by_user_txn, user_id, + ) + + def _get_devices_with_keys_by_user_txn(self, txn, user_id): + now_stream_id = self._device_list_id_gen.get_current_token() + + devices = self._get_e2e_device_keys_txn( + txn, [(user_id, None)], include_all_devices=True + ) + + for user_id, user_devices in devices.iteritems(): + results = [] + for device_id, device in user_devices.iteritems(): + result = { + "device_id": device_id, + } + + key_json = device.get("key_json", None) + if key_json: + result["keys"] = json.loads(key_json) + device_display_name = device.get("device_display_name", None) + if device_display_name: + result["device_display_name"] = device_display_name + + results.append(result) + + return now_stream_id, results + + return now_stream_id, [] + def mark_as_sent_devices_by_remote(self, destination, stream_id): return self.runInteraction( "mark_as_sent_devices_by_remote", self._mark_as_sent_devices_by_remote_txn, @@ -242,17 +415,17 @@ class DeviceStore(SQLBaseStore): defer.returnValue(set(row["user_id"] for row in rows)) @defer.inlineCallbacks - def add_device_change_to_streams(self, user_id, device_id, hosts): + def add_device_change_to_streams(self, user_id, device_ids, hosts): # device_lists_stream # device_lists_outbound_pokes with self._device_list_id_gen.get_next() as stream_id: yield self.runInteraction( "add_device_change_to_streams", self._add_device_change_txn, - user_id, device_id, hosts, stream_id, + user_id, device_ids, hosts, stream_id, ) defer.returnValue(stream_id) - def _add_device_change_txn(self, txn, user_id, device_id, hosts, stream_id): + def _add_device_change_txn(self, txn, user_id, device_ids, hosts, stream_id): txn.call_after( self._device_list_stream_cache.entity_has_changed, user_id, stream_id, @@ -263,14 +436,17 @@ class DeviceStore(SQLBaseStore): host, stream_id, ) - self._simple_insert_txn( + self._simple_insert_many_txn( txn, table="device_lists_stream", - values={ - "stream_id": stream_id, - "user_id": user_id, - "device_id": device_id, - } + values=[ + { + "stream_id": stream_id, + "user_id": user_id, + "device_id": device_id, + } + for device_id in device_ids + ] ) self._simple_insert_many_txn( @@ -285,6 +461,7 @@ class DeviceStore(SQLBaseStore): "sent": False, } for destination in hosts + for device_id in device_ids ] ) diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index f82943a7a8..a915c790ff 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -52,11 +52,11 @@ class EndToEndKeyStore(SQLBaseStore): query_params = [] for (user_id, device_id) in query_list: - query_clause = "k.user_id = ?" + query_clause = "user_id = ?" query_params.append(user_id) if device_id: - query_clause += " AND k.device_id = ?" + query_clause += " AND device_id = ?" query_params.append(device_id) query_clauses.append(query_clause) diff --git a/synapse/storage/schema/delta/40/device_list_streams.sql b/synapse/storage/schema/delta/40/device_list_streams.sql index 61cac63bbb..d1051c6ddf 100644 --- a/synapse/storage/schema/delta/40/device_list_streams.sql +++ b/synapse/storage/schema/delta/40/device_list_streams.sql @@ -13,18 +13,6 @@ * limitations under the License. */ -CREATE TABLE device_list_streams_remote ( - list_id TEXT NOT NULL, - origin TEXT NOT NULL, - user_id TEXT NOT NULL, - is_full BOOLEAN NOT NULL, - ts BIGINT NOT NULL -); - -CREATE INDEX device_list_streams_remote_id_origin ON device_list_streams_remote( - origin, list_id, user_id -); - CREATE TABLE device_lists_remote_cache ( user_id TEXT NOT NULL, @@ -35,6 +23,14 @@ CREATE TABLE device_lists_remote_cache ( CREATE INDEX device_lists_remote_cache_id ON device_lists_remote_cache(user_id, device_id); +CREATE TABLE device_lists_remote_extremeties ( + user_id TEXT NOT NULL, + stream_id TEXT NOT NULL +); + +CREATE INDEX device_lists_remote_extremeties_id ON device_lists_remote_extremeties(user_id, stream_id); + + CREATE TABLE device_lists_stream ( stream_id BIGINT NOT NULL, user_id TEXT NOT NULL, diff --git a/tests/handlers/test_device.py b/tests/handlers/test_device.py index 85a970a6c9..2eaaa8253c 100644 --- a/tests/handlers/test_device.py +++ b/tests/handlers/test_device.py @@ -35,51 +35,51 @@ class DeviceTestCase(unittest.TestCase): @defer.inlineCallbacks def setUp(self): - hs = yield utils.setup_test_homeserver(handlers=None) - self.handler = synapse.handlers.device.DeviceHandler(hs) + hs = yield utils.setup_test_homeserver() + self.handler = hs.get_device_handler() self.store = hs.get_datastore() self.clock = hs.get_clock() @defer.inlineCallbacks def test_device_is_created_if_doesnt_exist(self): res = yield self.handler.check_device_registered( - user_id="boris", + user_id="@boris:foo", device_id="fco", initial_device_display_name="display name" ) self.assertEqual(res, "fco") - dev = yield self.handler.store.get_device("boris", "fco") + dev = yield self.handler.store.get_device("@boris:foo", "fco") self.assertEqual(dev["display_name"], "display name") @defer.inlineCallbacks def test_device_is_preserved_if_exists(self): res1 = yield self.handler.check_device_registered( - user_id="boris", + user_id="@boris:foo", device_id="fco", initial_device_display_name="display name" ) self.assertEqual(res1, "fco") res2 = yield self.handler.check_device_registered( - user_id="boris", + user_id="@boris:foo", device_id="fco", initial_device_display_name="new display name" ) self.assertEqual(res2, "fco") - dev = yield self.handler.store.get_device("boris", "fco") + dev = yield self.handler.store.get_device("@boris:foo", "fco") self.assertEqual(dev["display_name"], "display name") @defer.inlineCallbacks def test_device_id_is_made_up_if_unspecified(self): device_id = yield self.handler.check_device_registered( - user_id="theresa", + user_id="@theresa:foo", device_id=None, initial_device_display_name="display" ) - dev = yield self.handler.store.get_device("theresa", device_id) + dev = yield self.handler.store.get_device("@theresa:foo", device_id) self.assertEqual(dev["display_name"], "display") @defer.inlineCallbacks diff --git a/tests/handlers/test_directory.py b/tests/handlers/test_directory.py index 5d602c1531..ceb9aa5765 100644 --- a/tests/handlers/test_directory.py +++ b/tests/handlers/test_directory.py @@ -37,6 +37,7 @@ class DirectoryTestCase(unittest.TestCase): def setUp(self): self.mock_federation = Mock(spec=[ "make_query", + "register_edu_handler", ]) self.query_handlers = {} diff --git a/tests/handlers/test_profile.py b/tests/handlers/test_profile.py index f1f664275f..979cebf600 100644 --- a/tests/handlers/test_profile.py +++ b/tests/handlers/test_profile.py @@ -39,6 +39,7 @@ class ProfileTestCase(unittest.TestCase): def setUp(self): self.mock_federation = Mock(spec=[ "make_query", + "register_edu_handler", ]) self.query_handlers = {} diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index 9ff1abcd80..9e98d0e330 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -39,7 +39,11 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): event_cache_size=1, password_providers=[], ) - hs = yield setup_test_homeserver(config=config, federation_sender=Mock()) + hs = yield setup_test_homeserver( + config=config, + federation_sender=Mock(), + replication_layer=Mock(), + ) self.as_token = "token1" self.as_url = "some_url" @@ -112,7 +116,11 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): event_cache_size=1, password_providers=[], ) - hs = yield setup_test_homeserver(config=config, federation_sender=Mock()) + hs = yield setup_test_homeserver( + config=config, + federation_sender=Mock(), + replication_layer=Mock(), + ) self.db_pool = hs.get_db_pool() self.as_list = [ @@ -446,7 +454,8 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase): hs = yield setup_test_homeserver( config=config, datastore=Mock(), - federation_sender=Mock() + federation_sender=Mock(), + replication_layer=Mock(), ) ApplicationServiceStore(hs) @@ -463,7 +472,8 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase): hs = yield setup_test_homeserver( config=config, datastore=Mock(), - federation_sender=Mock() + federation_sender=Mock(), + replication_layer=Mock(), ) with self.assertRaises(ConfigError) as cm: @@ -486,7 +496,8 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase): hs = yield setup_test_homeserver( config=config, datastore=Mock(), - federation_sender=Mock() + federation_sender=Mock(), + replication_layer=Mock(), ) with self.assertRaises(ConfigError) as cm: From fbfad76c03afe7538c67205ceb30825d9ce4fb07 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 26 Jan 2017 16:30:37 +0000 Subject: [PATCH 04/19] Add comments --- synapse/handlers/device.py | 19 ++++++++-- synapse/handlers/e2e_keys.py | 4 +- synapse/storage/devices.py | 37 ++++++++++++++++++- .../schema/delta/40/device_list_streams.sql | 8 +++- 4 files changed, 61 insertions(+), 7 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 2d66b3721a..a2ffd273bf 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -192,6 +192,9 @@ class DeviceHandler(BaseHandler): @defer.inlineCallbacks def notify_device_update(self, user_id, device_ids): + """Notify that a user's device(s) has changed. Pokes the notifier, and + remote servers if the user is local. + """ rooms = yield self.store.get_rooms_for_user(user_id) room_ids = [r.room_id for r in rooms] @@ -210,12 +213,16 @@ class DeviceHandler(BaseHandler): "device_list_key", position, rooms=room_ids, ) - logger.info("Sending device list update notif to: %r", hosts) - for host in hosts: - self.federation_sender.send_device_messages(host) + if hosts: + logger.info("Sending device list update notif to: %r", hosts) + for host in hosts: + self.federation_sender.send_device_messages(host) @defer.inlineCallbacks def get_device_list_changes(self, user_id, room_ids, from_key): + """For a user and their joined rooms, calculate which device updates + we need to return. + """ room_ids = frozenset(room_ids) user_ids_changed = set() @@ -236,11 +243,14 @@ class DeviceHandler(BaseHandler): if get_domain_from_id(user_id) != origin: # TODO: Raise? + logger.warning("Got device list update edu for %r from %r", user_id, origin) return logger.info("Got edu: %r", edu_content) with (yield self._remote_edue_linearizer.queue(user_id)): + # If the prev id matches whats in our cache table, then we don't need + # to resync the users device list, otherwise we do. resync = True if len(prev_ids) == 1: extremity = yield self.store.get_device_list_remote_extremity(user_id) @@ -249,6 +259,7 @@ class DeviceHandler(BaseHandler): resync = False if resync: + # Fetch all devices for the user. result = yield self.federation.query_user_devices(origin, user_id) stream_id = result["stream_id"] devices = result["devices"] @@ -258,6 +269,8 @@ class DeviceHandler(BaseHandler): device_ids = [device["device_id"] for device in devices] yield self.notify_device_update(user_id, device_ids) else: + # Simply update the single device, since we know that is the only + # change (becuase of the single prev_id matching the current cache) content = dict(edu_content) for key in ("user_id", "device_id", "stream_id", "prev_ids"): content.pop(key, None) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 832998a6d3..a16b9def8d 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -75,7 +75,7 @@ class E2eKeysHandler(object): else: remote_queries[user_id] = device_ids - # do the queries + # Firt get local devices. failures = {} results = {} if local_query: @@ -84,6 +84,7 @@ class E2eKeysHandler(object): if user_id in local_query: results[user_id] = keys + # Now attempt to get any remote devices from our local cache. remote_queries_not_in_cache = {} if remote_queries: query_list = [] @@ -115,6 +116,7 @@ class E2eKeysHandler(object): r = remote_queries_not_in_cache.setdefault(domain, {}) r[user_id] = remote_queries[user_id] + # Now fetch any devices that we don't have in our cache @defer.inlineCallbacks def do_remote_query(destination): destination_query = remote_queries_not_in_cache[destination] diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 8ee3119db2..cf38dbaa3c 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -139,6 +139,9 @@ class DeviceStore(SQLBaseStore): defer.returnValue({d["device_id"]: d for d in devices}) def get_device_list_remote_extremity(self, user_id): + """Get the last stream_id we got for a user. May be None if we haven't + got any information for them. + """ return self._simple_select_one_onecol( table="device_lists_remote_extremeties", keyvalues={"user_id": user_id}, @@ -149,6 +152,8 @@ class DeviceStore(SQLBaseStore): def update_remote_device_list_cache_entry(self, user_id, device_id, content, stream_id): + """Updates a single user's device in the cache. + """ return self.runInteraction( "update_remote_device_list_cache_entry", self._update_remote_device_list_cache_entry_txn, @@ -181,6 +186,8 @@ class DeviceStore(SQLBaseStore): ) def update_remote_device_list_cache(self, user_id, devices, stream_id): + """Replace the cache of the remote user's devices. + """ return self.runInteraction( "update_remote_device_list_cache", self._update_remote_device_list_cache_txn, @@ -222,6 +229,11 @@ class DeviceStore(SQLBaseStore): ) def get_devices_by_remote(self, destination, from_stream_id): + """Get stream of updates to send to remote servers + + Returns: + (now_stream_id, [ { updates }, .. ]) + """ now_stream_id = self._device_list_id_gen.get_current_token() has_changed = self._device_list_federation_stream_cache.has_entity_changed( @@ -290,6 +302,17 @@ class DeviceStore(SQLBaseStore): return (now_stream_id, results) def get_user_devices_from_cache(self, query_list): + """Get the devices (and keys if any) for remote users from the cache. + + Args: + query_list(list): List of (user_id, device_ids), if device_ids is + falsey then return all device ids for that user. + + Returns: + (user_ids_not_in_cache, results_map), where user_ids_not_in_cache is + a set of user_ids and results_map is a mapping of + user_id -> device_id -> device_info + """ return self.runInteraction( "get_user_devices_from_cache", self._get_user_devices_from_cache_txn, query_list, @@ -347,6 +370,11 @@ class DeviceStore(SQLBaseStore): return user_ids_not_in_cache, results def get_devices_with_keys_by_user(self, user_id): + """Get all devices (with any device keys) for a user + + Returns: + (stream_id, devices) + """ return self.runInteraction( "get_devices_with_keys_by_user", self._get_devices_with_keys_by_user_txn, user_id, @@ -380,6 +408,8 @@ class DeviceStore(SQLBaseStore): return now_stream_id, [] def mark_as_sent_devices_by_remote(self, destination, stream_id): + """Mark that updates have successfully been sent to the destination. + """ return self.runInteraction( "mark_as_sent_devices_by_remote", self._mark_as_sent_devices_by_remote_txn, destination, stream_id, @@ -403,6 +433,8 @@ class DeviceStore(SQLBaseStore): @defer.inlineCallbacks def get_user_whose_devices_changed(self, from_key): + """Get set of users whose devices have changed since `from_key`. + """ from_key = int(from_key) changed = self._device_list_stream_cache.get_all_entities_changed(from_key) if changed is not None: @@ -416,8 +448,9 @@ class DeviceStore(SQLBaseStore): @defer.inlineCallbacks def add_device_change_to_streams(self, user_id, device_ids, hosts): - # device_lists_stream - # device_lists_outbound_pokes + """Persist that a user's devices have been updated, and which hosts + (if any) should be poked. + """ with self._device_list_id_gen.get_next() as stream_id: yield self.runInteraction( "add_device_change_to_streams", self._add_device_change_txn, diff --git a/synapse/storage/schema/delta/40/device_list_streams.sql b/synapse/storage/schema/delta/40/device_list_streams.sql index d1051c6ddf..8348c143c3 100644 --- a/synapse/storage/schema/delta/40/device_list_streams.sql +++ b/synapse/storage/schema/delta/40/device_list_streams.sql @@ -13,7 +13,7 @@ * limitations under the License. */ - +-- Cache of remote devices. CREATE TABLE device_lists_remote_cache ( user_id TEXT NOT NULL, device_id TEXT NOT NULL, @@ -23,6 +23,8 @@ CREATE TABLE device_lists_remote_cache ( CREATE INDEX device_lists_remote_cache_id ON device_lists_remote_cache(user_id, device_id); +-- The last update we got for a user. Empty if we're not receiving updates for +-- that user. CREATE TABLE device_lists_remote_extremeties ( user_id TEXT NOT NULL, stream_id TEXT NOT NULL @@ -31,6 +33,7 @@ CREATE TABLE device_lists_remote_extremeties ( CREATE INDEX device_lists_remote_extremeties_id ON device_lists_remote_extremeties(user_id, stream_id); +-- Stream of device lists updates. Includes both local and remotes CREATE TABLE device_lists_stream ( stream_id BIGINT NOT NULL, user_id TEXT NOT NULL, @@ -40,6 +43,9 @@ CREATE TABLE device_lists_stream ( CREATE INDEX device_lists_stream_id ON device_lists_stream(stream_id, user_id); +-- The stream of updates to send to other servers. We keep at least one row +-- per user that was sent so that the prev_id for any new updates can be +-- calculated CREATE TABLE device_lists_outbound_pokes ( destination TEXT NOT NULL, stream_id BIGINT NOT NULL, From 76d40f490411ce1a0a208acb4242678b0cb4afb3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 26 Jan 2017 16:39:33 +0000 Subject: [PATCH 05/19] Handle users leaving rooms --- synapse/handlers/device.py | 17 ++++++++++++++++- synapse/storage/devices.py | 8 ++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index a2ffd273bf..1116dfd27c 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -42,6 +42,8 @@ class DeviceHandler(BaseHandler): "user_devices", self.on_federation_query_user_devices, ) + hs.get_distributor().observe("user_left_room", self.user_left_room) + @defer.inlineCallbacks def check_device_registered(self, user_id, device_id, initial_device_display_name=None): @@ -246,7 +248,11 @@ class DeviceHandler(BaseHandler): logger.warning("Got device list update edu for %r from %r", user_id, origin) return - logger.info("Got edu: %r", edu_content) + rooms = yield self.store.get_rooms_for_user(user_id) + if not rooms: + # We don't share any rooms with this user. Ignore update, as we + # probably won't get any further updates. + return with (yield self._remote_edue_linearizer.queue(user_id)): # If the prev id matches whats in our cache table, then we don't need @@ -288,6 +294,15 @@ class DeviceHandler(BaseHandler): "devices": devices, }) + @defer.inlineCallbacks + def user_left_room(self, user, room_id): + user_id = user.to_string() + rooms = yield self.store.get_rooms_for_user(user_id) + if not rooms: + # We no longer share rooms with this user, so we'll no longer + # receive device updates. Mark this in DB. + yield self.store.mark_remote_user_device_list_as_unsubscribed(user_id) + def _update_device_from_client_ips(device, client_ips): ip = client_ips.get((device["user_id"], device["device_id"]), {}) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index cf38dbaa3c..1c48c3af99 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -150,6 +150,14 @@ class DeviceStore(SQLBaseStore): allow_none=True, ) + def mark_remote_user_device_list_as_unsubscribed(self, user_id): + return self._simple_delete( + table="device_lists_remote_extremeties", + keyvalues={ + "user_id": user_id, + }, + ) + def update_remote_device_list_cache_entry(self, user_id, device_id, content, stream_id): """Updates a single user's device in the cache. From 31aca5589c3790201b2087e28d2901d00e1f77d5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 26 Jan 2017 16:55:50 +0000 Subject: [PATCH 06/19] Fix on sqlite: use left rather than outer join --- synapse/storage/end_to_end_keys.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index a915c790ff..441286d1a1 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -65,11 +65,11 @@ class EndToEndKeyStore(SQLBaseStore): "SELECT user_id, device_id, " " d.display_name AS device_display_name, " " k.key_json" - " FROM e2e_device_keys_json k" - " %s JOIN devices d USING (user_id, device_id)" + " FROM devices d" + " %s JOIN e2e_device_keys_json k USING (user_id, device_id)" " WHERE %s" ) % ( - "FULL OUTER" if include_all_devices else "LEFT", + "LEFT" if include_all_devices else "INNER", " OR ".join("(" + q + ")" for q in query_clauses) ) From b3e1f2aa7a1d583119378bb938ad476e72cc35ac Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 26 Jan 2017 17:16:24 +0000 Subject: [PATCH 07/19] Fix unit tests --- tests/storage/test_end_to_end_keys.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/tests/storage/test_end_to_end_keys.py b/tests/storage/test_end_to_end_keys.py index 453bc61438..bfa6294250 100644 --- a/tests/storage/test_end_to_end_keys.py +++ b/tests/storage/test_end_to_end_keys.py @@ -35,6 +35,10 @@ class EndToEndKeyStoreTestCase(tests.unittest.TestCase): now = 1470174257070 json = '{ "key": "value" }' + yield self.store.store_device( + "user", "device", None + ) + yield self.store.set_e2e_device_keys( "user", "device", now, json) @@ -71,6 +75,19 @@ class EndToEndKeyStoreTestCase(tests.unittest.TestCase): def test_multiple_devices(self): now = 1470174257070 + yield self.store.store_device( + "user1", "device1", None + ) + yield self.store.store_device( + "user1", "device2", None + ) + yield self.store.store_device( + "user2", "device1", None + ) + yield self.store.store_device( + "user2", "device2", None + ) + yield self.store.set_e2e_device_keys( "user1", "device1", now, 'json11') yield self.store.set_e2e_device_keys( From f25a4a4692d9b4618efb64984c10a6e8243a4a0b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 27 Jan 2017 10:27:39 +0000 Subject: [PATCH 08/19] Remove unused param --- synapse/storage/devices.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 1c48c3af99..b99de2f1b0 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -26,8 +26,7 @@ logger = logging.getLogger(__name__) class DeviceStore(SQLBaseStore): @defer.inlineCallbacks def store_device(self, user_id, device_id, - initial_device_display_name, - ignore_if_known=True): + initial_device_display_name): """Ensure the given device is known; add it to the store if not Args: From 888c59c955b33c3c69a73766507e134d64a8f25b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 27 Jan 2017 10:29:47 +0000 Subject: [PATCH 09/19] Better name --- synapse/handlers/device.py | 4 +++- synapse/storage/devices.py | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 1116dfd27c..ed077c9a76 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -259,7 +259,9 @@ class DeviceHandler(BaseHandler): # to resync the users device list, otherwise we do. resync = True if len(prev_ids) == 1: - extremity = yield self.store.get_device_list_remote_extremity(user_id) + extremity = yield self.store.get_device_list_last_stream_id_for_remote( + user_id + ) logger.info("Extrem: %r, prev_ids: %r", extremity, prev_ids) if str(extremity) == str(prev_ids[0]): resync = False diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index b99de2f1b0..d46203dd35 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -137,7 +137,7 @@ class DeviceStore(SQLBaseStore): defer.returnValue({d["device_id"]: d for d in devices}) - def get_device_list_remote_extremity(self, user_id): + def get_device_list_last_stream_id_for_remote(self, user_id): """Get the last stream_id we got for a user. May be None if we haven't got any information for them. """ From 755adff0e407abd48bd30b544e7025da1381f3d2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 27 Jan 2017 10:31:06 +0000 Subject: [PATCH 10/19] User if rather than for --- synapse/storage/devices.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index d46203dd35..ad5411dbe7 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -150,6 +150,8 @@ class DeviceStore(SQLBaseStore): ) def mark_remote_user_device_list_as_unsubscribed(self, user_id): + """Mark that we no longer track device lists for remote user. + """ return self._simple_delete( table="device_lists_remote_extremeties", keyvalues={ @@ -394,7 +396,8 @@ class DeviceStore(SQLBaseStore): txn, [(user_id, None)], include_all_devices=True ) - for user_id, user_devices in devices.iteritems(): + if devices: + user_devices = devices[user_id] results = [] for device_id, device in user_devices.iteritems(): result = { From 738a2867c8d6e8c97b956b6a58c7373a49e60ddb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 27 Jan 2017 10:31:29 +0000 Subject: [PATCH 11/19] SQL param ordering --- synapse/storage/devices.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index ad5411dbe7..918520269e 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -260,7 +260,7 @@ class DeviceStore(SQLBaseStore): now_stream_id): sql = """ SELECT user_id, device_id, max(stream_id) FROM device_lists_outbound_pokes - WHERE destination = ? AND stream_id > ? AND stream_id <= ? AND sent = ? + WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ? GROUP BY user_id, device_id """ txn.execute( From c517a19c2ddc042e5a87dfaaecc55a790f62ed71 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 27 Jan 2017 10:33:26 +0000 Subject: [PATCH 12/19] Comment --- synapse/storage/end_to_end_keys.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index 441286d1a1..85763f7ceb 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -35,6 +35,8 @@ class EndToEndKeyStore(SQLBaseStore): """Fetch a list of device keys. Args: query_list(list): List of pairs of user_ids and device_ids. + include_all_devices (bool): whether to include entries for devices + that don't have device keys Returns: Dict mapping from user-id to dict mapping from device_id to dict containing "key_json", "device_display_name". From 84a35f32c72693e2fd98677dc4e26e14ca8d56c5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 27 Jan 2017 10:35:12 +0000 Subject: [PATCH 13/19] Comment --- synapse/storage/devices.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 918520269e..00317b0c1f 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -285,6 +285,8 @@ class DeviceStore(SQLBaseStore): results = [] for user_id, user_devices in devices.iteritems(): + # We bind literal True, as its database dependent how booleans are + # handled. txn.execute(prev_sent_id_sql, (destination, user_id, True)) rows = txn.fetchall() prev_id = rows[0][0] From 252b503fc8626078141dc6b82eeff63607874347 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 27 Jan 2017 13:36:39 +0000 Subject: [PATCH 14/19] Hook device list updates to replication --- synapse/app/federation_sender.py | 3 +- synapse/app/synchrotron.py | 27 +++++++- synapse/handlers/device.py | 16 ----- synapse/handlers/sync.py | 35 +++++++--- synapse/replication/resource.py | 20 +++++- synapse/replication/slave/storage/devices.py | 72 ++++++++++++++++++++ synapse/storage/devices.py | 15 ++++ 7 files changed, 159 insertions(+), 29 deletions(-) create mode 100644 synapse/replication/slave/storage/devices.py diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index ec06620efb..411e47d98d 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -30,6 +30,7 @@ from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.transactions import TransactionStore +from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.storage.engines import create_engine from synapse.storage.presence import UserPresenceState from synapse.util.async import sleep @@ -56,7 +57,7 @@ logger = logging.getLogger("synapse.app.appservice") class FederationSenderSlaveStore( SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore, - SlavedRegistrationStore, + SlavedRegistrationStore, SlavedDeviceStore, ): pass diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 4dfc2dc648..9d250502e0 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -39,6 +39,7 @@ from synapse.replication.slave.storage.filtering import SlavedFilteringStore from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore from synapse.replication.slave.storage.presence import SlavedPresenceStore from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore +from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.room import RoomStore from synapse.server import HomeServer from synapse.storage.client_ips import ClientIpStore @@ -77,6 +78,7 @@ class SynchrotronSlavedStore( SlavedFilteringStore, SlavedPresenceStore, SlavedDeviceInboxStore, + SlavedDeviceStore, RoomStore, BaseSlavedStore, ClientIpStore, # After BaseSlavedStore because the constructor is different @@ -380,6 +382,28 @@ class SynchrotronServer(HomeServer): stream_key, position, users=users, rooms=rooms ) + @defer.inlineCallbacks + def notify_device_list_update(result): + stream = result.get("device_lists") + if not stream: + return + + position_index = stream["field_names"].index("position") + user_index = stream["field_names"].index("user_id") + + for row in stream["rows"]: + logger.info("Handling device list row: %r", row) + position = row[position_index] + user_id = row[user_index] + + rooms = yield store.get_rooms_for_user(user_id) + room_ids = [r.room_id for r in rooms] + + notifier.on_new_event( + "device_list_key", position, rooms=room_ids, + ) + + @defer.inlineCallbacks def notify(result): stream = result.get("events") if stream: @@ -417,6 +441,7 @@ class SynchrotronServer(HomeServer): notify_from_stream( result, "to_device", "to_device_key", user="user_id" ) + yield notify_device_list_update(result) while True: try: @@ -427,7 +452,7 @@ class SynchrotronServer(HomeServer): yield store.process_replication(result) typing_handler.process_replication(result) yield presence_handler.process_replication(result) - notify(result) + yield notify(result) except: logger.exception("Error replicating from %r", replication_url) yield sleep(5) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index ed077c9a76..6fefb85890 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -220,22 +220,6 @@ class DeviceHandler(BaseHandler): for host in hosts: self.federation_sender.send_device_messages(host) - @defer.inlineCallbacks - def get_device_list_changes(self, user_id, room_ids, from_key): - """For a user and their joined rooms, calculate which device updates - we need to return. - """ - room_ids = frozenset(room_ids) - - user_ids_changed = set() - changed = yield self.store.get_user_whose_devices_changed(from_key) - for other_user_id in changed: - other_rooms = yield self.store.get_rooms_for_user(other_user_id) - if room_ids.intersection(e.room_id for e in other_rooms): - user_ids_changed.add(other_user_id) - - defer.returnValue(user_ids_changed) - @defer.inlineCallbacks def _incoming_device_list_update(self, origin, edu_content): user_id = edu_content["user_id"] diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 06bf626367..9199f20817 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -144,7 +144,6 @@ class SyncHandler(object): self.clock = hs.get_clock() self.response_cache = ResponseCache(hs) self.state = hs.get_state_handler() - self.device_handler = hs.get_device_handler() def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0, full_state=False): @@ -546,15 +545,9 @@ class SyncHandler(object): yield self._generate_sync_entry_for_to_device(sync_result_builder) - if since_token and since_token.device_list_key: - user_id = sync_config.user.to_string() - rooms = yield self.store.get_rooms_for_user(user_id) - joined_room_ids = set(r.room_id for r in rooms) - device_lists = yield self.device_handler.get_device_list_changes( - user_id, joined_room_ids, since_token.device_list_key - ) - else: - device_lists = [] + device_lists = yield self._generate_sync_entry_for_device_list( + sync_result_builder + ) defer.returnValue(SyncResult( presence=sync_result_builder.presence, @@ -567,6 +560,28 @@ class SyncHandler(object): next_batch=sync_result_builder.now_token, )) + @defer.inlineCallbacks + def _generate_sync_entry_for_device_list(self, sync_result_builder): + user_id = sync_result_builder.sync_config.user.to_string() + since_token = sync_result_builder.since_token + + if since_token and since_token.device_list_key: + rooms = yield self.store.get_rooms_for_user(user_id) + room_ids = set(r.room_id for r in rooms) + + user_ids_changed = set() + changed = yield self.store.get_user_whose_devices_changed( + since_token.device_list_key + ) + for other_user_id in changed: + other_rooms = yield self.store.get_rooms_for_user(other_user_id) + if room_ids.intersection(e.room_id for e in other_rooms): + user_ids_changed.add(other_user_id) + + defer.returnValue(user_ids_changed) + else: + defer.returnValue([]) + @defer.inlineCallbacks def _generate_sync_entry_for_to_device(self, sync_result_builder): """Generates the portion of the sync response. Populates diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 4616e9b34a..36548c5eda 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -46,6 +46,7 @@ STREAM_NAMES = ( ("to_device",), ("public_rooms",), ("federation",), + ("device_lists",), ) @@ -140,6 +141,7 @@ class ReplicationResource(Resource): caches_token = self.store.get_cache_stream_token() public_rooms_token = self.store.get_current_public_room_stream_id() federation_token = self.federation_sender.get_current_token() + device_list_token = self.store.get_device_stream_token() defer.returnValue(_ReplicationToken( room_stream_token, @@ -155,6 +157,7 @@ class ReplicationResource(Resource): int(stream_token.to_device_key), int(public_rooms_token), int(federation_token), + int(device_list_token), )) @request_handler() @@ -214,6 +217,7 @@ class ReplicationResource(Resource): yield self.caches(writer, current_token, limit, request_streams) yield self.to_device(writer, current_token, limit, request_streams) yield self.public_rooms(writer, current_token, limit, request_streams) + yield self.device_lists(writer, current_token, limit, request_streams) self.federation(writer, current_token, limit, request_streams, federation_ack) self.streams(writer, current_token, request_streams) @@ -495,6 +499,20 @@ class ReplicationResource(Resource): "position", "type", "content", ), position=upto_token) + @defer.inlineCallbacks + def device_lists(self, writer, current_token, limit, request_streams): + current_position = current_token.device_lists + + device_lists = request_streams.get("device_lists") + + if device_lists is not None and device_lists != current_position: + changes = yield self.store.get_users_and_hosts_device_list_changes( + device_lists, + ) + writer.write_header_and_rows("device_lists", changes, ( + "position", "user_id", "destination", + ), position=current_position) + class _Writer(object): """Writes the streams as a JSON object as the response to the request""" @@ -527,7 +545,7 @@ class _Writer(object): class _ReplicationToken(collections.namedtuple("_ReplicationToken", ( "events", "presence", "typing", "receipts", "account_data", "backfill", "push_rules", "pushers", "state", "caches", "to_device", "public_rooms", - "federation", + "federation", "device_lists", ))): __slots__ = [] diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py new file mode 100644 index 0000000000..ca46aa17b6 --- /dev/null +++ b/synapse/replication/slave/storage/devices.py @@ -0,0 +1,72 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from ._slaved_id_tracker import SlavedIdTracker +from synapse.storage import DataStore +from synapse.util.caches.stream_change_cache import StreamChangeCache + + +class SlavedDeviceStore(BaseSlavedStore): + def __init__(self, db_conn, hs): + super(SlavedDeviceStore, self).__init__(db_conn, hs) + + self.hs = hs + + self._device_list_id_gen = SlavedIdTracker( + db_conn, "device_lists_stream", "stream_id", + ) + device_list_max = self._device_list_id_gen.get_current_token() + self._device_list_stream_cache = StreamChangeCache( + "DeviceListStreamChangeCache", device_list_max, + ) + self._device_list_federation_stream_cache = StreamChangeCache( + "DeviceListFederationStreamChangeCache", device_list_max, + ) + + get_device_stream_token = DataStore.get_device_stream_token.__func__ + get_user_whose_devices_changed = DataStore.get_user_whose_devices_changed.__func__ + get_devices_by_remote = DataStore.get_devices_by_remote.__func__ + _get_devices_by_remote_txn = DataStore._get_devices_by_remote_txn.__func__ + _get_e2e_device_keys_txn = DataStore._get_e2e_device_keys_txn.__func__ + mark_as_sent_devices_by_remote = DataStore.mark_as_sent_devices_by_remote.__func__ + _mark_as_sent_devices_by_remote_txn = ( + DataStore._mark_as_sent_devices_by_remote_txn.__func__ + ) + + def stream_positions(self): + result = super(SlavedDeviceStore, self).stream_positions() + result["device_lists"] = self._device_list_id_gen.get_current_token() + return result + + def process_replication(self, result): + stream = result.get("device_lists") + if stream: + self._device_list_id_gen.advance(int(stream["position"])) + for row in stream["rows"]: + stream_id = row[0] + user_id = row[1] + destination = row[2] + + self._device_list_stream_cache.entity_has_changed( + user_id, stream_id + ) + + if destination: + self._device_list_federation_stream_cache.entity_has_changed( + destination, stream_id + ) + + return super(SlavedDeviceStore, self).process_replication(result) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 00317b0c1f..2b2cebacfa 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -458,6 +458,21 @@ class DeviceStore(SQLBaseStore): rows = yield self._execute("get_user_whose_devices_changed", None, sql, from_key) defer.returnValue(set(row["user_id"] for row in rows)) + def get_users_and_hosts_device_list_changes(self, from_key): + """Return a list of `(stream_id, user_id, destination)` which is the + combined list of changes to devices, and which destinations need to be + poked. `destination` may be None if no destinations need to be poked. + """ + sql = """ + SELECT stream_id, user_id, destination FROM device_lists_stream + LEFT JOIN device_lists_outbound_pokes USING (stream_id, user_id, device_id) + WHERE stream_id > ? + """ + return self._execute( + "get_users_and_hosts_device_list", None, + sql, from_key, + ) + @defer.inlineCallbacks def add_device_change_to_streams(self, user_id, device_ids, hosts): """Persist that a user's devices have been updated, and which hosts From d1e1fd62108c9b285d7c57d357311c6d5df2190e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 27 Jan 2017 15:23:48 +0000 Subject: [PATCH 15/19] Add ts column to device_lists_outbound_pokes --- synapse/storage/devices.py | 3 +++ synapse/storage/schema/delta/40/device_list_streams.sql | 3 ++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 2b2cebacfa..89c7bc0cc0 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -486,6 +486,8 @@ class DeviceStore(SQLBaseStore): defer.returnValue(stream_id) def _add_device_change_txn(self, txn, user_id, device_ids, hosts, stream_id): + now = self._clock.time_msec() + txn.call_after( self._device_list_stream_cache.entity_has_changed, user_id, stream_id, @@ -519,6 +521,7 @@ class DeviceStore(SQLBaseStore): "user_id": user_id, "device_id": device_id, "sent": False, + "ts": now, } for destination in hosts for device_id in device_ids diff --git a/synapse/storage/schema/delta/40/device_list_streams.sql b/synapse/storage/schema/delta/40/device_list_streams.sql index 8348c143c3..54841b3843 100644 --- a/synapse/storage/schema/delta/40/device_list_streams.sql +++ b/synapse/storage/schema/delta/40/device_list_streams.sql @@ -51,7 +51,8 @@ CREATE TABLE device_lists_outbound_pokes ( stream_id BIGINT NOT NULL, user_id TEXT NOT NULL, device_id TEXT NOT NULL, - sent BOOLEAN NOT NULL + sent BOOLEAN NOT NULL, + ts BIGINT NOT NULL -- So that in future we can clear out pokes to dead servers ); CREATE INDEX device_lists_outbound_pokes_id ON device_lists_outbound_pokes(destination, stream_id); From 76100203aba979e21f3831e3a675ae4e3d578ad4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 30 Jan 2017 10:11:46 +0000 Subject: [PATCH 16/19] Always use the latest stream_id, sent or unsent --- synapse/storage/devices.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 89c7bc0cc0..d72f60d94b 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -280,14 +280,14 @@ class DeviceStore(SQLBaseStore): prev_sent_id_sql = """ SELECT coalesce(max(stream_id), 0) as stream_id FROM device_lists_outbound_pokes - WHERE destination = ? AND user_id = ? AND sent = ? + WHERE destination = ? AND user_id = ? AND stream_id <= ? """ results = [] for user_id, user_devices in devices.iteritems(): - # We bind literal True, as its database dependent how booleans are - # handled. - txn.execute(prev_sent_id_sql, (destination, user_id, True)) + # The prev_id for the first row is always the last row before + # `from_stream_id` + txn.execute(prev_sent_id_sql, (destination, user_id, from_stream_id)) rows = txn.fetchall() prev_id = rows[0][0] for device_id, device in user_devices.iteritems(): From d360c97ae1e79789e97ab6a12e005d22334e416f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 30 Jan 2017 10:12:00 +0000 Subject: [PATCH 17/19] Clear out old destination pokes. --- synapse/storage/devices.py | 42 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index d72f60d94b..c05ca7c5e0 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -24,6 +24,13 @@ logger = logging.getLogger(__name__) class DeviceStore(SQLBaseStore): + def __init__(self, hs): + super(DeviceStore, self).__init__(hs) + + self._clock.looping_call( + self._prune_old_outbound_device_pokes, 60 * 60 * 1000 + ) + @defer.inlineCallbacks def store_device(self, user_id, device_id, initial_device_display_name): @@ -530,3 +537,38 @@ class DeviceStore(SQLBaseStore): def get_device_stream_token(self): return self._device_list_id_gen.get_current_token() + + def _prune_old_outbound_device_pokes(self): + """Delete old entries out of the device_lists_outbound_pokes to ensure + that we don't fill up due to dead servers. We keep one entry per + (destination, user_id) tuple to ensure that the prev_ids remain correct + if the server does come back. + """ + now = self._clock.time_msec() + + def _prune_txn(txn): + select_sql = """ + SELECT destination, user_id, max(stream_id) as stream_id + FROM device_lists_outbound_pokes + GROUP BY destination, user_id + """ + + txn.execute(select_sql) + rows = txn.fetchall() + + delete_sql = """ + DELETE FROM device_lists_outbound_pokes + WHERE ts < ? AND destination = ? AND user_id = ? AND stream_id < ? + """ + + txn.executemany( + delete_sql, + ( + (now, row["destination"], row["user_id"], row["stream_id"]) + for row in rows + ) + ) + + return self.runInteraction( + "_prune_old_outbound_device_pokes", _prune_txn + ) From 4ac363a16886b05cf15064932b6510cdff729c57 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 30 Jan 2017 14:10:12 +0000 Subject: [PATCH 18/19] Remove debug logging --- synapse/app/synchrotron.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 9d250502e0..b3fb408cfd 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -392,7 +392,6 @@ class SynchrotronServer(HomeServer): user_index = stream["field_names"].index("user_id") for row in stream["rows"]: - logger.info("Handling device list row: %r", row) position = row[position_index] user_id = row[user_index] From 3670025e641e338de14a012a24d0ceb1cade194c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 30 Jan 2017 14:11:31 +0000 Subject: [PATCH 19/19] Rename func --- synapse/replication/resource.py | 2 +- synapse/storage/devices.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 36548c5eda..a30e647474 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -506,7 +506,7 @@ class ReplicationResource(Resource): device_lists = request_streams.get("device_lists") if device_lists is not None and device_lists != current_position: - changes = yield self.store.get_users_and_hosts_device_list_changes( + changes = yield self.store.get_all_device_list_changes_for_remotes( device_lists, ) writer.write_header_and_rows("device_lists", changes, ( diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index c05ca7c5e0..e68ee50152 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -465,7 +465,7 @@ class DeviceStore(SQLBaseStore): rows = yield self._execute("get_user_whose_devices_changed", None, sql, from_key) defer.returnValue(set(row["user_id"] for row in rows)) - def get_users_and_hosts_device_list_changes(self, from_key): + def get_all_device_list_changes_for_remotes(self, from_key): """Return a list of `(stream_id, user_id, destination)` which is the combined list of changes to devices, and which destinations need to be poked. `destination` may be None if no destinations need to be poked.