From be14c24cea7d96f850180759870e83c3789bcdda Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Dec 2016 15:43:18 +0000 Subject: [PATCH 1/3] Fix rare notifier bug where listeners dont timeout There was a race condition that caused the notifier to 'miss' the timeout notification, since there were no other checks for the timeout this caused listeners to get stuck in a loop until something happened. --- synapse/handlers/sync.py | 1 + synapse/notifier.py | 20 +++++++++----------- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index a86996689c..b62773dcbe 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -510,6 +510,7 @@ class SyncHandler(object): Returns: Deferred(SyncResult) """ + logger.info("Calculating sync response for %r", sync_config.user) # NB: The now_token gets changed by some of the generate_sync_* methods, # this is due to some of the underlying streams not supporting the ability diff --git a/synapse/notifier.py b/synapse/notifier.py index 054ca59ad2..40baa6969a 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -294,14 +294,7 @@ class Notifier(object): result = None if timeout: - # Will be set to a _NotificationListener that we'll be waiting on. - # Allows us to cancel it. - listener = None - - def timed_out(): - if listener: - listener.deferred.cancel() - timer = self.clock.call_later(timeout / 1000., timed_out) + end_time = self.clock.time_msec() + timeout prev_token = from_token while not result: @@ -312,6 +305,10 @@ class Notifier(object): if result: break + now = self.clock.time_msec() + if end_time <= now: + break + # Now we wait for the _NotifierUserStream to be told there # is a new token. # We need to supply the token we supplied to callback so @@ -319,11 +316,12 @@ class Notifier(object): prev_token = current_token listener = user_stream.new_listener(prev_token) with PreserveLoggingContext(): - yield listener.deferred + yield self.clock.time_bound_deferred( + listener.deferred, + time_out=(end_time - now) / 1000. + ) except defer.CancelledError: break - - self.clock.cancel_call_later(timer, ignore_errs=True) else: current_token = user_stream.current_token result = yield callback(from_token, current_token) From fbaf868f621c2ecb6ea10679eb435f9adffa3b2a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Dec 2016 16:30:29 +0000 Subject: [PATCH 2/3] Correctly handle timeout errors --- synapse/notifier.py | 20 ++++++++++++++------ synapse/util/__init__.py | 7 ++++++- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index 40baa6969a..acbd4bb5ae 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -17,6 +17,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError +from synapse.util import DeferredTimedOutError from synapse.util.logutils import log_function from synapse.util.async import ObservableDeferred from synapse.util.logcontext import PreserveLoggingContext, preserve_fn @@ -320,6 +321,8 @@ class Notifier(object): listener.deferred, time_out=(end_time - now) / 1000. ) + except DeferredTimedOutError: + break except defer.CancelledError: break else: @@ -490,22 +493,27 @@ class Notifier(object): """ listener = _NotificationListener(None) - def timed_out(): - listener.deferred.cancel() + end_time = self.clock.time_msec() + timeout - timer = self.clock.call_later(timeout / 1000., timed_out) while True: listener.deferred = self.replication_deferred.observe() result = yield callback() if result: break + now = self.clock.time_msec() + if end_time <= now: + break + try: with PreserveLoggingContext(): - yield listener.deferred + yield self.clock.time_bound_deferred( + listener.deferred, + time_out=(end_time - now) / 1000. + ) + except DeferredTimedOutError: + break except defer.CancelledError: break - self.clock.cancel_call_later(timer, ignore_errs=True) - defer.returnValue(result) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index c05b9450be..30fc480108 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -24,6 +24,11 @@ import logging logger = logging.getLogger(__name__) +class DeferredTimedOutError(SynapseError): + def __init__(self): + super(SynapseError).__init__(504, "Timed out") + + def unwrapFirstError(failure): # defer.gatherResults and DeferredLists wrap failures. failure.trap(defer.FirstError) @@ -89,7 +94,7 @@ class Clock(object): def timed_out_fn(): try: - ret_deferred.errback(SynapseError(504, "Timed out")) + ret_deferred.errback(DeferredTimedOutError()) except: pass From 8b34f71bea05f7190767ec9aebf85528e409c09d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Dec 2016 16:48:48 +0000 Subject: [PATCH 3/3] Fix unit tests --- tests/utils.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/utils.py b/tests/utils.py index 2d0bd205fd..d3d6c8021d 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -294,6 +294,10 @@ class MockClock(object): def advance_time_msec(self, ms): self.advance_time(ms / 1000.) + def time_bound_deferred(self, d, *args, **kwargs): + # We don't bother timing things out for now. + return d + class SQLiteMemoryDbPool(ConnectionPool, object): def __init__(self):