Move cache invalidation to the main thread

See https://github.com/matrix-org/synapse/pull/13861#discussion_r976982283
Eric Eastwood 2022-09-21 17:55:45 -05:00
parent dd4be2453f
commit 24905b78f5
3 changed files with 31 additions and 22 deletions
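
In short: per-event cache invalidation moves out of PersistEventsStore (third file below), where it ran inside the persistence transaction with placeholder arguments, and into EventsPersistenceStorageController (first file), which invalidates against self.main_store once events are known to be persisted. A minimal, self-contained sketch of that ordering, using hypothetical stand-in names (MainStoreSketch, persist_event_sketch), not Synapse's real classes:

    from typing import Dict, Set, Tuple

    # Hypothetical stand-ins for the ordering this commit enforces:
    # persist first, then invalidate caches on the main store.

    class MainStoreSketch:
        def __init__(self) -> None:
            self.event_cache: Dict[str, dict] = {}        # stands in for the get_event cache
            self.have_seen: Set[Tuple[str, str]] = set()  # stands in for have_seen_event

        def invalidate_caches_for_event(self, room_id: str, event_id: str) -> None:
            # Mirrors the two calls kept in CacheInvalidationWorkerStore below.
            self.event_cache.pop(event_id, None)
            self.have_seen.discard((room_id, event_id))

    def persist_event_sketch(store: MainStoreSketch, room_id: str, event_id: str) -> None:
        # 1. Persist the event to the database (elided here).
        # 2. Only then invalidate, so a concurrent reader cannot re-populate
        #    the cache from state the transaction has not yet committed.
        store.invalidate_caches_for_event(room_id, event_id)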


@@ -43,7 +43,7 @@ from prometheus_client import Counter, Histogram
 from twisted.internet import defer
 from synapse.api.constants import EventTypes, Membership
-from synapse.events import EventBase
+from synapse.events import EventBase, relation_from_event
 from synapse.events.snapshot import EventContext
 from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
 from synapse.logging.tracing import (
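
The new import, relation_from_event, is what the hunks below use to derive relates_to: it parses the event's m.relates_to content and returns None for events that are not relations. A hypothetical annotation event's content, showing the field it reads:

    # Hypothetical event content; relation_from_event parses the
    # m.relates_to block and exposes the parent event ID as .parent_id.
    content = {
        "m.relates_to": {
            "rel_type": "m.annotation",
            "event_id": "$parent_event_id",  # -> relation.parent_id below
            "key": "👍",
        },
    }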
@@ -435,6 +435,21 @@ class EventsPersistenceStorageController:
             else:
                 events.append(event)
+
+                # We expect events to be persisted by this point
+                assert event.internal_metadata.stream_ordering
+                # Invalidate related caches after we persist a new event
+                relation = relation_from_event(event)
+                self.main_store._invalidate_caches_for_event(
+                    stream_ordering=event.internal_metadata.stream_ordering,
+                    event_id=event.event_id,
+                    room_id=event.room_id,
+                    etype=event.type,
+                    state_key=event.state_key if hasattr(event, "state_key") else None,
+                    redacts=event.redacts,
+                    relates_to=relation.parent_id if relation else None,
+                    backfilled=backfilled,
+                )
 
         return (
             events,
             self.main_store.get_room_max_token(),
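
Note the state_key guard: EventBase only carries a state_key attribute on state events, so the code probes for it rather than reading it unconditionally. A tiny, hypothetical demonstration that getattr with a default is an equivalent spelling:

    class _EventStub:  # hypothetical stand-in for a non-state EventBase
        type = "m.room.message"

    event = _EventStub()
    state_key = event.state_key if hasattr(event, "state_key") else None
    assert state_key is None
    assert getattr(event, "state_key", None) is None  # equivalent spelling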
@@ -467,6 +482,21 @@ class EventsPersistenceStorageController:
         replaced_event = replaced_events.get(event.event_id)
         if replaced_event:
             event = await self.main_store.get_event(replaced_event)
+        else:
+            # We expect events to be persisted by this point
+            assert event.internal_metadata.stream_ordering
+            # Invalidate related caches after we persist a new event
+            relation = relation_from_event(event)
+            self.main_store._invalidate_caches_for_event(
+                stream_ordering=event.internal_metadata.stream_ordering,
+                event_id=event.event_id,
+                room_id=event.room_id,
+                etype=event.type,
+                state_key=event.state_key if hasattr(event, "state_key") else None,
+                redacts=event.redacts,
+                relates_to=relation.parent_id if relation else None,
+                backfilled=backfilled,
+            )
 
         event_stream_id = event.internal_metadata.stream_ordering
         # stream ordering should have been assigned by now
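
The else: matters here: when replaced_events maps the incoming event to one that already exists, the event was deduplicated rather than newly persisted, so no fresh invalidation is needed. A hedged sketch of that branch with hypothetical IDs:

    # Hypothetical IDs illustrating the branch above: only events that were
    # actually persisted (no replacement recorded) need cache invalidation.
    replaced_events = {"$duplicate": "$already_persisted"}

    def needs_invalidation(event_id: str) -> bool:
        return replaced_events.get(event_id) is None

    assert not needs_invalidation("$duplicate")  # deduplicated: skip
    assert needs_invalidation("$brand_new")      # freshly persisted: invalidate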


@@ -228,9 +228,6 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
             room_id,
             event_id,
         )
-        logger.info(
-            "CacheInvalidationWorkerStore self.have_seen_event=%s", self.have_seen_event
-        )
         self._invalidate_local_get_event_cache(event_id)
         self.have_seen_event.invalidate(((room_id, event_id),))
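
For context, the two calls kept at the end of this hunk are the core of _invalidate_caches_for_event. A simplified, runnable skeleton of a helper with that call shape (an assumption about structure, not Synapse's full implementation, which also covers redaction- and relation-related caches):

    from typing import Optional, Set, Tuple

    class CacheInvalidationSketch:
        """Hypothetical skeleton mirroring the calls kept above."""

        def __init__(self) -> None:
            self._local_event_cache: dict = {}
            self._have_seen: Set[Tuple[str, str]] = set()

        def _invalidate_local_get_event_cache(self, event_id: str) -> None:
            self._local_event_cache.pop(event_id, None)

        def _invalidate_caches_for_event(
            self,
            stream_ordering: int,
            event_id: str,
            room_id: str,
            etype: str,
            state_key: Optional[str],
            redacts: Optional[str],
            relates_to: Optional[str],
            backfilled: bool,
        ) -> None:
            # Drop the locally cached copy of the event...
            self._invalidate_local_get_event_cache(event_id)
            # ...and forget the (room_id, event_id) "seen" marker so it is
            # re-read from the database next time.
            self._have_seen.discard((room_id, event_id))
            # The real helper also invalidates redaction- and
            # relation-related caches when redacts / relates_to are set.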


@@ -434,24 +434,6 @@ class PersistEventsStore:
         self._store_event_txn(txn, events_and_contexts=events_and_contexts)
 
-        for event, _ in events_and_contexts:
-            # We expect events to be persisted by this point
-            assert event.internal_metadata.stream_ordering
-
-            relation = relation_from_event(event)
-            self.store._invalidate_caches_for_event(
-                stream_ordering=event.internal_metadata.stream_ordering,
-                event_id=event.event_id,
-                room_id=event.room_id,
-                etype=event.type,
-                state_key=None,  # event.state_key,
-                # TODO
-                redacts=None,
-                relates_to=relation.parent_id if relation else None,
-                # TODO
-                backfilled=False,
-            )
-
         self._persist_transaction_ids_txn(txn, events_and_contexts)
 
         # Insert into event_to_state_groups.