Merge branch 'develop' of github.com:matrix-org/synapse into erikj/timings

This commit is contained in:
Erik Johnston 2016-06-07 11:07:27 +01:00
commit dbd788c262
6 changed files with 98 additions and 40 deletions

View file

@ -311,7 +311,7 @@ class PusherServer(HomeServer):
poke_pushers(result) poke_pushers(result)
except: except:
logger.exception("Error replicating from %r", replication_url) logger.exception("Error replicating from %r", replication_url)
sleep(30) yield sleep(30)
def setup(config_options): def setup(config_options):

View file

@ -187,7 +187,10 @@ class SynchrotronPresence(object):
yield self._send_syncing_users_now() yield self._send_syncing_users_now()
def _end(): def _end():
if affect_presence: # We check that the user_id is in user_to_num_current_syncs because
# user_to_num_current_syncs may have been cleared if we are
# shutting down.
if affect_presence and user_id in self.user_to_num_current_syncs:
self.user_to_num_current_syncs[user_id] -= 1 self.user_to_num_current_syncs[user_id] -= 1
@contextlib.contextmanager @contextlib.contextmanager
@ -443,7 +446,7 @@ class SynchrotronServer(HomeServer):
notify(result) notify(result)
except: except:
logger.exception("Error replicating from %r", replication_url) logger.exception("Error replicating from %r", replication_url)
sleep(5) yield sleep(5)
def build_presence_handler(self): def build_presence_handler(self):
return SynchrotronPresence(self) return SynchrotronPresence(self)

View file

@ -183,7 +183,7 @@ class PresenceHandler(object):
# The initial delay is to allow disconnected clients a chance to # The initial delay is to allow disconnected clients a chance to
# reconnect before we treat them as offline. # reconnect before we treat them as offline.
self.clock.call_later( self.clock.call_later(
30 * 1000, 30,
self.clock.looping_call, self.clock.looping_call,
self._handle_timeouts, self._handle_timeouts,
5000, 5000,
@ -283,44 +283,48 @@ class PresenceHandler(object):
"""Checks the presence of users that have timed out and updates as """Checks the presence of users that have timed out and updates as
appropriate. appropriate.
""" """
logger.info("Handling presence timeouts")
now = self.clock.time_msec() now = self.clock.time_msec()
with Measure(self.clock, "presence_handle_timeouts"): try:
# Fetch the list of users that *may* have timed out. Things may have with Measure(self.clock, "presence_handle_timeouts"):
# changed since the timeout was set, so we won't necessarily have to # Fetch the list of users that *may* have timed out. Things may have
# take any action. # changed since the timeout was set, so we won't necessarily have to
users_to_check = set(self.wheel_timer.fetch(now)) # take any action.
users_to_check = set(self.wheel_timer.fetch(now))
# Check whether the lists of syncing processes from an external # Check whether the lists of syncing processes from an external
# process have expired. # process have expired.
expired_process_ids = [ expired_process_ids = [
process_id for process_id, last_update process_id for process_id, last_update
in self.external_process_last_update.items() in self.external_process_last_updated_ms.items()
if now - last_update > EXTERNAL_PROCESS_EXPIRY if now - last_update > EXTERNAL_PROCESS_EXPIRY
] ]
for process_id in expired_process_ids: for process_id in expired_process_ids:
users_to_check.update( users_to_check.update(
self.external_process_to_current_syncs.pop(process_id, ()) self.external_process_last_updated_ms.pop(process_id, ())
)
self.external_process_last_update.pop(process_id)
states = [
self.user_to_current_state.get(
user_id, UserPresenceState.default(user_id)
)
for user_id in users_to_check
]
timers_fired_counter.inc_by(len(states))
changes = handle_timeouts(
states,
is_mine_fn=self.is_mine_id,
syncing_user_ids=self.get_currently_syncing_users(),
now=now,
) )
self.external_process_last_update.pop(process_id)
states = [ preserve_fn(self._update_states)(changes)
self.user_to_current_state.get( except:
user_id, UserPresenceState.default(user_id) logger.exception("Exception in _handle_timeouts loop")
)
for user_id in users_to_check
]
timers_fired_counter.inc_by(len(states))
changes = handle_timeouts(
states,
is_mine_fn=self.is_mine_id,
syncing_users=self.get_syncing_users(),
now=now,
)
preserve_fn(self._update_states)(changes)
@defer.inlineCallbacks @defer.inlineCallbacks
def bump_presence_active_time(self, user): def bump_presence_active_time(self, user):
@ -402,7 +406,8 @@ class PresenceHandler(object):
user_id for user_id, count in self.user_to_num_current_syncs.items() user_id for user_id, count in self.user_to_num_current_syncs.items()
if count if count
} }
syncing_user_ids.update(self.external_process_to_current_syncs.values()) for user_ids in self.external_process_to_current_syncs.values():
syncing_user_ids.update(user_ids)
return syncing_user_ids return syncing_user_ids
@defer.inlineCallbacks @defer.inlineCallbacks

