From 24905b78f57c330c8e7bf53633326a311b945196 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 21 Sep 2022 17:55:45 -0500 Subject: [PATCH] Move cache invalidation to the main thread See https://github.com/matrix-org/synapse/pull/13861#discussion_r976982283 --- synapse/storage/controllers/persist_events.py | 32 ++++++++++++++++++- synapse/storage/databases/main/cache.py | 3 -- synapse/storage/databases/main/events.py | 18 ----------- 3 files changed, 31 insertions(+), 22 deletions(-) diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py index bde7a6648a..ffe14283a3 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py @@ -43,7 +43,7 @@ from prometheus_client import Counter, Histogram from twisted.internet import defer from synapse.api.constants import EventTypes, Membership -from synapse.events import EventBase +from synapse.events import EventBase, relation_from_event from synapse.events.snapshot import EventContext from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable from synapse.logging.tracing import ( @@ -435,6 +435,21 @@ class EventsPersistenceStorageController: else: events.append(event) + # We expect events to be persisted by this point + assert event.internal_metadata.stream_ordering + # Invalidate related caches after we persist a new event + relation = relation_from_event(event) + self.main_store._invalidate_caches_for_event( + stream_ordering=event.internal_metadata.stream_ordering, + event_id=event.event_id, + room_id=event.room_id, + etype=event.type, + state_key=event.state_key if hasattr(event, "state_key") else None, + redacts=event.redacts, + relates_to=relation.parent_id if relation else None, + backfilled=backfilled, + ) + return ( events, self.main_store.get_room_max_token(), @@ -467,6 +482,21 @@ class EventsPersistenceStorageController: replaced_event = replaced_events.get(event.event_id) if replaced_event: event = await self.main_store.get_event(replaced_event) + else: + # We expect events to be persisted by this point + assert event.internal_metadata.stream_ordering + # Invalidate related caches after we persist a new event + relation = relation_from_event(event) + self.main_store._invalidate_caches_for_event( + stream_ordering=event.internal_metadata.stream_ordering, + event_id=event.event_id, + room_id=event.room_id, + etype=event.type, + state_key=event.state_key if hasattr(event, "state_key") else None, + redacts=event.redacts, + relates_to=relation.parent_id if relation else None, + backfilled=backfilled, + ) event_stream_id = event.internal_metadata.stream_ordering # stream ordering should have been assigned by now diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 53646b978a..b28a91e8f6 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -228,9 +228,6 @@ class CacheInvalidationWorkerStore(SQLBaseStore): room_id, event_id, ) - logger.info( - "CacheInvalidationWorkerStore self.have_seen_event=%s", self.have_seen_event - ) self._invalidate_local_get_event_cache(event_id) self.have_seen_event.invalidate(((room_id, event_id),)) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 368e9b47e9..5932668f2f 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -434,24 +434,6 @@ class PersistEventsStore: self._store_event_txn(txn, events_and_contexts=events_and_contexts) - for event, _ in events_and_contexts: - # We expect events to be persisted by this point - assert event.internal_metadata.stream_ordering - - relation = relation_from_event(event) - self.store._invalidate_caches_for_event( - stream_ordering=event.internal_metadata.stream_ordering, - event_id=event.event_id, - room_id=event.room_id, - etype=event.type, - state_key=None, # event.state_key, - # TODO - redacts=None, - relates_to=relation.parent_id if relation else None, - # TODO - backfilled=False, - ) - self._persist_transaction_ids_txn(txn, events_and_contexts) # Insert into event_to_state_groups.