diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 3dd53f2038..1f35063e9f 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -21,6 +21,7 @@ import datetime import itertools import logging +import time from queue import Empty, PriorityQueue from typing import ( TYPE_CHECKING, @@ -381,14 +382,23 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas BATCH_SIZE = 1000 chains_to_fetch_sorted = SortedSet(chains_to_fetch) + logger.info("CHAINS: Fetching chain links %d", len(chains_to_fetch_sorted)) + + start_block = time.monotonic() + while chains_to_fetch_sorted: batch2 = list(chains_to_fetch_sorted.islice(-BATCH_SIZE)) chains_to_fetch_sorted.difference_update(batch2) + logger.info("CHAINS: batch2 %d", len(batch2)) clause, args = make_in_list_sql_clause( txn.database_engine, "origin_chain_id", batch2 ) + start_query = time.monotonic() txn.execute(sql % (clause,), args) + end_query = time.monotonic() + + logger.info("CHAINS: query took %d ms", (end_query - start_query) * 1000) links: Dict[int, List[Tuple[int, int, int]]] = {} @@ -404,8 +414,15 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas chains_to_fetch_sorted.difference_update(links) + logger.info("CHAINS: returned %d", len(links)) + logger.info("CHAINS: remaining %d", len(chains_to_fetch_sorted)) + yield links + end_block = time.monotonic() + + logger.info("CHAINS: block took %d ms", (end_block - start_block) * 1000) + def _get_auth_chain_ids_txn( self, txn: LoggingTransaction, event_ids: Collection[str], include_given: bool ) -> Set[str]: @@ -592,6 +609,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas # are reachable from any event. # (We need to take a copy of `seen_chains` as the function mutates it) + logger.info("CHAINS: for room %s", room_id) for links in self._get_chain_links(txn, seen_chains): for chains in set_to_chain: for chain_id in links: @@ -602,6 +620,8 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas seen_chains.update(chains) + logger.info("CHAINS: materialized chains %d", len(chains)) + # Now for each chain we figure out the maximum sequence number reachable # from *any* state set and the minimum sequence number reachable from # *all* state sets. Events in that range are in the auth chain