Merge remote-tracking branch 'origin/erikj/stream_change_cache' into anoa/configure_workers_updates

This commit is contained in:
Olivier 'reivilibre 2024-09-27 15:29:02 +01:00
commit be472547bb
3 changed files with 19 additions and 14 deletions

1
changelog.d/17767.misc Normal file
View file

@ -0,0 +1 @@
Fix perf when streams don't change often.

View file

@ -142,9 +142,9 @@ class StreamChangeCache:
""" """
assert isinstance(stream_pos, int) assert isinstance(stream_pos, int)
# _cache is not valid at or before the earliest known stream position, so # _cache is not valid before the earliest known stream position, so
# return that the entity has changed. # return that the entity has changed.
if stream_pos <= self._earliest_known_stream_pos: if stream_pos < self._earliest_known_stream_pos:
self.metrics.inc_misses() self.metrics.inc_misses()
return True return True
@ -186,7 +186,7 @@ class StreamChangeCache:
This will be all entities if the given stream position is at or earlier This will be all entities if the given stream position is at or earlier
than the earliest known stream position. than the earliest known stream position.
""" """
if not self._cache or stream_pos <= self._earliest_known_stream_pos: if not self._cache or stream_pos < self._earliest_known_stream_pos:
self.metrics.inc_misses() self.metrics.inc_misses()
return set(entities) return set(entities)
@ -238,9 +238,9 @@ class StreamChangeCache:
""" """
assert isinstance(stream_pos, int) assert isinstance(stream_pos, int)
# _cache is not valid at or before the earliest known stream position, so # _cache is not valid before the earliest known stream position, so
# return that an entity has changed. # return that an entity has changed.
if stream_pos <= self._earliest_known_stream_pos: if stream_pos < self._earliest_known_stream_pos:
self.metrics.inc_misses() self.metrics.inc_misses()
return True return True
@ -270,9 +270,9 @@ class StreamChangeCache:
""" """
assert isinstance(stream_pos, int) assert isinstance(stream_pos, int)
# _cache is not valid at or before the earliest known stream position, so # _cache is not valid before the earliest known stream position, so
# return None to mark that it is unknown if an entity has changed. # return None to mark that it is unknown if an entity has changed.
if stream_pos <= self._earliest_known_stream_pos: if stream_pos < self._earliest_known_stream_pos:
return AllEntitiesChangedResult(None) return AllEntitiesChangedResult(None)
changed_entities: List[EntityType] = [] changed_entities: List[EntityType] = []

View file

@ -53,8 +53,8 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
# return True, whether it's a known entity or not. # return True, whether it's a known entity or not.
self.assertTrue(cache.has_entity_changed("user@foo.com", 0)) self.assertTrue(cache.has_entity_changed("user@foo.com", 0))
self.assertTrue(cache.has_entity_changed("not@here.website", 0)) self.assertTrue(cache.has_entity_changed("not@here.website", 0))
self.assertTrue(cache.has_entity_changed("user@foo.com", 3)) self.assertTrue(cache.has_entity_changed("user@foo.com", 2))
self.assertTrue(cache.has_entity_changed("not@here.website", 3)) self.assertTrue(cache.has_entity_changed("not@here.website", 2))
def test_entity_has_changed_pops_off_start(self) -> None: def test_entity_has_changed_pops_off_start(self) -> None:
""" """
@ -76,9 +76,11 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
self.assertTrue("user@foo.com" not in cache._entity_to_key) self.assertTrue("user@foo.com" not in cache._entity_to_key)
self.assertEqual( self.assertEqual(
cache.get_all_entities_changed(3).entities, ["user@elsewhere.org"] cache.get_all_entities_changed(2).entities,
["bar@baz.net", "user@elsewhere.org"],
) )
self.assertFalse(cache.get_all_entities_changed(2).hit) self.assertFalse(cache.get_all_entities_changed(1).hit)
self.assertTrue(cache.get_all_entities_changed(2).hit)
# If we update an existing entity, it keeps the two existing entities # If we update an existing entity, it keeps the two existing entities
cache.entity_has_changed("bar@baz.net", 5) cache.entity_has_changed("bar@baz.net", 5)
@ -89,7 +91,8 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
cache.get_all_entities_changed(3).entities, cache.get_all_entities_changed(3).entities,
["user@elsewhere.org", "bar@baz.net"], ["user@elsewhere.org", "bar@baz.net"],
) )
self.assertFalse(cache.get_all_entities_changed(2).hit) self.assertFalse(cache.get_all_entities_changed(1).hit)
self.assertTrue(cache.get_all_entities_changed(2).hit)
def test_get_all_entities_changed(self) -> None: def test_get_all_entities_changed(self) -> None:
""" """
@ -114,7 +117,8 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
self.assertEqual( self.assertEqual(
cache.get_all_entities_changed(3).entities, ["user@elsewhere.org"] cache.get_all_entities_changed(3).entities, ["user@elsewhere.org"]
) )
self.assertFalse(cache.get_all_entities_changed(1).hit) self.assertFalse(cache.get_all_entities_changed(0).hit)
self.assertTrue(cache.get_all_entities_changed(1).hit)
# ... later, things gest more updates # ... later, things gest more updates
cache.entity_has_changed("user@foo.com", 5) cache.entity_has_changed("user@foo.com", 5)
@ -149,7 +153,7 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
# With no entities, it returns True for the past, present, and False for # With no entities, it returns True for the past, present, and False for
# the future. # the future.
self.assertTrue(cache.has_any_entity_changed(0)) self.assertTrue(cache.has_any_entity_changed(0))
self.assertTrue(cache.has_any_entity_changed(1)) self.assertFalse(cache.has_any_entity_changed(1))
self.assertFalse(cache.has_any_entity_changed(2)) self.assertFalse(cache.has_any_entity_changed(2))
# We add an entity # We add an entity