From 6b2867096b8a2cf8afdb5de2bab93bbf31f76065 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Dec 2019 14:08:48 +0000 Subject: [PATCH] Move event fetch vars to EventWorkStore --- synapse/storage/_base.py | 12 ------------ synapse/storage/data_stores/main/client_ips.py | 2 +- synapse/storage/data_stores/main/devices.py | 2 +- synapse/storage/data_stores/main/events_worker.py | 13 +++++++++++++ 4 files changed, 15 insertions(+), 14 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 7ebab31af0..6b8120a608 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -18,7 +18,6 @@ import itertools import logging import random import sys -import threading import time from typing import Iterable, Tuple @@ -36,7 +35,6 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.types import get_domain_from_id from synapse.util import batch_iter -from synapse.util.caches.descriptors import Cache from synapse.util.stringutils import exception_to_unicode # import a function which will return a monotonic time, in seconds @@ -237,16 +235,6 @@ class SQLBaseStore(object): # to watch it self._txn_perf_counters = PerformanceCounters() - self._get_event_cache = Cache( - "*getEvent*", keylen=3, max_entries=hs.config.event_cache_size - ) - - self._event_fetch_lock = threading.Condition() - self._event_fetch_list = [] - self._event_fetch_ongoing = 0 - - self._pending_ds = [] - self.database_engine = hs.database_engine # A set of tables that are not safe to use native upserts in. diff --git a/synapse/storage/data_stores/main/client_ips.py b/synapse/storage/data_stores/main/client_ips.py index 706c6a1f3f..7931b876ce 100644 --- a/synapse/storage/data_stores/main/client_ips.py +++ b/synapse/storage/data_stores/main/client_ips.py @@ -21,7 +21,7 @@ from twisted.internet import defer from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage import background_updates -from synapse.storage._base import Cache +from synapse.util.caches.descriptors import Cache from synapse.util.caches import CACHE_SIZE_FACTOR logger = logging.getLogger(__name__) diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py index 71f62036c0..b50ee026a2 100644 --- a/synapse/storage/data_stores/main/devices.py +++ b/synapse/storage/data_stores/main/devices.py @@ -31,11 +31,11 @@ from synapse.logging.opentracing import ( ) from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage._base import ( - Cache, SQLBaseStore, db_to_json, make_in_list_sql_clause, ) +from synapse.util.caches.descriptors import Cache from synapse.storage.background_updates import BackgroundUpdateStore from synapse.types import get_verify_key_from_cross_signing_key from synapse.util import batch_iter diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py index 4c4b76bd93..e782e8f481 100644 --- a/synapse/storage/data_stores/main/events_worker.py +++ b/synapse/storage/data_stores/main/events_worker.py @@ -17,6 +17,7 @@ from __future__ import division import itertools import logging +import threading from collections import namedtuple from canonicaljson import json @@ -34,6 +35,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause from synapse.types import get_domain_from_id from synapse.util import batch_iter +from synapse.util.caches.descriptors import Cache from synapse.util.metrics import Measure logger = logging.getLogger(__name__) @@ -53,6 +55,17 @@ _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event")) class EventsWorkerStore(SQLBaseStore): + def __init__(self, db_conn, hs): + super(EventsWorkerStore, self).__init__(db_conn, hs) + + self._get_event_cache = Cache( + "*getEvent*", keylen=3, max_entries=hs.config.event_cache_size + ) + + self._event_fetch_lock = threading.Condition() + self._event_fetch_list = [] + self._event_fetch_ongoing = 0 + def get_received_ts(self, event_id): """Get received_ts (when it was persisted) for the event.