diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index f1de1e7ce9..3c3fa38053 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -311,7 +311,7 @@ class PusherServer(HomeServer): poke_pushers(result) except: logger.exception("Error replicating from %r", replication_url) - sleep(30) + yield sleep(30) def setup(config_options): diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index aa81e1c5da..5c552ffb29 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -187,7 +187,10 @@ class SynchrotronPresence(object): yield self._send_syncing_users_now() def _end(): - if affect_presence: + # We check that the user_id is in user_to_num_current_syncs because + # user_to_num_current_syncs may have been cleared if we are + # shutting down. + if affect_presence and user_id in self.user_to_num_current_syncs: self.user_to_num_current_syncs[user_id] -= 1 @contextlib.contextmanager @@ -443,7 +446,7 @@ class SynchrotronServer(HomeServer): notify(result) except: logger.exception("Error replicating from %r", replication_url) - sleep(5) + yield sleep(5) def build_presence_handler(self): return SynchrotronPresence(self) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 0e19f777b8..6b70fa3817 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -183,7 +183,7 @@ class PresenceHandler(object): # The initial delay is to allow disconnected clients a chance to # reconnect before we treat them as offline. self.clock.call_later( - 30 * 1000, + 30, self.clock.looping_call, self._handle_timeouts, 5000, @@ -283,44 +283,48 @@ class PresenceHandler(object): """Checks the presence of users that have timed out and updates as appropriate. """ + logger.info("Handling presence timeouts") now = self.clock.time_msec() - with Measure(self.clock, "presence_handle_timeouts"): - # Fetch the list of users that *may* have timed out. Things may have - # changed since the timeout was set, so we won't necessarily have to - # take any action. - users_to_check = set(self.wheel_timer.fetch(now)) + try: + with Measure(self.clock, "presence_handle_timeouts"): + # Fetch the list of users that *may* have timed out. Things may have + # changed since the timeout was set, so we won't necessarily have to + # take any action. + users_to_check = set(self.wheel_timer.fetch(now)) - # Check whether the lists of syncing processes from an external - # process have expired. - expired_process_ids = [ - process_id for process_id, last_update - in self.external_process_last_update.items() - if now - last_update > EXTERNAL_PROCESS_EXPIRY - ] - for process_id in expired_process_ids: - users_to_check.update( - self.external_process_to_current_syncs.pop(process_id, ()) + # Check whether the lists of syncing processes from an external + # process have expired. + expired_process_ids = [ + process_id for process_id, last_update + in self.external_process_last_updated_ms.items() + if now - last_update > EXTERNAL_PROCESS_EXPIRY + ] + for process_id in expired_process_ids: + users_to_check.update( + self.external_process_last_updated_ms.pop(process_id, ()) + ) + self.external_process_last_update.pop(process_id) + + states = [ + self.user_to_current_state.get( + user_id, UserPresenceState.default(user_id) + ) + for user_id in users_to_check + ] + + timers_fired_counter.inc_by(len(states)) + + changes = handle_timeouts( + states, + is_mine_fn=self.is_mine_id, + syncing_user_ids=self.get_currently_syncing_users(), + now=now, ) - self.external_process_last_update.pop(process_id) - states = [ - self.user_to_current_state.get( - user_id, UserPresenceState.default(user_id) - ) - for user_id in users_to_check - ] - - timers_fired_counter.inc_by(len(states)) - - changes = handle_timeouts( - states, - is_mine_fn=self.is_mine_id, - syncing_users=self.get_syncing_users(), - now=now, - ) - - preserve_fn(self._update_states)(changes) + preserve_fn(self._update_states)(changes) + except: + logger.exception("Exception in _handle_timeouts loop") @defer.inlineCallbacks def bump_presence_active_time(self, user): @@ -402,7 +406,8 @@ class PresenceHandler(object): user_id for user_id, count in self.user_to_num_current_syncs.items() if count } - syncing_user_ids.update(self.external_process_to_current_syncs.values()) + for user_ids in self.external_process_to_current_syncs.values(): + syncing_user_ids.update(user_ids) return syncing_user_ids @defer.inlineCallbacks diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index ffb7d4a25b..d1ee533fac 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -320,7 +320,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore): event_ids = json.loads(entry["event_ids"]) - events = yield self.get_events(event_ids) + events = yield self._get_events(event_ids) defer.returnValue(AppServiceTransaction( service=service, id=entry["txn_id"], events=events diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 5db24e86f9..6d978ffcd5 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -27,6 +27,9 @@ from synapse.api.constants import EventTypes from canonicaljson import encode_canonical_json from collections import deque, namedtuple +import synapse +import synapse.metrics + import logging import math @@ -35,6 +38,10 @@ import ujson as json logger = logging.getLogger(__name__) +metrics = synapse.metrics.get_metrics_for(__name__) +persist_event_counter = metrics.register_counter("persisted_events") + + def encode_json(json_object): if USE_FROZEN_DICTS: # ujson doesn't like frozen_dicts @@ -261,6 +268,7 @@ class EventsStore(SQLBaseStore): events_and_contexts=chunk, backfilled=backfilled, ) + persist_event_counter.inc_by(len(chunk)) @defer.inlineCallbacks @log_function @@ -278,6 +286,7 @@ class EventsStore(SQLBaseStore): current_state=current_state, backfilled=backfilled, ) + persist_event_counter.inc() except _RollbackButIsFineException: pass @@ -635,6 +644,8 @@ class EventsStore(SQLBaseStore): ], ) + self._add_to_cache(txn, events_and_contexts) + if backfilled: # Backfilled events come before the current state so we don't need # to update the current state table @@ -676,6 +687,45 @@ class EventsStore(SQLBaseStore): return + def _add_to_cache(self, txn, events_and_contexts): + to_prefill = [] + + rows = [] + N = 200 + for i in range(0, len(events_and_contexts), N): + ev_map = { + e[0].event_id: e[0] + for e in events_and_contexts[i:i + N] + } + if not ev_map: + break + + sql = ( + "SELECT " + " e.event_id as event_id, " + " r.redacts as redacts," + " rej.event_id as rejects " + " FROM events as e" + " LEFT JOIN rejections as rej USING (event_id)" + " LEFT JOIN redactions as r ON e.event_id = r.redacts" + " WHERE e.event_id IN (%s)" + ) % (",".join(["?"] * len(ev_map)),) + + txn.execute(sql, ev_map.keys()) + rows = self.cursor_to_dict(txn) + for row in rows: + event = ev_map[row["event_id"]] + if not row["rejects"] and not row["redacts"]: + to_prefill.append(_EventCacheEntry( + event=event, + redacted_event=None, + )) + + def prefill(): + for cache_entry in to_prefill: + self._get_event_cache.prefill((cache_entry[0].event_id,), cache_entry) + txn.call_after(prefill) + def _store_redaction(self, txn, event): # invalidate the cache for the redacted event txn.call_after(self._invalidate_get_event_cache, event.redacts) diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index f44c4870e3..3e2862daae 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -357,7 +357,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): other_events = [Mock(event_id="e5"), Mock(event_id="e6")] # we aren't testing store._base stuff here, so mock this out - self.store.get_events = Mock(return_value=events) + self.store._get_events = Mock(return_value=events) yield self._insert_txn(self.as_list[1]["id"], 9, other_events) yield self._insert_txn(service.id, 10, events)