diff --git a/synapse/util/async.py b/synapse/util/async.py index 5a50d9700f..a7094e2fb4 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -184,13 +184,13 @@ class Linearizer(object): # key_to_defer is a map from the key to a 2 element list where # the first element is the number of things executing, and - # the second element is a deque of deferreds for the things blocked from - # executing. + # the second element is an OrderedDict, where the keys are deferreds for the + # things blocked from executing. self.key_to_defer = {} @defer.inlineCallbacks def queue(self, key): - entry = self.key_to_defer.setdefault(key, [0, collections.deque()]) + entry = self.key_to_defer.setdefault(key, [0, collections.OrderedDict()]) # If the number of things executing is greater than the maximum # then add a deferred to the list of blocked items @@ -198,12 +198,28 @@ class Linearizer(object): # this item so that it can continue executing. if entry[0] >= self.max_count: new_defer = defer.Deferred() - entry[1].append(new_defer) + entry[1][new_defer] = 1 logger.info( "Waiting to acquire linearizer lock %r for key %r", self.name, key, ) - yield make_deferred_yieldable(new_defer) + try: + yield make_deferred_yieldable(new_defer) + except Exception as e: + if isinstance(e, CancelledError): + logger.info( + "Cancelling wait for linearizer lock %r for key %r", + self.name, key, + ) + else: + logger.warn( + "Unexpected exception waiting for linearizer lock %r for key %r", + self.name, key, + ) + + # we just have to take ourselves back out of the queue. + del entry[1][new_defer] + raise logger.info("Acquired linearizer lock %r for key %r", self.name, key) entry[0] += 1 @@ -238,7 +254,7 @@ class Linearizer(object): entry[0] -= 1 if entry[1]: - next_def = entry[1].popleft() + (next_def, _) = entry[1].popitem(last=False) # we need to run the next thing in the sentinel context. with PreserveLoggingContext(): diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py index c9563124f9..4729bd5a0a 100644 --- a/tests/util/test_linearizer.py +++ b/tests/util/test_linearizer.py @@ -17,6 +17,7 @@ from six.moves import range from twisted.internet import defer, reactor +from twisted.internet.defer import CancelledError from synapse.util import Clock, logcontext from synapse.util.async import Linearizer @@ -112,3 +113,33 @@ class LinearizerTestCase(unittest.TestCase): d6 = limiter.queue(key) with (yield d6): pass + + @defer.inlineCallbacks + def test_cancellation(self): + linearizer = Linearizer() + + key = object() + + d1 = linearizer.queue(key) + cm1 = yield d1 + + d2 = linearizer.queue(key) + self.assertFalse(d2.called) + + d3 = linearizer.queue(key) + self.assertFalse(d3.called) + + d2.cancel() + + with cm1: + pass + + self.assertTrue(d2.called) + try: + yield d2 + self.fail("Expected d2 to raise CancelledError") + except CancelledError: + pass + + with (yield d3): + pass