mirror of
https://github.com/element-hq/synapse
synced 2024-09-17 20:05:10 +00:00
Scratch changes for fix have_seen_event not being invalidated
See https://github.com/matrix-org/synapse/issues/13856
This commit is contained in:
parent
05e511368b
commit
a732796149
6 changed files with 58 additions and 2 deletions
|
@ -31,7 +31,9 @@ federation_ip_range_blacklist: []
|
||||||
# Disable server rate-limiting
|
# Disable server rate-limiting
|
||||||
rc_federation:
|
rc_federation:
|
||||||
window_size: 1000
|
window_size: 1000
|
||||||
sleep_limit: 10
|
# foo: We run into the rate limiter hard with the MSC2716 tests.
|
||||||
|
# We go from 35s /messages requests to 20s just by making `/state_ids` and `/state` go faster
|
||||||
|
sleep_limit: 99999
|
||||||
sleep_delay: 500
|
sleep_delay: 500
|
||||||
reject_limit: 99999
|
reject_limit: 99999
|
||||||
concurrent: 3
|
concurrent: 3
|
||||||
|
|
|
@ -1078,6 +1078,7 @@ class EventCreationHandler:
|
||||||
else:
|
else:
|
||||||
prev_event_ids = await self.store.get_prev_events_for_room(builder.room_id)
|
prev_event_ids = await self.store.get_prev_events_for_room(builder.room_id)
|
||||||
|
|
||||||
|
logger.info("allow_no_prev_events=%s", allow_no_prev_events)
|
||||||
# Do a quick sanity check here, rather than waiting until we've created the
|
# Do a quick sanity check here, rather than waiting until we've created the
|
||||||
# event and then try to auth it (which fails with a somewhat confusing "No
|
# event and then try to auth it (which fails with a somewhat confusing "No
|
||||||
# create event in auth events")
|
# create event in auth events")
|
||||||
|
|
|
@ -54,6 +54,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||||
db_conn: LoggingDatabaseConnection,
|
db_conn: LoggingDatabaseConnection,
|
||||||
hs: "HomeServer",
|
hs: "HomeServer",
|
||||||
):
|
):
|
||||||
|
logger.info("CacheInvalidationWorkerStore constructor")
|
||||||
super().__init__(database, db_conn, hs)
|
super().__init__(database, db_conn, hs)
|
||||||
|
|
||||||
self._instance_name = hs.get_instance_name()
|
self._instance_name = hs.get_instance_name()
|
||||||
|
@ -222,6 +223,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||||
# This invalidates any local in-memory cached event objects, the original
|
# This invalidates any local in-memory cached event objects, the original
|
||||||
# process triggering the invalidation is responsible for clearing any external
|
# process triggering the invalidation is responsible for clearing any external
|
||||||
# cached objects.
|
# cached objects.
|
||||||
|
logger.info("_invalidate_caches_for_event event_id=%s", event_id)
|
||||||
self._invalidate_local_get_event_cache(event_id)
|
self._invalidate_local_get_event_cache(event_id)
|
||||||
self.have_seen_event.invalidate((room_id, event_id))
|
self.have_seen_event.invalidate((room_id, event_id))
|
||||||
|
|
||||||
|
|
|
@ -216,6 +216,9 @@ class _PerHostRatelimiter:
|
||||||
self.reject_limit = config.reject_limit
|
self.reject_limit = config.reject_limit
|
||||||
self.concurrent_requests = config.concurrent
|
self.concurrent_requests = config.concurrent
|
||||||
|
|
||||||
|
logger.info("self.sleep_limit=%s", self.sleep_limit)
|
||||||
|
logger.info("self.reject_limit=%s", self.reject_limit)
|
||||||
|
|
||||||
# request_id objects for requests which have been slept
|
# request_id objects for requests which have been slept
|
||||||
self.sleeping_requests: Set[object] = set()
|
self.sleeping_requests: Set[object] = set()
|
||||||
|
|
||||||
|
|
|
@ -20,6 +20,9 @@ from twisted.enterprise.adbapi import ConnectionPool
|
||||||
from twisted.internet.defer import CancelledError, Deferred, ensureDeferred
|
from twisted.internet.defer import CancelledError, Deferred, ensureDeferred
|
||||||
from twisted.test.proto_helpers import MemoryReactor
|
from twisted.test.proto_helpers import MemoryReactor
|
||||||
|
|
||||||
|
import synapse.rest.admin
|
||||||
|
import synapse.rest.client.login
|
||||||
|
import synapse.rest.client.room
|
||||||
from synapse.api.room_versions import EventFormatVersions, RoomVersions
|
from synapse.api.room_versions import EventFormatVersions, RoomVersions
|
||||||
from synapse.events import make_event_from_dict
|
from synapse.events import make_event_from_dict
|
||||||
from synapse.logging.context import LoggingContext
|
from synapse.logging.context import LoggingContext
|
||||||
|
@ -33,12 +36,20 @@ from synapse.storage.databases.main.events_worker import (
|
||||||
from synapse.storage.types import Connection
|
from synapse.storage.types import Connection
|
||||||
from synapse.util import Clock
|
from synapse.util import Clock
|
||||||
from synapse.util.async_helpers import yieldable_gather_results
|
from synapse.util.async_helpers import yieldable_gather_results
|
||||||
|
from tests.test_utils.event_injection import create_event
|
||||||
|
|
||||||
from tests import unittest
|
from tests import unittest
|
||||||
|
|
||||||
|
|
||||||
class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
|
class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
|
||||||
|
servlets = [
|
||||||
|
synapse.rest.admin.register_servlets,
|
||||||
|
synapse.rest.client.login.register_servlets,
|
||||||
|
synapse.rest.client.room.register_servlets,
|
||||||
|
]
|
||||||
|
|
||||||
def prepare(self, reactor, clock, hs):
|
def prepare(self, reactor, clock, hs):
|
||||||
|
self.hs = hs
|
||||||
self.store: EventsWorkerStore = hs.get_datastores().main
|
self.store: EventsWorkerStore = hs.get_datastores().main
|
||||||
|
|
||||||
# insert some test data
|
# insert some test data
|
||||||
|
@ -121,6 +132,42 @@ class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
|
||||||
self.assertEqual(res, {self.event_ids[0]})
|
self.assertEqual(res, {self.event_ids[0]})
|
||||||
self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)
|
self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)
|
||||||
|
|
||||||
|
def test_persisting_event_invalidates_cache(self):
|
||||||
|
with LoggingContext(name="test") as ctx:
|
||||||
|
alice = self.register_user("alice", "pass")
|
||||||
|
alice_token = self.login("alice", "pass")
|
||||||
|
room_id = self.helper.create_room_as(alice, tok=alice_token)
|
||||||
|
|
||||||
|
event, event_context = self.get_success(
|
||||||
|
create_event(
|
||||||
|
self.hs,
|
||||||
|
room_id=room_id,
|
||||||
|
room_version="6",
|
||||||
|
sender=alice,
|
||||||
|
type="test_event_type",
|
||||||
|
content={"body": "foobarbaz"},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check first `have_seen_events` for an event we have not seen yet
|
||||||
|
# to prime the cache with a `false`.
|
||||||
|
res = self.get_success(
|
||||||
|
self.store.have_seen_events(event.room_id, [event.event_id])
|
||||||
|
)
|
||||||
|
self.assertEqual(res, set())
|
||||||
|
|
||||||
|
# that should result in a single db query to lookup if we have the
|
||||||
|
# event that we have not persisted yet.
|
||||||
|
self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
|
||||||
|
|
||||||
|
persistence = self.hs.get_storage_controllers().persistence
|
||||||
|
self.get_success(
|
||||||
|
persistence.persist_event(
|
||||||
|
event,
|
||||||
|
event_context,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class EventCacheTestCase(unittest.HomeserverTestCase):
|
class EventCacheTestCase(unittest.HomeserverTestCase):
|
||||||
"""Test that the various layers of event cache works."""
|
"""Test that the various layers of event cache works."""
|
||||||
|
|
|
@ -93,7 +93,8 @@ async def create_event(
|
||||||
KNOWN_ROOM_VERSIONS[room_version], kwargs
|
KNOWN_ROOM_VERSIONS[room_version], kwargs
|
||||||
)
|
)
|
||||||
event, context = await hs.get_event_creation_handler().create_new_client_event(
|
event, context = await hs.get_event_creation_handler().create_new_client_event(
|
||||||
builder, prev_event_ids=prev_event_ids
|
builder,
|
||||||
|
prev_event_ids=prev_event_ids,
|
||||||
)
|
)
|
||||||
|
|
||||||
return event, context
|
return event, context
|
||||||
|
|
Loading…
Reference in a new issue