Fix logcontext leak in HttpTransactionCache

ONE DAY I WILL PURGE THE WORLD OF THIS EVIL
This commit is contained in:
Richard van der Hoff 2018-05-21 16:58:20 +01:00
parent 6d6e7288fe
commit 6e1cb54a05
2 changed files with 38 additions and 21 deletions

View file

@ -19,6 +19,7 @@ import logging
from synapse.api.auth import get_access_token_from_request from synapse.api.auth import get_access_token_from_request
from synapse.util.async import ObservableDeferred from synapse.util.async import ObservableDeferred
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -80,31 +81,26 @@ class HttpTransactionCache(object):
Returns: Returns:
Deferred which resolves to a tuple of (response_code, response_dict). Deferred which resolves to a tuple of (response_code, response_dict).
""" """
try: if txn_key in self.transactions:
return self.transactions[txn_key][0].observe() observable = self.transactions[txn_key][0]
except (KeyError, IndexError): else:
pass # execute the function instead. # execute the function instead.
deferred = run_in_background(fn, *args, **kwargs)
deferred = fn(*args, **kwargs) observable = ObservableDeferred(deferred)
self.transactions[txn_key] = (observable, self.clock.time_msec())
observable = ObservableDeferred(deferred, consumeErrors=False) # if the request fails with an exception, remove it
self.transactions[txn_key] = (observable, self.clock.time_msec()) # from the transaction map. This is done to ensure that we don't
# cache transient errors like rate-limiting errors, etc.
def remove_from_map(err):
self.transactions.pop(txn_key, None)
# we deliberately do not propagate the error any further, as we
# expect the observers to have reported it.
# if the request fails with an exception, remove it from the deferred.addErrback(remove_from_map)
# transaction map. This is done to ensure that we don't cache
# transient errors like rate-limiting errors, etc.
#
# (make sure we add this errback *after* adding the key above, in case
# the deferred has already failed and is running errbacks
# synchronously)
def remove_from_map(err):
self.transactions.pop(txn_key, None)
# we deliberately do not propagate the error any further, as we
# expect the observers to have reported it.
deferred.addErrback(remove_from_map) return make_deferred_yieldable(observable.observe())
return observable.observe()
def _cleanup(self): def _cleanup(self):
now = self.clock.time_msec() now = self.clock.time_msec()

View file

@ -3,6 +3,7 @@ from synapse.rest.client.transactions import CLEANUP_PERIOD_MS
from twisted.internet import defer from twisted.internet import defer
from mock import Mock, call from mock import Mock, call
from synapse.util import async
from synapse.util.logcontext import LoggingContext from synapse.util.logcontext import LoggingContext
from tests import unittest from tests import unittest
from tests.utils import MockClock from tests.utils import MockClock
@ -41,6 +42,26 @@ class HttpTransactionCacheTestCase(unittest.TestCase):
# expect only a single call to do the work # expect only a single call to do the work
cb.assert_called_once_with("some_arg", keyword="arg", changing_args=0) cb.assert_called_once_with("some_arg", keyword="arg", changing_args=0)
@defer.inlineCallbacks
def test_logcontexts_with_async_result(self):
@defer.inlineCallbacks
def cb():
yield async.sleep(0)
defer.returnValue("yay")
@defer.inlineCallbacks
def test():
with LoggingContext("c") as c1:
res = yield self.cache.fetch_or_execute(self.mock_key, cb)
self.assertIs(LoggingContext.current_context(), c1)
self.assertEqual(res, "yay")
# run the test twice in parallel
d = defer.gatherResults([test(), test()])
self.assertIs(LoggingContext.current_context(), LoggingContext.sentinel)
yield d
self.assertIs(LoggingContext.current_context(), LoggingContext.sentinel)
@defer.inlineCallbacks @defer.inlineCallbacks
def test_does_not_cache_exceptions(self): def test_does_not_cache_exceptions(self):
"""Checks that, if the callback throws an exception, it is called again """Checks that, if the callback throws an exception, it is called again