diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py index 442696d393..8c64339a7c 100644 --- a/synapse/http/endpoint.py +++ b/synapse/http/endpoint.py @@ -14,7 +14,7 @@ # limitations under the License. from twisted.internet.endpoints import SSL4ClientEndpoint, TCP4ClientEndpoint -from twisted.internet import defer +from twisted.internet import defer, reactor from twisted.internet.error import ConnectError from twisted.names import client, dns from twisted.names.error import DNSNameError, DomainError @@ -66,13 +66,75 @@ def matrix_federation_endpoint(reactor, destination, ssl_context_factory=None, default_port = 8448 if port is None: - return SRVClientEndpoint( + return _WrappingEndpointFac(SRVClientEndpoint( reactor, "matrix", domain, protocol="tcp", default_port=default_port, endpoint=transport_endpoint, endpoint_kw_args=endpoint_kw_args - ) + )) else: - return transport_endpoint(reactor, domain, port, **endpoint_kw_args) + return _WrappingEndpointFac(transport_endpoint( + reactor, domain, port, **endpoint_kw_args + )) + + +class _WrappingEndpointFac(object): + def __init__(self, endpoint_fac): + self.endpoint_fac = endpoint_fac + + @defer.inlineCallbacks + def connect(self, protocolFactory): + conn = yield self.endpoint_fac.connect(protocolFactory) + conn = _WrappedConnection(conn) + defer.returnValue(conn) + + +class _WrappedConnection(object): + """Wraps a connection and calls abort on it if it hasn't seen any action + for 2.5-3 minutes. + """ + __slots__ = ["conn", "last_request"] + + def __init__(self, conn): + object.__setattr__(self, "conn", conn) + object.__setattr__(self, "last_request", time.time()) + + def __getattr__(self, name): + return getattr(self.conn, name) + + def __setattr__(self, name, value): + setattr(self.conn, name, value) + + def _time_things_out_maybe(self): + # We use a slightly shorter timeout here just in case the callLater is + # triggered early. Paranoia ftw. + # TODO: Cancel the previous callLater rather than comparing time.time()? + if time.time() - self.last_request >= 2.5 * 60: + self.abort() + # Abort the underlying TLS connection. The abort() method calls + # loseConnection() on the underlying TLS connection which tries to + # shutdown the connection cleanly. We call abortConnection() + # since that will promptly close the underlying TCP connection. + self.transport.abortConnection() + + def request(self, request): + self.last_request = time.time() + + # Time this connection out if we haven't send a request in the last + # N minutes + # TODO: Cancel the previous callLater? + reactor.callLater(3 * 60, self._time_things_out_maybe) + + d = self.conn.request(request) + + def update_request_time(res): + self.last_request = time.time() + # TODO: Cancel the previous callLater? + reactor.callLater(3 * 60, self._time_things_out_maybe) + return res + + d.addCallback(update_request_time) + + return d class SpiderEndpoint(object): diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index d5970c05a8..78b92cef36 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -88,7 +88,8 @@ class MatrixFederationHttpClient(object): self.signing_key = hs.config.signing_key[0] self.server_name = hs.hostname pool = HTTPConnectionPool(reactor) - pool.maxPersistentPerHost = 10 + pool.maxPersistentPerHost = 5 + pool.cachedConnectionTimeout = 2 * 60 self.agent = Agent.usingEndpointFactory( reactor, MatrixFederationEndpointFactory(hs), pool=pool ) @@ -299,7 +300,7 @@ class MatrixFederationHttpClient(object): defer.returnValue(json.loads(body)) @defer.inlineCallbacks - def post_json(self, destination, path, data={}, long_retries=True, + def post_json(self, destination, path, data={}, long_retries=False, timeout=None): """ Sends the specifed json data using POST @@ -332,7 +333,7 @@ class MatrixFederationHttpClient(object): path.encode("ascii"), body_callback=body_callback, headers_dict={"Content-Type": ["application/json"]}, - long_retries=True, + long_retries=long_retries, timeout=timeout, )