Fix perf when streams don't change often (#17767)

There is a bug with the `StreamChangeCache` where it would incorrectly
return that all entities had changed if asked for entities changed
*since* the earliest stream position.

Note that for streams we use the inequalities: `$min_stream_id <
stream_id <= $max_stream_id`, i.e. when we ask the stream change cache
for all things that have changed since `$stream_id` we don't care for
events that happened *at* `$stream_id`.

Specifically: `_earliest_known_stream_pos` is the position at which we
know that we'll have entries for all changes since that point, we can
use the cache for any stream IDs that equal
`_earliest_known_stream_pos`.

`_earliest_known_stream_pos` is set in three places:
- On startup we set it either to:
  - the current maximum stream ID, with not prefilled values; or
  - the minimum of the latest N values we pulled from the DB
- When we evict items from the bottom, we set it to the stream ID of the
evicted items.

This was changed in https://github.com/matrix-org/synapse/pull/14435,
but I think we were overly conservative there.

---------

Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
This commit is contained in:
Erik Johnston 2024-09-30 13:52:33 +01:00 committed by GitHub
parent ae4862c38f
commit 81e0f57800
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 19 additions and 14 deletions

1
changelog.d/17767.misc Normal file
View file

@ -0,0 +1 @@
Fix performance of streams that don't change often.

View file

@ -142,9 +142,9 @@ class StreamChangeCache:
""" """
assert isinstance(stream_pos, int) assert isinstance(stream_pos, int)
# _cache is not valid at or before the earliest known stream position, so # _cache is not valid before the earliest known stream position, so
# return that the entity has changed. # return that the entity has changed.
if stream_pos <= self._earliest_known_stream_pos: if stream_pos < self._earliest_known_stream_pos:
self.metrics.inc_misses() self.metrics.inc_misses()
return True return True
@ -186,7 +186,7 @@ class StreamChangeCache:
This will be all entities if the given stream position is at or earlier This will be all entities if the given stream position is at or earlier
than the earliest known stream position. than the earliest known stream position.
""" """
if not self._cache or stream_pos <= self._earliest_known_stream_pos: if not self._cache or stream_pos < self._earliest_known_stream_pos:
self.metrics.inc_misses() self.metrics.inc_misses()
return set(entities) return set(entities)
@ -238,9 +238,9 @@ class StreamChangeCache:
""" """
assert isinstance(stream_pos, int) assert isinstance(stream_pos, int)
# _cache is not valid at or before the earliest known stream position, so # _cache is not valid before the earliest known stream position, so
# return that an entity has changed. # return that an entity has changed.
if stream_pos <= self._earliest_known_stream_pos: if stream_pos < self._earliest_known_stream_pos:
self.metrics.inc_misses() self.metrics.inc_misses()
return True return True
@ -270,9 +270,9 @@ class StreamChangeCache:
""" """
assert isinstance(stream_pos, int) assert isinstance(stream_pos, int)
# _cache is not valid at or before the earliest known stream position, so # _cache is not valid before the earliest known stream position, so
# return None to mark that it is unknown if an entity has changed. # return None to mark that it is unknown if an entity has changed.
if stream_pos <= self._earliest_known_stream_pos: if stream_pos < self._earliest_known_stream_pos:
return AllEntitiesChangedResult(None) return AllEntitiesChangedResult(None)
changed_entities: List[EntityType] = [] changed_entities: List[EntityType] = []

View file

@ -53,8 +53,8 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
# return True, whether it's a known entity or not. # return True, whether it's a known entity or not.
self.assertTrue(cache.has_entity_changed("user@foo.com", 0)) self.assertTrue(cache.has_entity_changed("user@foo.com", 0))
self.assertTrue(cache.has_entity_changed("not@here.website", 0)) self.assertTrue(cache.has_entity_changed("not@here.website", 0))
self.assertTrue(cache.has_entity_changed("user@foo.com", 3)) self.assertTrue(cache.has_entity_changed("user@foo.com", 2))
self.assertTrue(cache.has_entity_changed("not@here.website", 3)) self.assertTrue(cache.has_entity_changed("not@here.website", 2))
def test_entity_has_changed_pops_off_start(self) -> None: def test_entity_has_changed_pops_off_start(self) -> None:
""" """
@ -76,9 +76,11 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
self.assertTrue("user@foo.com" not in cache._entity_to_key) self.assertTrue("user@foo.com" not in cache._entity_to_key)
self.assertEqual( self.assertEqual(
cache.get_all_entities_changed(3).entities, ["user@elsewhere.org"] cache.get_all_entities_changed(2).entities,
["bar@baz.net", "user@elsewhere.org"],
) )
self.assertFalse(cache.get_all_entities_changed(2).hit) self.assertFalse(cache.get_all_entities_changed(1).hit)
self.assertTrue(cache.get_all_entities_changed(2).hit)
# If we update an existing entity, it keeps the two existing entities # If we update an existing entity, it keeps the two existing entities
cache.entity_has_changed("bar@baz.net", 5) cache.entity_has_changed("bar@baz.net", 5)
@ -89,7 +91,8 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
cache.get_all_entities_changed(3).entities, cache.get_all_entities_changed(3).entities,
["user@elsewhere.org", "bar@baz.net"], ["user@elsewhere.org", "bar@baz.net"],
) )
self.assertFalse(cache.get_all_entities_changed(2).hit) self.assertFalse(cache.get_all_entities_changed(1).hit)
self.assertTrue(cache.get_all_entities_changed(2).hit)
def test_get_all_entities_changed(self) -> None: def test_get_all_entities_changed(self) -> None:
""" """
@ -114,7 +117,8 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
self.assertEqual( self.assertEqual(
cache.get_all_entities_changed(3).entities, ["user@elsewhere.org"] cache.get_all_entities_changed(3).entities, ["user@elsewhere.org"]
) )
self.assertFalse(cache.get_all_entities_changed(1).hit) self.assertFalse(cache.get_all_entities_changed(0).hit)
self.assertTrue(cache.get_all_entities_changed(1).hit)
# ... later, things gest more updates # ... later, things gest more updates
cache.entity_has_changed("user@foo.com", 5) cache.entity_has_changed("user@foo.com", 5)
@ -149,7 +153,7 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
# With no entities, it returns True for the past, present, and False for # With no entities, it returns True for the past, present, and False for
# the future. # the future.
self.assertTrue(cache.has_any_entity_changed(0)) self.assertTrue(cache.has_any_entity_changed(0))
self.assertTrue(cache.has_any_entity_changed(1)) self.assertFalse(cache.has_any_entity_changed(1))
self.assertFalse(cache.has_any_entity_changed(2)) self.assertFalse(cache.has_any_entity_changed(2))
# We add an entity # We add an entity