Add some metrics to staging area (#10284)

This commit is contained in:
Erik Johnston 2021-07-01 10:18:25 +01:00 committed by GitHub
parent 04c8f308f4
commit 76addadd7c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 40 additions and 0 deletions

View file

@ -0,0 +1 @@
Add metrics for new inbound federation staging area.

View file

@ -16,6 +16,8 @@ import logging
from queue import Empty, PriorityQueue from queue import Empty, PriorityQueue
from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple
from prometheus_client import Gauge
from synapse.api.constants import MAX_DEPTH from synapse.api.constants import MAX_DEPTH
from synapse.api.errors import StoreError from synapse.api.errors import StoreError
from synapse.api.room_versions import RoomVersion from synapse.api.room_versions import RoomVersion
@ -32,6 +34,16 @@ from synapse.util.caches.descriptors import cached
from synapse.util.caches.lrucache import LruCache from synapse.util.caches.lrucache import LruCache
from synapse.util.iterutils import batch_iter from synapse.util.iterutils import batch_iter
oldest_pdu_in_federation_staging = Gauge(
"synapse_federation_server_oldest_inbound_pdu_in_staging",
"The age in seconds since we received the oldest pdu in the federation staging area",
)
number_pdus_in_federation_queue = Gauge(
"synapse_federation_server_number_inbound_pdu_in_staging",
"The total number of events in the inbound federation staging",
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -54,6 +66,8 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
500000, "_event_auth_cache", size_callback=len 500000, "_event_auth_cache", size_callback=len
) # type: LruCache[str, List[Tuple[str, int]]] ) # type: LruCache[str, List[Tuple[str, int]]]
self._clock.looping_call(self._get_stats_for_federation_staging, 30 * 1000)
async def get_auth_chain( async def get_auth_chain(
self, room_id: str, event_ids: Collection[str], include_given: bool = False self, room_id: str, event_ids: Collection[str], include_given: bool = False
) -> List[EventBase]: ) -> List[EventBase]:
@ -1193,6 +1207,31 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
return origin, event return origin, event
@wrap_as_background_process("_get_stats_for_federation_staging")
async def _get_stats_for_federation_staging(self):
"""Update the prometheus metrics for the inbound federation staging area."""
def _get_stats_for_federation_staging_txn(txn):
txn.execute(
"SELECT coalesce(count(*), 0) FROM federation_inbound_events_staging"
)
(count,) = txn.fetchone()
txn.execute(
"SELECT coalesce(min(received_ts), 0) FROM federation_inbound_events_staging"
)
(age,) = txn.fetchone()
return count, age
count, age = await self.db_pool.runInteraction(
"_get_stats_for_federation_staging", _get_stats_for_federation_staging_txn
)
number_pdus_in_federation_queue.set(count)
oldest_pdu_in_federation_staging.set(age)
class EventFederationStore(EventFederationWorkerStore): class EventFederationStore(EventFederationWorkerStore):
"""Responsible for storing and serving up the various graphs associated """Responsible for storing and serving up the various graphs associated