diff --git a/synapse/app/_base.py b/synapse/app/_base.py index d4c6c4c8e2..08199a5e8d 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -22,13 +22,14 @@ import traceback import psutil from daemonize import Daemonize -from twisted.internet import error, reactor +from twisted.internet import defer, error, reactor from twisted.protocols.tls import TLSMemoryBIOFactory import synapse from synapse.app import check_bind_error from synapse.crypto import context_factory from synapse.util import PreserveLoggingContext +from synapse.util.async_helpers import Linearizer from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string @@ -99,6 +100,8 @@ def start_reactor( logger (logging.Logger): logger instance to pass to Daemonize """ + install_dns_limiter(reactor) + def run(): # make sure that we run the reactor with the sentinel log context, # otherwise other PreserveLoggingContext instances will get confused @@ -312,3 +315,81 @@ def setup_sentry(hs): name = hs.config.worker_name if hs.config.worker_name else "master" scope.set_tag("worker_app", app) scope.set_tag("worker_name", name) + + +def install_dns_limiter(reactor, max_dns_requests_in_flight=100): + """Replaces the resolver with one that limits the number of in flight DNS + requests. + + This is to workaround https://twistedmatrix.com/trac/ticket/9620, where we + can run out of file descriptors and infinite loop if we attempt to do too + many DNS queries at once + """ + new_resolver = _LimitedHostnameResolver( + reactor.nameResolver, max_dns_requests_in_flight, + ) + + reactor.installNameResolver(new_resolver) + + +class _LimitedHostnameResolver(object): + """Wraps a IHostnameResolver, limiting the number of in-flight DNS lookups. + """ + + def __init__(self, resolver, max_dns_requests_in_flight): + self._resolver = resolver + self._limiter = Linearizer( + name="dns_client_limiter", max_count=max_dns_requests_in_flight, + ) + + def resolveHostName(self, resolutionReceiver, hostName, portNumber=0, + addressTypes=None, transportSemantics='TCP'): + # Note this is happening deep within the reactor, so we don't need to + # worry about log contexts. + + # We need this function to return `resolutionReceiver` so we do all the + # actual logic involving deferreds in a separate function. + self._resolve( + resolutionReceiver, hostName, portNumber, + addressTypes, transportSemantics, + ) + + return resolutionReceiver + + @defer.inlineCallbacks + def _resolve(self, resolutionReceiver, hostName, portNumber=0, + addressTypes=None, transportSemantics='TCP'): + + with (yield self._limiter.queue(())): + # resolveHostName doesn't return a Deferred, so we need to hook into + # the receiver interface to get told when resolution has finished. + + deferred = defer.Deferred() + receiver = _DeferredResolutionReceiver(resolutionReceiver, deferred) + + self._resolver.resolveHostName( + receiver, hostName, portNumber, + addressTypes, transportSemantics, + ) + + yield deferred + + +class _DeferredResolutionReceiver(object): + """Wraps a IResolutionReceiver and simply resolves the given deferred when + resolution is complete + """ + + def __init__(self, receiver, deferred): + self._receiver = receiver + self._deferred = deferred + + def resolutionBegan(self, resolutionInProgress): + self._receiver.resolutionBegan(resolutionInProgress) + + def addressResolved(self, address): + self._receiver.addressResolved(address) + + def resolutionComplete(self): + self._deferred.callback(()) + self._receiver.resolutionComplete()