Merge pull request #1683 from matrix-org/erikj/notifier_sadness

Fix rare notifier bug where listeners dont timeout
This commit is contained in:
Erik Johnston 2016-12-09 17:27:45 +00:00 committed by GitHub
commit b541fac7c3
4 changed files with 34 additions and 18 deletions

View file

@ -510,6 +510,7 @@ class SyncHandler(object):
Returns: Returns:
Deferred(SyncResult) Deferred(SyncResult)
""" """
logger.info("Calculating sync response for %r", sync_config.user)
# NB: The now_token gets changed by some of the generate_sync_* methods, # NB: The now_token gets changed by some of the generate_sync_* methods,
# this is due to some of the underlying streams not supporting the ability # this is due to some of the underlying streams not supporting the ability

View file

@ -17,6 +17,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError from synapse.api.errors import AuthError
from synapse.util import DeferredTimedOutError
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
from synapse.util.async import ObservableDeferred from synapse.util.async import ObservableDeferred
from synapse.util.logcontext import PreserveLoggingContext, preserve_fn from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
@ -294,14 +295,7 @@ class Notifier(object):
result = None result = None
if timeout: if timeout:
# Will be set to a _NotificationListener that we'll be waiting on. end_time = self.clock.time_msec() + timeout
# Allows us to cancel it.
listener = None
def timed_out():
if listener:
listener.deferred.cancel()
timer = self.clock.call_later(timeout / 1000., timed_out)
prev_token = from_token prev_token = from_token
while not result: while not result:
@ -312,6 +306,10 @@ class Notifier(object):
if result: if result:
break break
now = self.clock.time_msec()
if end_time <= now:
break
# Now we wait for the _NotifierUserStream to be told there # Now we wait for the _NotifierUserStream to be told there
# is a new token. # is a new token.
# We need to supply the token we supplied to callback so # We need to supply the token we supplied to callback so
@ -319,11 +317,14 @@ class Notifier(object):
prev_token = current_token prev_token = current_token
listener = user_stream.new_listener(prev_token) listener = user_stream.new_listener(prev_token)
with PreserveLoggingContext(): with PreserveLoggingContext():
yield listener.deferred yield self.clock.time_bound_deferred(
listener.deferred,
time_out=(end_time - now) / 1000.
)
except DeferredTimedOutError:
break
except defer.CancelledError: except defer.CancelledError:
break break
self.clock.cancel_call_later(timer, ignore_errs=True)
else: else:
current_token = user_stream.current_token current_token = user_stream.current_token
result = yield callback(from_token, current_token) result = yield callback(from_token, current_token)
@ -492,22 +493,27 @@ class Notifier(object):
""" """
listener = _NotificationListener(None) listener = _NotificationListener(None)
def timed_out(): end_time = self.clock.time_msec() + timeout
listener.deferred.cancel()
timer = self.clock.call_later(timeout / 1000., timed_out)
while True: while True:
listener.deferred = self.replication_deferred.observe() listener.deferred = self.replication_deferred.observe()
result = yield callback() result = yield callback()
if result: if result:
break break
now = self.clock.time_msec()
if end_time <= now:
break
try: try:
with PreserveLoggingContext(): with PreserveLoggingContext():
yield listener.deferred yield self.clock.time_bound_deferred(
listener.deferred,
time_out=(end_time - now) / 1000.
)
except DeferredTimedOutError:
break
except defer.CancelledError: except defer.CancelledError:
break break
self.clock.cancel_call_later(timer, ignore_errs=True)
defer.returnValue(result) defer.returnValue(result)

View file

@ -24,6 +24,11 @@ import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class DeferredTimedOutError(SynapseError):
def __init__(self):
super(SynapseError).__init__(504, "Timed out")
def unwrapFirstError(failure): def unwrapFirstError(failure):
# defer.gatherResults and DeferredLists wrap failures. # defer.gatherResults and DeferredLists wrap failures.
failure.trap(defer.FirstError) failure.trap(defer.FirstError)
@ -89,7 +94,7 @@ class Clock(object):
def timed_out_fn(): def timed_out_fn():
try: try:
ret_deferred.errback(SynapseError(504, "Timed out")) ret_deferred.errback(DeferredTimedOutError())
except: except:
pass pass

View file

@ -294,6 +294,10 @@ class MockClock(object):
def advance_time_msec(self, ms): def advance_time_msec(self, ms):
self.advance_time(ms / 1000.) self.advance_time(ms / 1000.)
def time_bound_deferred(self, d, *args, **kwargs):
# We don't bother timing things out for now.
return d
class SQLiteMemoryDbPool(ConnectionPool, object): class SQLiteMemoryDbPool(ConnectionPool, object):
def __init__(self): def __init__(self):