This commit is contained in:
Erik Johnston 2024-06-20 11:51:36 +01:00
parent 3aa2d97590
commit 03b674e86c
3 changed files with 23 additions and 13 deletions

View file

@ -148,6 +148,10 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
500000, "_event_auth_cache", size_callback=len 500000, "_event_auth_cache", size_callback=len
) )
# Flag used by unit tests to disable fallback when there is no chain cover
# index.
self.tests_allow_no_chain_cover_index = True
self._clock.looping_call(self._get_stats_for_federation_staging, 30 * 1000) self._clock.looping_call(self._get_stats_for_federation_staging, 30 * 1000)
if isinstance(self.database_engine, PostgresEngine): if isinstance(self.database_engine, PostgresEngine):
@ -220,8 +224,10 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
) )
except _NoChainCoverIndex: except _NoChainCoverIndex:
# For whatever reason we don't actually have a chain cover index # For whatever reason we don't actually have a chain cover index
# for the events in question, so we fall back to the old method. # for the events in question, so we fall back to the old method
pass # (except in tests)
if not self.tests_allow_no_chain_cover_index:
raise
return await self.db_pool.runInteraction( return await self.db_pool.runInteraction(
"get_auth_chain_ids", "get_auth_chain_ids",
@ -271,7 +277,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
if events_missing_chain_info: if events_missing_chain_info:
# This can happen due to e.g. downgrade/upgrade of the server. We # This can happen due to e.g. downgrade/upgrade of the server. We
# raise an exception and fall back to the previous algorithm. # raise an exception and fall back to the previous algorithm.
logger.info( logger.error(
"Unexpectedly found that events don't have chain IDs in room %s: %s", "Unexpectedly found that events don't have chain IDs in room %s: %s",
room_id, room_id,
events_missing_chain_info, events_missing_chain_info,
@ -482,8 +488,10 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
) )
except _NoChainCoverIndex: except _NoChainCoverIndex:
# For whatever reason we don't actually have a chain cover index # For whatever reason we don't actually have a chain cover index
# for the events in question, so we fall back to the old method. # for the events in question, so we fall back to the old method
pass # (except in tests)
if not self.tests_allow_no_chain_cover_index:
raise
return await self.db_pool.runInteraction( return await self.db_pool.runInteraction(
"get_auth_chain_difference", "get_auth_chain_difference",
@ -710,7 +718,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
if events_missing_chain_info - event_to_auth_ids.keys(): if events_missing_chain_info - event_to_auth_ids.keys():
# Uh oh, we somehow haven't correctly done the chain cover index, # Uh oh, we somehow haven't correctly done the chain cover index,
# bail and fall back to the old method. # bail and fall back to the old method.
logger.info( logger.error(
"Unexpectedly found that events don't have chain IDs in room %s: %s", "Unexpectedly found that events don't have chain IDs in room %s: %s",
room_id, room_id,
events_missing_chain_info - event_to_auth_ids.keys(), events_missing_chain_info - event_to_auth_ids.keys(),

View file

@ -303,13 +303,13 @@ class PersistEventsStore:
retcols=("room_id", "has_auth_chain_index"), retcols=("room_id", "has_auth_chain_index"),
allow_none=True, allow_none=True,
) )
if row is None: if row is None or row[1] is False:
return {} return {}
# Filter out already persisted events. # Filter out events that we've already calculated.
rows = self.db_pool.simple_select_many_txn( rows = self.db_pool.simple_select_many_txn(
txn, txn,
table="events", table="event_auth_chains",
column="event_id", column="event_id",
iterable=[e.event_id for e in state_events], iterable=[e.event_id for e in state_events],
keyvalues={}, keyvalues={},
@ -319,7 +319,7 @@ class PersistEventsStore:
state_events = [ state_events = [
event event
for event in state_events for event in state_events
if event.event_id in already_persisted_events if event.event_id not in already_persisted_events
] ]
if not state_events: if not state_events:
@ -600,6 +600,9 @@ class PersistEventsStore:
events: List[EventBase], events: List[EventBase],
new_event_links: Dict[str, NewEventChainLinks], new_event_links: Dict[str, NewEventChainLinks],
) -> None: ) -> None:
if new_event_links:
self._persist_chain_cover_index(txn, self.db_pool, new_event_links)
# We only care about state events, so this if there are no state events. # We only care about state events, so this if there are no state events.
if not any(e.is_state() for e in events): if not any(e.is_state() for e in events):
return return
@ -622,9 +625,6 @@ class PersistEventsStore:
], ],
) )
if new_event_links:
self._persist_chain_cover_index(txn, self.db_pool, new_event_links)
@classmethod @classmethod
def _add_chain_cover_index( def _add_chain_cover_index(
cls, cls,

View file

@ -344,6 +344,8 @@ class HomeserverTestCase(TestCase):
self._hs_args = {"clock": self.clock, "reactor": self.reactor} self._hs_args = {"clock": self.clock, "reactor": self.reactor}
self.hs = self.make_homeserver(self.reactor, self.clock) self.hs = self.make_homeserver(self.reactor, self.clock)
self.hs.get_datastores().main.tests_allow_no_chain_cover_index = False
# Honour the `use_frozen_dicts` config option. We have to do this # Honour the `use_frozen_dicts` config option. We have to do this
# manually because this is taken care of in the app `start` code, which # manually because this is taken care of in the app `start` code, which
# we don't run. Plus we want to reset it on tearDown. # we don't run. Plus we want to reset it on tearDown.