From 81e0f57800590e63bbc07319db6da964595978d0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 30 Sep 2024 13:52:33 +0100 Subject: [PATCH] Fix perf when streams don't change often (#17767) There is a bug with the `StreamChangeCache` where it would incorrectly return that all entities had changed if asked for entities changed *since* the earliest stream position. Note that for streams we use the inequalities: `$min_stream_id < stream_id <= $max_stream_id`, i.e. when we ask the stream change cache for all things that have changed since `$stream_id` we don't care for events that happened *at* `$stream_id`. Specifically: `_earliest_known_stream_pos` is the position at which we know that we'll have entries for all changes since that point, we can use the cache for any stream IDs that equal `_earliest_known_stream_pos`. `_earliest_known_stream_pos` is set in three places: - On startup we set it either to: - the current maximum stream ID, with not prefilled values; or - the minimum of the latest N values we pulled from the DB - When we evict items from the bottom, we set it to the stream ID of the evicted items. This was changed in https://github.com/matrix-org/synapse/pull/14435, but I think we were overly conservative there. --------- Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> --- changelog.d/17767.misc | 1 + synapse/util/caches/stream_change_cache.py | 14 +++++++------- tests/util/test_stream_change_cache.py | 18 +++++++++++------- 3 files changed, 19 insertions(+), 14 deletions(-) create mode 100644 changelog.d/17767.misc diff --git a/changelog.d/17767.misc b/changelog.d/17767.misc new file mode 100644 index 0000000000..36f23d0f60 --- /dev/null +++ b/changelog.d/17767.misc @@ -0,0 +1 @@ +Fix performance of streams that don't change often. diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 16fcb00206..03503abe0f 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -142,9 +142,9 @@ class StreamChangeCache: """ assert isinstance(stream_pos, int) - # _cache is not valid at or before the earliest known stream position, so + # _cache is not valid before the earliest known stream position, so # return that the entity has changed. - if stream_pos <= self._earliest_known_stream_pos: + if stream_pos < self._earliest_known_stream_pos: self.metrics.inc_misses() return True @@ -186,7 +186,7 @@ class StreamChangeCache: This will be all entities if the given stream position is at or earlier than the earliest known stream position. """ - if not self._cache or stream_pos <= self._earliest_known_stream_pos: + if not self._cache or stream_pos < self._earliest_known_stream_pos: self.metrics.inc_misses() return set(entities) @@ -238,9 +238,9 @@ class StreamChangeCache: """ assert isinstance(stream_pos, int) - # _cache is not valid at or before the earliest known stream position, so + # _cache is not valid before the earliest known stream position, so # return that an entity has changed. - if stream_pos <= self._earliest_known_stream_pos: + if stream_pos < self._earliest_known_stream_pos: self.metrics.inc_misses() return True @@ -270,9 +270,9 @@ class StreamChangeCache: """ assert isinstance(stream_pos, int) - # _cache is not valid at or before the earliest known stream position, so + # _cache is not valid before the earliest known stream position, so # return None to mark that it is unknown if an entity has changed. - if stream_pos <= self._earliest_known_stream_pos: + if stream_pos < self._earliest_known_stream_pos: return AllEntitiesChangedResult(None) changed_entities: List[EntityType] = [] diff --git a/tests/util/test_stream_change_cache.py b/tests/util/test_stream_change_cache.py index af1199ef8a..c41f5706af 100644 --- a/tests/util/test_stream_change_cache.py +++ b/tests/util/test_stream_change_cache.py @@ -53,8 +53,8 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase): # return True, whether it's a known entity or not. self.assertTrue(cache.has_entity_changed("user@foo.com", 0)) self.assertTrue(cache.has_entity_changed("not@here.website", 0)) - self.assertTrue(cache.has_entity_changed("user@foo.com", 3)) - self.assertTrue(cache.has_entity_changed("not@here.website", 3)) + self.assertTrue(cache.has_entity_changed("user@foo.com", 2)) + self.assertTrue(cache.has_entity_changed("not@here.website", 2)) def test_entity_has_changed_pops_off_start(self) -> None: """ @@ -76,9 +76,11 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase): self.assertTrue("user@foo.com" not in cache._entity_to_key) self.assertEqual( - cache.get_all_entities_changed(3).entities, ["user@elsewhere.org"] + cache.get_all_entities_changed(2).entities, + ["bar@baz.net", "user@elsewhere.org"], ) - self.assertFalse(cache.get_all_entities_changed(2).hit) + self.assertFalse(cache.get_all_entities_changed(1).hit) + self.assertTrue(cache.get_all_entities_changed(2).hit) # If we update an existing entity, it keeps the two existing entities cache.entity_has_changed("bar@baz.net", 5) @@ -89,7 +91,8 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase): cache.get_all_entities_changed(3).entities, ["user@elsewhere.org", "bar@baz.net"], ) - self.assertFalse(cache.get_all_entities_changed(2).hit) + self.assertFalse(cache.get_all_entities_changed(1).hit) + self.assertTrue(cache.get_all_entities_changed(2).hit) def test_get_all_entities_changed(self) -> None: """ @@ -114,7 +117,8 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase): self.assertEqual( cache.get_all_entities_changed(3).entities, ["user@elsewhere.org"] ) - self.assertFalse(cache.get_all_entities_changed(1).hit) + self.assertFalse(cache.get_all_entities_changed(0).hit) + self.assertTrue(cache.get_all_entities_changed(1).hit) # ... later, things gest more updates cache.entity_has_changed("user@foo.com", 5) @@ -149,7 +153,7 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase): # With no entities, it returns True for the past, present, and False for # the future. self.assertTrue(cache.has_any_entity_changed(0)) - self.assertTrue(cache.has_any_entity_changed(1)) + self.assertFalse(cache.has_any_entity_changed(1)) self.assertFalse(cache.has_any_entity_changed(2)) # We add an entity