diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py index 20e568bc43..0d47ccdb59 100644 --- a/synapse/http/__init__.py +++ b/synapse/http/__init__.py @@ -28,7 +28,7 @@ class RequestTimedOutError(SynapseError): def cancelled_to_request_timed_out_error(value): """Turns CancelledErrors into RequestTimedOutErrors. - For use with deferred.addTimeout() + For use with async.add_timeout_to_deferred """ if isinstance(value, failure.Failure): value.trap(CancelledError) diff --git a/synapse/http/client.py b/synapse/http/client.py index 35c8d51e71..62309c3365 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -20,6 +20,7 @@ from synapse.api.errors import ( CodeMessageException, MatrixCodeMessageException, SynapseError, Codes, ) from synapse.http import cancelled_to_request_timed_out_error +from synapse.util.async import add_timeout_to_deferred from synapse.util.caches import CACHE_SIZE_FACTOR from synapse.util.logcontext import make_deferred_yieldable import synapse.metrics @@ -102,8 +103,9 @@ class SimpleHttpClient(object): request_deferred = self.agent.request( method, uri, *args, **kwargs ) - request_deferred.addTimeout( - 60, reactor, cancelled_to_request_timed_out_error, + add_timeout_to_deferred( + request_deferred, + 60, cancelled_to_request_timed_out_error, ) response = yield make_deferred_yieldable(request_deferred) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index fe4b1636a6..30036fe81c 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -22,7 +22,7 @@ from twisted.web._newclient import ResponseDone from synapse.http import cancelled_to_request_timed_out_error from synapse.http.endpoint import matrix_federation_endpoint import synapse.metrics -from synapse.util.async import sleep +from synapse.util.async import sleep, add_timeout_to_deferred from synapse.util import logcontext from synapse.util.logcontext import make_deferred_yieldable import synapse.util.retryutils @@ -193,9 +193,10 @@ class MatrixFederationHttpClient(object): Headers(headers_dict), producer ) - request_deferred.addTimeout( + add_timeout_to_deferred( + request_deferred, timeout / 1000. if timeout else 60, - reactor, cancelled_to_request_timed_out_error, + cancelled_to_request_timed_out_error, ) response = yield make_deferred_yieldable( request_deferred, diff --git a/synapse/notifier.py b/synapse/notifier.py index 1e4f78b993..a1c06e8fca 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -13,15 +13,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer, reactor -from twisted.internet.defer import TimeoutError +from twisted.internet import defer from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError from synapse.handlers.presence import format_user_presence_state from synapse.util.logutils import log_function -from synapse.util.async import ObservableDeferred +from synapse.util.async import ( + ObservableDeferred, add_timeout_to_deferred, + DeferredTimeoutError, +) from synapse.util.logcontext import PreserveLoggingContext, preserve_fn from synapse.util.metrics import Measure from synapse.types import StreamToken @@ -332,8 +334,9 @@ class Notifier(object): # Now we wait for the _NotifierUserStream to be told there # is a new token. listener = user_stream.new_listener(prev_token) - listener.deferred.addTimeout( - (end_time - now) / 1000., reactor, + add_timeout_to_deferred( + listener.deferred, + (end_time - now) / 1000., ) with PreserveLoggingContext(): yield listener.deferred @@ -347,7 +350,7 @@ class Notifier(object): # Update the prev_token to the current_token since nothing # has happened between the old prev_token and the current_token prev_token = current_token - except TimeoutError: + except DeferredTimeoutError: break except defer.CancelledError: break @@ -552,11 +555,14 @@ class Notifier(object): if end_time <= now: break - listener.deferred.addTimeout((end_time - now) / 1000., reactor) + add_timeout_to_deferred( + listener.deferred.addTimeout, + (end_time - now) / 1000., + ) try: with PreserveLoggingContext(): yield listener.deferred - except TimeoutError: + except DeferredTimeoutError: break except defer.CancelledError: break diff --git a/synapse/util/async.py b/synapse/util/async.py index 0729bb2863..1df5c5600c 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -15,6 +15,8 @@ from twisted.internet import defer, reactor +from twisted.internet.defer import CancelledError +from twisted.python import failure from .logcontext import ( PreserveLoggingContext, make_deferred_yieldable, preserve_fn @@ -392,3 +394,68 @@ class ReadWriteLock(object): self.key_to_current_writer.pop(key) defer.returnValue(_ctx_manager()) + + +class DeferredTimeoutError(Exception): + """ + This error is raised by default when a L{Deferred} times out. + """ + + +def add_timeout_to_deferred(deferred, timeout, on_timeout_cancel=None): + """ + Add a timeout to a deferred by scheduling it to be cancelled after + timeout seconds. + + This is essentially a backport of deferred.addTimeout, which was introduced + in twisted 16.5. + + If the deferred gets timed out, it errbacks with a DeferredTimeoutError, + unless a cancelable function was passed to its initialization or unless + a different on_timeout_cancel callable is provided. + + Args: + deferred (defer.Deferred): deferred to be timed out + timeout (Number): seconds to time out after + + on_timeout_cancel (callable): A callable which is called immediately + after the deferred times out, and not if this deferred is + otherwise cancelled before the timeout. + + It takes an arbitrary value, which is the value of the deferred at + that exact point in time (probably a CancelledError Failure), and + the timeout. + + The default callable (if none is provided) will translate a + CancelledError Failure into a DeferredTimeoutError. + """ + timed_out = [False] + + def time_it_out(): + timed_out[0] = True + deferred.cancel() + + delayed_call = reactor.callLater(timeout, time_it_out) + + def convert_cancelled(value): + if timed_out[0]: + to_call = on_timeout_cancel or _cancelled_to_timed_out_error + return to_call(value, timeout) + return value + + deferred.addBoth(convert_cancelled) + + def cancel_timeout(result): + # stop the pending call to cancel the deferred if it's been fired + if delayed_call.active(): + delayed_call.cancel() + return result + + deferred.addBoth(cancel_timeout) + + +def _cancelled_to_timed_out_error(value, timeout): + if isinstance(value, failure.Failure): + value.trap(CancelledError) + raise DeferredTimeoutError(timeout, "Deferred") + return value