diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index d68f0d8950..1c466cbded 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -243,6 +243,17 @@ class TransactionQueue(object): next_token, "federation_sender", ) + if events: + now = self.clock.time_msec() + ts = yield self.store.get_received_ts(events[-1].event_id) + + synapse.metrics.event_processing_lag.set( + now - ts, "federation_sender", + ) + synapse.metrics.event_processing_last_ts.set( + ts, "federation_sender", + ) + finally: self._is_processing = False diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 2a7e31e711..fcc1246bee 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -115,6 +115,16 @@ class ApplicationServicesHandler(object): synapse.metrics.event_processing_positions.set( upper_bound, "appservice_sender", ) + + now = self.clock.time_msec() + ts = yield self.store.get_received_ts(events[-1].event_id) + + synapse.metrics.event_processing_lag.set( + now - ts, "appservice_sender", + ) + synapse.metrics.event_processing_last_ts.set( + ts, "appservice_sender", + ) finally: self.is_processing = False diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index a6c0d7e1bf..e3b831db67 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -164,6 +164,19 @@ event_persisted_position = synapse_metrics.register_gauge( "event_persisted_position", ) +# Used to track the received_ts of the last event processed by various +# components +event_processing_last_ts = synapse_metrics.register_gauge( + "event_processing_last_ts", labels=["name"], +) + +# Used to track the lag processing events. This is the time difference +# between the last processed event's received_ts and the time it was +# finished being processed. +event_processing_lag = synapse_metrics.register_gauge( + "event_processing_lag", labels=["name"], +) + def runUntilCurrentTimer(func): diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 2e23dd78ba..769eb51489 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -51,6 +51,24 @@ _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event")) class EventsWorkerStore(SQLBaseStore): + def get_received_ts(self, event_id): + """Get received_ts (when it was persisted) for the event + + Args: + event_id (str) + + Returns: + Deferred[int|None]: Timstamp in milliseconds, or None for events + that were persisted before received_ts was implemented. + """ + return self._simple_select_one_onecol( + table="events", + keyvalues={ + "event_id": event_id, + }, + retcol="received_ts", + desc="get_received_ts", + ) @defer.inlineCallbacks def get_event(self, event_id, check_redacted=True,