From dd4be2453f59926af005598b2d168654d13cadd0 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 21 Sep 2022 15:45:08 -0500 Subject: [PATCH] Fix have_seen_event cache not being invalidated when we persist the event Fix for https://github.com/matrix-org/synapse/issues/13856 Fixed by calling `_invalidate_caches_for_event` when we persist an event. And an additional fix in `_invalidate_caches_for_event` to make sure it uses the correct cache key. This seems like it would be an easy foot-gun for any `tree=True` cache. Wrong: ```py self.have_seen_event.invalidate((room_id, event_id)) ``` Correct: ```py self.have_seen_event.invalidate(((room_id, event_id),)) ``` --- synapse/handlers/message.py | 1 - synapse/storage/databases/main/cache.py | 11 +- synapse/storage/databases/main/events.py | 18 +++ .../storage/databases/main/events_worker.py | 4 +- synapse/util/caches/deferred_cache.py | 6 + synapse/util/caches/lrucache.py | 6 + .../databases/main/test_events_worker.py | 135 ++++++++---------- 7 files changed, 98 insertions(+), 83 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index e391338406..10b5dad030 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1078,7 +1078,6 @@ class EventCreationHandler: else: prev_event_ids = await self.store.get_prev_events_for_room(builder.room_id) - logger.info("allow_no_prev_events=%s", allow_no_prev_events) # Do a quick sanity check here, rather than waiting until we've created the # event and then try to auth it (which fails with a somewhat confusing "No # create event in auth events") diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index aabf3dbba0..53646b978a 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -223,9 +223,16 @@ class CacheInvalidationWorkerStore(SQLBaseStore): # This invalidates any local in-memory cached event objects, the original # process triggering the invalidation is responsible for clearing any external # cached objects. - logger.info("_invalidate_caches_for_event event_id=%s", event_id) + logger.info( + "CacheInvalidationWorkerStore _invalidate_caches_for_event room_id=%s event_id=%s", + room_id, + event_id, + ) + logger.info( + "CacheInvalidationWorkerStore self.have_seen_event=%s", self.have_seen_event + ) self._invalidate_local_get_event_cache(event_id) - self.have_seen_event.invalidate((room_id, event_id)) + self.have_seen_event.invalidate(((room_id, event_id),)) self.get_latest_event_ids_in_room.invalidate((room_id,)) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 5932668f2f..368e9b47e9 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -434,6 +434,24 @@ class PersistEventsStore: self._store_event_txn(txn, events_and_contexts=events_and_contexts) + for event, _ in events_and_contexts: + # We expect events to be persisted by this point + assert event.internal_metadata.stream_ordering + + relation = relation_from_event(event) + self.store._invalidate_caches_for_event( + stream_ordering=event.internal_metadata.stream_ordering, + event_id=event.event_id, + room_id=event.room_id, + etype=event.type, + state_key=None, # event.state_key, + # TODO + redacts=None, + relates_to=relation.parent_id if relation else None, + # TODO + backfilled=False, + ) + self._persist_transaction_ids_txn(txn, events_and_contexts) # Insert into event_to_state_groups. diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 9f6b1fcef1..debe8e5f3f 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1453,7 +1453,7 @@ class EventsWorkerStore(SQLBaseStore): @trace @tag_args async def have_seen_events( - self, room_id: str, event_ids: Iterable[str] + self, room_id: str, event_ids: Collection[str] ) -> Set[str]: """Given a list of event ids, check if we have already processed them. @@ -1468,6 +1468,7 @@ class EventsWorkerStore(SQLBaseStore): Returns: The set of events we have already seen. """ + logger.info("have_seen_events room_id=%s event_ids=%s", room_id, event_ids) # @cachedList chomps lots of memory if you call it with a big list, so # we break it down. However, each batch requires its own index scan, so we make @@ -1491,6 +1492,7 @@ class EventsWorkerStore(SQLBaseStore): Returns: a dict {(room_id, event_id)-> bool} """ + logger.info("_have_seen_events_dict keys=%s", keys) # if the event cache contains the event, obviously we've seen it. cache_results = { diff --git a/synapse/util/caches/deferred_cache.py b/synapse/util/caches/deferred_cache.py index 6425f851ea..36b05fc344 100644 --- a/synapse/util/caches/deferred_cache.py +++ b/synapse/util/caches/deferred_cache.py @@ -383,8 +383,14 @@ class DeferredCache(Generic[KT, VT]): may be of lower cardinality than the TreeCache - in which case the whole subtree is deleted. """ + import logging + + logger = logging.getLogger(__name__) + logger.info("DeferredCache before=%s", self.cache.len()) + logger.info("DeferredCache invalidate key=%s", key) self.check_thread() self.cache.del_multi(key) + logger.info("DeferredCache after=%s", self.cache.len()) # if we have a pending lookup for this key, remove it from the # _pending_deferred_cache, which will (a) stop it being returned for diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index aa93109d13..5a745eb8c5 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -511,6 +511,7 @@ class LruCache(Generic[KT, VT]): callbacks, prune_unread_entries, ) + logger.info("LruCache add_node key=%s value=%s", key, value) cache[key] = node if size_callback: @@ -722,7 +723,12 @@ class LruCache(Generic[KT, VT]): may be of lower cardinality than the TreeCache - in which case the whole subtree is deleted. """ + logger.info( + "LruCache cache values before pop %s", + {node.key: node.value for node in cache.values()}, + ) popped = cache.pop(key, None) + logger.info("LruCache cache_del_multi key=%s popped=%s", key, popped) if popped is None: return # for each deleted node, we now need to remove it from the linked list diff --git a/tests/storage/databases/main/test_events_worker.py b/tests/storage/databases/main/test_events_worker.py index 6602dffea0..158ad1f439 100644 --- a/tests/storage/databases/main/test_events_worker.py +++ b/tests/storage/databases/main/test_events_worker.py @@ -20,9 +20,6 @@ from twisted.enterprise.adbapi import ConnectionPool from twisted.internet.defer import CancelledError, Deferred, ensureDeferred from twisted.test.proto_helpers import MemoryReactor -import synapse.rest.admin -import synapse.rest.client.login -import synapse.rest.client.room from synapse.api.room_versions import EventFormatVersions, RoomVersions from synapse.events import make_event_from_dict from synapse.logging.context import LoggingContext @@ -36,76 +33,47 @@ from synapse.storage.databases.main.events_worker import ( from synapse.storage.types import Connection from synapse.util import Clock from synapse.util.async_helpers import yieldable_gather_results -from tests.test_utils.event_injection import create_event from tests import unittest +from tests.test_utils.event_injection import create_event, inject_event class HaveSeenEventsTestCase(unittest.HomeserverTestCase): servlets = [ - synapse.rest.admin.register_servlets, - synapse.rest.client.login.register_servlets, - synapse.rest.client.room.register_servlets, + admin.register_servlets, + room.register_servlets, + login.register_servlets, ] def prepare(self, reactor, clock, hs): self.hs = hs self.store: EventsWorkerStore = hs.get_datastores().main - # insert some test data - for rid in ("room1", "room2"): - self.get_success( - self.store.db_pool.simple_insert( - "rooms", - {"room_id": rid, "room_version": 4}, - ) - ) + self.user = self.register_user("user", "pass") + self.token = self.login(self.user, "pass") + self.room_id = self.helper.create_room_as(self.user, tok=self.token) self.event_ids: List[str] = [] - for idx, rid in enumerate( - ( - "room1", - "room1", - "room1", - "room2", + for i in range(3): + event = self.get_success( + inject_event( + hs, + room_version=RoomVersions.V7.identifier, + room_id=self.room_id, + sender=self.user, + type="test_event_type", + content={"body": f"foobarbaz{i}"}, + ) ) - ): - event_json = {"type": f"test {idx}", "room_id": rid} - event = make_event_from_dict(event_json, room_version=RoomVersions.V4) - event_id = event.event_id - self.get_success( - self.store.db_pool.simple_insert( - "events", - { - "event_id": event_id, - "room_id": rid, - "topological_ordering": idx, - "stream_ordering": idx, - "type": event.type, - "processed": True, - "outlier": False, - }, - ) - ) - self.get_success( - self.store.db_pool.simple_insert( - "event_json", - { - "event_id": event_id, - "room_id": rid, - "json": json.dumps(event_json), - "internal_metadata": "{}", - "format_version": 3, - }, - ) - ) - self.event_ids.append(event_id) + self.event_ids.append(event.event_id) def test_simple(self): with LoggingContext(name="test") as ctx: res = self.get_success( - self.store.have_seen_events("room1", [self.event_ids[0], "event19"]) + self.store.have_seen_events( + self.room_id, [self.event_ids[0], "eventdoesnotexist"] + ) ) self.assertEqual(res, {self.event_ids[0]}) @@ -115,7 +83,9 @@ class HaveSeenEventsTestCase(unittest.HomeserverTestCase): # a second lookup of the same events should cause no queries with LoggingContext(name="test") as ctx: res = self.get_success( - self.store.have_seen_events("room1", [self.event_ids[0], "event19"]) + self.store.have_seen_events( + self.room_id, [self.event_ids[0], "eventdoesnotexist"] + ) ) self.assertEqual(res, {self.event_ids[0]}) self.assertEqual(ctx.get_resource_usage().db_txn_count, 0) @@ -127,46 +97,53 @@ class HaveSeenEventsTestCase(unittest.HomeserverTestCase): # looking it up should now cause no db hits with LoggingContext(name="test") as ctx: res = self.get_success( - self.store.have_seen_events("room1", [self.event_ids[0]]) + self.store.have_seen_events(self.room_id, [self.event_ids[0]]) ) self.assertEqual(res, {self.event_ids[0]}) self.assertEqual(ctx.get_resource_usage().db_txn_count, 0) def test_persisting_event_invalidates_cache(self): - with LoggingContext(name="test") as ctx: - alice = self.register_user("alice", "pass") - alice_token = self.login("alice", "pass") - room_id = self.helper.create_room_as(alice, tok=alice_token) - - event, event_context = self.get_success( - create_event( - self.hs, - room_id=room_id, - room_version="6", - sender=alice, - type="test_event_type", - content={"body": "foobarbaz"}, - ) + event, event_context = self.get_success( + create_event( + self.hs, + room_id=self.room_id, + sender=self.user, + type="test_event_type", + content={"body": "garply"}, ) + ) - # Check first `have_seen_events` for an event we have not seen yet - # to prime the cache with a `false`. + with LoggingContext(name="test") as ctx: + # First, check `have_seen_event` for an event we have not seen yet + # to prime the cache with a `false` value. res = self.get_success( self.store.have_seen_events(event.room_id, [event.event_id]) ) self.assertEqual(res, set()) - # that should result in a single db query to lookup if we have the - # event that we have not persisted yet. + # That should result in a single db query to lookup self.assertEqual(ctx.get_resource_usage().db_txn_count, 1) - persistence = self.hs.get_storage_controllers().persistence - self.get_success( - persistence.persist_event( - event, - event_context, - ) + # Persist the event which should invalidate or prefill the + # `have_seen_event` cache so we don't return stale values. + persistence = self.hs.get_storage_controllers().persistence + self.get_success( + persistence.persist_event( + event, + event_context, ) + ) + + with LoggingContext(name="test") as ctx: + # Check `have_seen_event` again and we should see the updated fact + # that we have now seen the event after persisting it. + res = self.get_success( + self.store.have_seen_events(event.room_id, [event.event_id]) + ) + self.assertEqual(res, {event.event_id}) + + # That should result in a single db query to lookup + self.assertEqual(ctx.get_resource_usage().db_txn_count, 1) class EventCacheTestCase(unittest.HomeserverTestCase):