diff --git a/changelog.d/11610.misc b/changelog.d/11610.misc new file mode 100644 index 0000000000..3af049b969 --- /dev/null +++ b/changelog.d/11610.misc @@ -0,0 +1 @@ +Deduplicate in-flight requests in `_get_state_for_groups`. diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py index b8016f679a..dadf3d1e3a 100644 --- a/synapse/storage/databases/state/store.py +++ b/synapse/storage/databases/state/store.py @@ -25,6 +25,7 @@ from typing import ( ) import attr +from sortedcontainers import SortedDict from twisted.internet import defer @@ -72,6 +73,24 @@ class _GetStateGroupDelta: return len(self.delta_ids) if self.delta_ids else 0 +def state_filter_rough_priority_comparator( + state_filter: StateFilter, +) -> Tuple[int, int]: + """ + Returns a comparable value that roughly indicates the relative size of this + state filter compared to others. + 'Larger' state filters should sort first when using ascending order, so + this is essentially the opposite of 'size'. + It should be treated as a rough guide only and should not be interpreted to + have any particular meaning. The representation may also change + + The current implementation returns a tuple of the form: + * -1 for include_others, 0 otherwise + * -(number of entries in state_filter.types) + """ + return -int(state_filter.include_others), -len(state_filter.types) + + class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): """A data store for fetching/storing state groups.""" @@ -127,7 +146,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): # Current ongoing get_state_for_groups in-flight requests # {group ID -> {StateFilter -> ObservableDeferred}} self._state_group_inflight_requests: Dict[ - int, Dict[StateFilter, AbstractObservableDeferred[StateMap[str]]] + int, SortedDict[StateFilter, AbstractObservableDeferred[StateMap[str]]] ] = {} def get_max_state_group_txn(txn: Cursor) -> int: @@ -279,7 +298,10 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): # The list of ongoing requests which will help narrow the current request. reusable_requests = [] - for (request_state_filter, request_deferred) in inflight_requests.items(): + + # Iterate over existing requests in roughly biggest-first order. + for request_state_filter in inflight_requests: + request_deferred = inflight_requests[request_state_filter] new_state_filter_left_over = state_filter_left_over.approx_difference( request_state_filter ) @@ -358,7 +380,9 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): observable_deferred = ObservableDeferred(request_deferred, consumeErrors=True) # Insert the ObservableDeferred into the cache - group_request_dict = self._state_group_inflight_requests.setdefault(group, {}) + group_request_dict = self._state_group_inflight_requests.setdefault( + group, SortedDict(state_filter_rough_priority_comparator) + ) group_request_dict[db_state_filter] = observable_deferred return await make_deferred_yieldable(observable_deferred.observe()) diff --git a/tests/storage/databases/test_state_store.py b/tests/storage/databases/test_state_store.py index 076b660809..2b484c95a9 100644 --- a/tests/storage/databases/test_state_store.py +++ b/tests/storage/databases/test_state_store.py @@ -15,11 +15,16 @@ import typing from typing import Dict, List, Sequence, Tuple from unittest.mock import patch +from parameterized import parameterized + from twisted.internet.defer import Deferred, ensureDeferred from twisted.test.proto_helpers import MemoryReactor from synapse.api.constants import EventTypes -from synapse.storage.databases.state.store import MAX_INFLIGHT_REQUESTS_PER_GROUP +from synapse.storage.databases.state.store import ( + MAX_INFLIGHT_REQUESTS_PER_GROUP, + state_filter_rough_priority_comparator, +) from synapse.storage.state import StateFilter from synapse.types import StateMap from synapse.util import Clock @@ -350,3 +355,100 @@ class StateGroupInflightCachingTestCase(HomeserverTestCase): self._complete_request_fake(groups, sf, d) self.assertTrue(reqs[CAP_COUNT].called) self.assertTrue(reqs[CAP_COUNT + 1].called) + + @parameterized.expand([(False,), (True,)]) + def test_ordering_of_request_reuse(self, reverse: bool) -> None: + """ + Tests that 'larger' in-flight requests are ordered first. + + This is mostly a design decision in order to prevent a request from + hanging on to multiple queries when it would have been sufficient to + hang on to only one bigger query. + + The 'size' of a state filter is a rough heuristic. + + - requests two pieces of state, one 'larger' than the other, but each + spawning a query + - requests a third piece of state + - completes the larger of the first two queries + - checks that the third request gets completed (and doesn't needlessly + wait for the other query) + + Parameters: + reverse: whether to reverse the order of the initial requests, to ensure + that the effect doesn't depend on the order of request submission. + """ + + # We add in an extra state type to make sure that both requests spawn + # queries which are not optimised out. + state_filters = [ + StateFilter.freeze( + {"state.type": {"A"}, "other.state.type": {"a"}}, include_others=False + ), + StateFilter.freeze( + { + "state.type": None, + "other.state.type": {"b"}, + # The current rough size comparator uses the number of state types + # as an indicator of size. + # To influence it to make this state filter bigger than the previous one, + # we add another dummy state type. + "extra.state.type": {"c"}, + }, + include_others=False, + ), + ] + + if reverse: + # For fairness, we perform one test run with the list reversed. + state_filters.reverse() + smallest_state_filter_idx = 1 + biggest_state_filter_idx = 0 + else: + smallest_state_filter_idx = 0 + biggest_state_filter_idx = 1 + + # This assertion is for our own sanity more than anything else. + self.assertLess( + state_filter_rough_priority_comparator( + state_filters[biggest_state_filter_idx] + ), + state_filter_rough_priority_comparator( + state_filters[smallest_state_filter_idx] + ), + "Test invalid: bigger state filter is not actually bigger.", + ) + + # Spawn the initial two requests + for state_filter in state_filters: + ensureDeferred( + self.state_datastore._get_state_for_group_using_inflight_cache( + 42, + state_filter, + ) + ) + + # Spawn a third request + req = ensureDeferred( + self.state_datastore._get_state_for_group_using_inflight_cache( + 42, + StateFilter.freeze( + { + "state.type": {"A"}, + }, + include_others=False, + ), + ) + ) + self.pump(by=0.1) + + self.assertFalse(req.called) + + # Complete the largest request's query to make sure that the final request + # only waits for that one (and doesn't needlessly wait for both queries) + self._complete_request_fake( + *self.get_state_group_calls[biggest_state_filter_idx] + ) + + # That should have been sufficient to complete the third request + self.assertTrue(req.called)