View file

@ -320,7 +320,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
event_ids = json.loads(entry["event_ids"]) event_ids = json.loads(entry["event_ids"])
events = yield self.get_events(event_ids) events = yield self._get_events(event_ids)
defer.returnValue(AppServiceTransaction( defer.returnValue(AppServiceTransaction(
service=service, id=entry["txn_id"], events=events service=service, id=entry["txn_id"], events=events

View file

@ -27,6 +27,9 @@ from synapse.api.constants import EventTypes
from canonicaljson import encode_canonical_json from canonicaljson import encode_canonical_json
from collections import deque, namedtuple from collections import deque, namedtuple
import synapse
import synapse.metrics
import logging import logging
import math import math
@ -35,6 +38,10 @@ import ujson as json
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
persist_event_counter = metrics.register_counter("persisted_events")
def encode_json(json_object): def encode_json(json_object):
if USE_FROZEN_DICTS: if USE_FROZEN_DICTS:
# ujson doesn't like frozen_dicts # ujson doesn't like frozen_dicts
@ -261,6 +268,7 @@ class EventsStore(SQLBaseStore):
events_and_contexts=chunk, events_and_contexts=chunk,
backfilled=backfilled, backfilled=backfilled,
) )
persist_event_counter.inc_by(len(chunk))
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
@ -278,6 +286,7 @@ class EventsStore(SQLBaseStore):
current_state=current_state, current_state=current_state,
backfilled=backfilled, backfilled=backfilled,
) )
persist_event_counter.inc()
except _RollbackButIsFineException: except _RollbackButIsFineException:
pass pass
@ -635,6 +644,8 @@ class EventsStore(SQLBaseStore):
], ],
) )
self._add_to_cache(txn, events_and_contexts)
if backfilled: if backfilled:
# Backfilled events come before the current state so we don't need # Backfilled events come before the current state so we don't need
# to update the current state table # to update the current state table
@ -676,6 +687,45 @@ class EventsStore(SQLBaseStore):
return return
def _add_to_cache(self, txn, events_and_contexts):
to_prefill = []
rows = []
N = 200
for i in range(0, len(events_and_contexts), N):
ev_map = {
e[0].event_id: e[0]
for e in events_and_contexts[i:i + N]
}
if not ev_map:
break
sql = (
"SELECT "
" e.event_id as event_id, "
" r.redacts as redacts,"
" rej.event_id as rejects "
" FROM events as e"
" LEFT JOIN rejections as rej USING (event_id)"
" LEFT JOIN redactions as r ON e.event_id = r.redacts"
" WHERE e.event_id IN (%s)"
) % (",".join(["?"] * len(ev_map)),)
txn.execute(sql, ev_map.keys())
rows = self.cursor_to_dict(txn)
for row in rows:
event = ev_map[row["event_id"]]
if not row["rejects"] and not row["redacts"]:
to_prefill.append(_EventCacheEntry(
event=event,
redacted_event=None,
))
def prefill():
for cache_entry in to_prefill:
self._get_event_cache.prefill((cache_entry[0].event_id,), cache_entry)
txn.call_after(prefill)
def _store_redaction(self, txn, event): def _store_redaction(self, txn, event):
# invalidate the cache for the redacted event # invalidate the cache for the redacted event
txn.call_after(self._invalidate_get_event_cache, event.redacts) txn.call_after(self._invalidate_get_event_cache, event.redacts)

View file

@ -357,7 +357,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
other_events = [Mock(event_id="e5"), Mock(event_id="e6")] other_events = [Mock(event_id="e5"), Mock(event_id="e6")]
# we aren't testing store._base stuff here, so mock this out # we aren't testing store._base stuff here, so mock this out
self.store.get_events = Mock(return_value=events) self.store._get_events = Mock(return_value=events)
yield self._insert_txn(self.as_list[1]["id"], 9, other_events) yield self._insert_txn(self.as_list[1]["id"], 9, other_events)
yield self._insert_txn(service.id, 10, events) yield self._insert_txn(service.id, 10, events)