From 70aee0717c22acf7eabb5f158cbaf527137bc90e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 6 Jun 2016 11:08:12 +0100 Subject: [PATCH 1/9] Add events to cache when we persist them --- synapse/storage/events.py | 41 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 5db24e86f9..16398dc0a8 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -635,6 +635,8 @@ class EventsStore(SQLBaseStore): ], ) + self._add_to_cache(txn, events_and_contexts) + if backfilled: # Backfilled events come before the current state so we don't need # to update the current state table @@ -676,6 +678,45 @@ class EventsStore(SQLBaseStore): return + def _add_to_cache(self, txn, events_and_contexts): + to_prefill = [] + + rows = [] + N = 200 + for i in range(0, len(events_and_contexts), N): + ev_map = { + e[0].event_id: e[0] + for e in events_and_contexts[i:i + N] + } + if not ev_map: + break + + sql = ( + "SELECT " + " e.event_id as event_id, " + " r.redacts as redacts," + " rej.event_id as rejects " + " FROM events as e" + " LEFT JOIN rejections as rej USING (event_id)" + " LEFT JOIN redactions as r ON e.event_id = r.redacts" + " WHERE e.event_id IN (%s)" + ) % (",".join(["?"] * len(ev_map)),) + + txn.execute(sql, ev_map.keys()) + rows = self.cursor_to_dict(txn) + for row in rows: + event = ev_map[row["event_id"]] + if not row["rejects"] and not row["redacts"]: + to_prefill.append(_EventCacheEntry( + event=event, + redacted_event=None, + )) + + def prefill(): + for cache_entry in to_prefill: + self._get_event_cache.prefill((cache_entry[0].event_id,), cache_entry) + txn.call_after(prefill) + def _store_redaction(self, txn, event): # invalidate the cache for the redacted event txn.call_after(self._invalidate_get_event_cache, event.redacts) From 7aa778fba9bb81087c3a1029e0a0d4ff55b1a065 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 6 Jun 2016 11:58:09 +0100 Subject: [PATCH 2/9] Add metric counter for number of persisted events --- synapse/storage/events.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 5db24e86f9..ff4f742f6a 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -27,6 +27,9 @@ from synapse.api.constants import EventTypes from canonicaljson import encode_canonical_json from collections import deque, namedtuple +import synapse +import synapse.metrics + import logging import math @@ -35,6 +38,10 @@ import ujson as json logger = logging.getLogger(__name__) +metrics = synapse.metrics.get_metrics_for(__name__) +persist_event_counter = metrics.register_counter("persisted_events") + + def encode_json(json_object): if USE_FROZEN_DICTS: # ujson doesn't like frozen_dicts @@ -261,6 +268,7 @@ class EventsStore(SQLBaseStore): events_and_contexts=chunk, backfilled=backfilled, ) + persist_event_counter.inc_by(len(chunk)) @defer.inlineCallbacks @log_function @@ -278,6 +286,7 @@ class EventsStore(SQLBaseStore): current_state=current_state, backfilled=backfilled, ) + persist_event_counter.inc() except _RollbackButIsFineException: pass From 377eb480ca66a376e85cf8927f7f9112ed60e8bc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 6 Jun 2016 15:14:21 +0100 Subject: [PATCH 3/9] Fire after 30s not 8h --- synapse/handlers/presence.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 0e19f777b8..2e772da660 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -183,7 +183,7 @@ class PresenceHandler(object): # The initial delay is to allow disconnected clients a chance to # reconnect before we treat them as offline. self.clock.call_later( - 30 * 1000, + 30, self.clock.looping_call, self._handle_timeouts, 5000, From 96dc600579cd6ef9937b0e007f51aa4da0fc122d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 6 Jun 2016 15:44:41 +0100 Subject: [PATCH 4/9] Fix typos --- synapse/handlers/presence.py | 70 +++++++++++++++++++----------------- 1 file changed, 37 insertions(+), 33 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 2e772da660..94160a5be7 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -283,44 +283,48 @@ class PresenceHandler(object): """Checks the presence of users that have timed out and updates as appropriate. """ + logger.info("Handling presence timeouts") now = self.clock.time_msec() - with Measure(self.clock, "presence_handle_timeouts"): - # Fetch the list of users that *may* have timed out. Things may have - # changed since the timeout was set, so we won't necessarily have to - # take any action. - users_to_check = set(self.wheel_timer.fetch(now)) + try: + with Measure(self.clock, "presence_handle_timeouts"): + # Fetch the list of users that *may* have timed out. Things may have + # changed since the timeout was set, so we won't necessarily have to + # take any action. + users_to_check = set(self.wheel_timer.fetch(now)) - # Check whether the lists of syncing processes from an external - # process have expired. - expired_process_ids = [ - process_id for process_id, last_update - in self.external_process_last_update.items() - if now - last_update > EXTERNAL_PROCESS_EXPIRY - ] - for process_id in expired_process_ids: - users_to_check.update( - self.external_process_to_current_syncs.pop(process_id, ()) + # Check whether the lists of syncing processes from an external + # process have expired. + expired_process_ids = [ + process_id for process_id, last_update + in self.external_process_last_updated_ms.items() + if now - last_update > EXTERNAL_PROCESS_EXPIRY + ] + for process_id in expired_process_ids: + users_to_check.update( + self.external_process_last_updated_ms.pop(process_id, ()) + ) + self.external_process_last_update.pop(process_id) + + states = [ + self.user_to_current_state.get( + user_id, UserPresenceState.default(user_id) + ) + for user_id in users_to_check + ] + + timers_fired_counter.inc_by(len(states)) + + changes = handle_timeouts( + states, + is_mine_fn=self.is_mine_id, + syncing_user_ids=self.get_currently_syncing_users(), + now=now, ) - self.external_process_last_update.pop(process_id) - states = [ - self.user_to_current_state.get( - user_id, UserPresenceState.default(user_id) - ) - for user_id in users_to_check - ] - - timers_fired_counter.inc_by(len(states)) - - changes = handle_timeouts( - states, - is_mine_fn=self.is_mine_id, - syncing_users=self.get_syncing_users(), - now=now, - ) - - preserve_fn(self._update_states)(changes) + preserve_fn(self._update_states)(changes) + except: + logger.exception("Exception in _handle_timeouts loop") @defer.inlineCallbacks def bump_presence_active_time(self, user): From 216a05b3e39e08b0600a39fc111b4d669d06ff7c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 6 Jun 2016 16:00:09 +0100 Subject: [PATCH 5/9] .values() returns list of sets --- synapse/handlers/presence.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 94160a5be7..6b70fa3817 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -406,7 +406,8 @@ class PresenceHandler(object): user_id for user_id, count in self.user_to_num_current_syncs.items() if count } - syncing_user_ids.update(self.external_process_to_current_syncs.values()) + for user_ids in self.external_process_to_current_syncs.values(): + syncing_user_ids.update(user_ids) return syncing_user_ids @defer.inlineCallbacks From 5ef84da4f11f1b1cceb0c44d9867bb597ee68e64 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 6 Jun 2016 16:05:28 +0100 Subject: [PATCH 6/9] Yield on the sleeps intended to backoff replication --- synapse/app/pusher.py | 2 +- synapse/app/synchrotron.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index f1de1e7ce9..3c3fa38053 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -311,7 +311,7 @@ class PusherServer(HomeServer): poke_pushers(result) except: logger.exception("Error replicating from %r", replication_url) - sleep(30) + yield sleep(30) def setup(config_options): diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index aa81e1c5da..7273055cc1 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -443,7 +443,7 @@ class SynchrotronServer(HomeServer): notify(result) except: logger.exception("Error replicating from %r", replication_url) - sleep(5) + yield sleep(5) def build_presence_handler(self): return SynchrotronPresence(self) From 4a5bbb1941ae63f1d6632aa35e80274e56c8dbb9 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 6 Jun 2016 16:37:12 +0100 Subject: [PATCH 7/9] Fix a KeyError in the synchrotron presence --- synapse/app/synchrotron.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index aa81e1c5da..3d0d5cc15a 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -187,7 +187,10 @@ class SynchrotronPresence(object): yield self._send_syncing_users_now() def _end(): - if affect_presence: + # We check that the user_id is in user_to_num_current_syncs because + # user_to_num_current_syncs may have been cleared if we are + # shutting down. + if affect_presence and user_id in self.user_to_num_current_syncs: self.user_to_num_current_syncs[user_id] -= 1 @contextlib.contextmanager From 310197bab5cf8ed2c26fae522f15f092dbcdff58 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 7 Jun 2016 09:34:50 +0100 Subject: [PATCH 8/9] Fix AS retries --- synapse/storage/appservice.py | 4 ++-- tests/storage/test_appservice.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index ffb7d4a25b..a281571637 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -320,10 +320,10 @@ class ApplicationServiceTransactionStore(SQLBaseStore): event_ids = json.loads(entry["event_ids"]) - events = yield self.get_events(event_ids) + event_map = yield self.get_events(event_ids) defer.returnValue(AppServiceTransaction( - service=service, id=entry["txn_id"], events=events + service=service, id=entry["txn_id"], events=event_map.values() )) def _get_oldest_unsent_txn(self, txn, service): diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index f44c4870e3..6db4b966db 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -353,21 +353,21 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def test_get_oldest_unsent_txn(self): service = Mock(id=self.as_list[0]["id"]) - events = [Mock(event_id="e1"), Mock(event_id="e2")] + events = {"e1": Mock(event_id="e1"), "e2": Mock(event_id="e2")} other_events = [Mock(event_id="e5"), Mock(event_id="e6")] # we aren't testing store._base stuff here, so mock this out self.store.get_events = Mock(return_value=events) yield self._insert_txn(self.as_list[1]["id"], 9, other_events) - yield self._insert_txn(service.id, 10, events) + yield self._insert_txn(service.id, 10, events.values()) yield self._insert_txn(service.id, 11, other_events) yield self._insert_txn(service.id, 12, other_events) txn = yield self.store.get_oldest_unsent_txn(service) self.assertEquals(service, txn.service) self.assertEquals(10, txn.id) - self.assertEquals(events, txn.events) + self.assertEquals(events.values(), txn.events) @defer.inlineCallbacks def test_get_appservices_by_state_single(self): From 84379062f9ec259abc302af321d4ed8f5a958c01 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 7 Jun 2016 10:24:50 +0100 Subject: [PATCH 9/9] Fix AS retries, but with correct ordering --- synapse/storage/appservice.py | 4 ++-- tests/storage/test_appservice.py | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index a281571637..d1ee533fac 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -320,10 +320,10 @@ class ApplicationServiceTransactionStore(SQLBaseStore): event_ids = json.loads(entry["event_ids"]) - event_map = yield self.get_events(event_ids) + events = yield self._get_events(event_ids) defer.returnValue(AppServiceTransaction( - service=service, id=entry["txn_id"], events=event_map.values() + service=service, id=entry["txn_id"], events=events )) def _get_oldest_unsent_txn(self, txn, service): diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index 6db4b966db..3e2862daae 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -353,21 +353,21 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def test_get_oldest_unsent_txn(self): service = Mock(id=self.as_list[0]["id"]) - events = {"e1": Mock(event_id="e1"), "e2": Mock(event_id="e2")} + events = [Mock(event_id="e1"), Mock(event_id="e2")] other_events = [Mock(event_id="e5"), Mock(event_id="e6")] # we aren't testing store._base stuff here, so mock this out - self.store.get_events = Mock(return_value=events) + self.store._get_events = Mock(return_value=events) yield self._insert_txn(self.as_list[1]["id"], 9, other_events) - yield self._insert_txn(service.id, 10, events.values()) + yield self._insert_txn(service.id, 10, events) yield self._insert_txn(service.id, 11, other_events) yield self._insert_txn(service.id, 12, other_events) txn = yield self.store.get_oldest_unsent_txn(service) self.assertEquals(service, txn.service) self.assertEquals(10, txn.id) - self.assertEquals(events.values(), txn.events) + self.assertEquals(events, txn.events) @defer.inlineCallbacks def test_get_appservices_by_state_single(self):