Convert the well known resolver to async (#8214)

This commit is contained in:
Patrick Cloke 2020-09-01 09:15:22 -04:00 committed by GitHub
parent da77520cd1
commit 5bf8e5f55b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 53 additions and 34 deletions

1
changelog.d/8214.misc Normal file
View file

@ -0,0 +1 @@
Convert various parts of the codebase to async/await.

View file

@ -28,6 +28,7 @@ files =
synapse/handlers/saml_handler.py, synapse/handlers/saml_handler.py,
synapse/handlers/sync.py, synapse/handlers/sync.py,
synapse/handlers/ui_auth, synapse/handlers/ui_auth,
synapse/http/federation/well_known_resolver.py,
synapse/http/server.py, synapse/http/server.py,
synapse/http/site.py, synapse/http/site.py,
synapse/logging/, synapse/logging/,

View file

@ -134,8 +134,8 @@ class MatrixFederationAgent(object):
and not _is_ip_literal(parsed_uri.hostname) and not _is_ip_literal(parsed_uri.hostname)
and not parsed_uri.port and not parsed_uri.port
): ):
well_known_result = yield self._well_known_resolver.get_well_known( well_known_result = yield defer.ensureDeferred(
parsed_uri.hostname self._well_known_resolver.get_well_known(parsed_uri.hostname)
) )
delegated_server = well_known_result.delegated_server delegated_server = well_known_result.delegated_server

View file

@ -16,6 +16,7 @@
import logging import logging
import random import random
import time import time
from typing import Callable, Dict, Optional, Tuple
import attr import attr
@ -23,6 +24,7 @@ from twisted.internet import defer
from twisted.web.client import RedirectAgent, readBody from twisted.web.client import RedirectAgent, readBody
from twisted.web.http import stringToDatetime from twisted.web.http import stringToDatetime
from twisted.web.http_headers import Headers from twisted.web.http_headers import Headers
from twisted.web.iweb import IResponse
from synapse.logging.context import make_deferred_yieldable from synapse.logging.context import make_deferred_yieldable
from synapse.util import Clock, json_decoder from synapse.util import Clock, json_decoder
@ -99,15 +101,14 @@ class WellKnownResolver(object):
self._well_known_agent = RedirectAgent(agent) self._well_known_agent = RedirectAgent(agent)
self.user_agent = user_agent self.user_agent = user_agent
@defer.inlineCallbacks async def get_well_known(self, server_name: bytes) -> WellKnownLookupResult:
def get_well_known(self, server_name):
"""Attempt to fetch and parse a .well-known file for the given server """Attempt to fetch and parse a .well-known file for the given server
Args: Args:
server_name (bytes): name of the server, from the requested url server_name: name of the server, from the requested url
Returns: Returns:
Deferred[WellKnownLookupResult]: The result of the lookup The result of the lookup
""" """
try: try:
prev_result, expiry, ttl = self._well_known_cache.get_with_expiry( prev_result, expiry, ttl = self._well_known_cache.get_with_expiry(
@ -124,7 +125,9 @@ class WellKnownResolver(object):
# requests for the same server in parallel? # requests for the same server in parallel?
try: try:
with Measure(self._clock, "get_well_known"): with Measure(self._clock, "get_well_known"):
result, cache_period = yield self._fetch_well_known(server_name) result, cache_period = await self._fetch_well_known(
server_name
) # type: Tuple[Optional[bytes], float]
except _FetchWellKnownFailure as e: except _FetchWellKnownFailure as e:
if prev_result and e.temporary: if prev_result and e.temporary:
@ -153,18 +156,17 @@ class WellKnownResolver(object):
return WellKnownLookupResult(delegated_server=result) return WellKnownLookupResult(delegated_server=result)
@defer.inlineCallbacks async def _fetch_well_known(self, server_name: bytes) -> Tuple[bytes, float]:
def _fetch_well_known(self, server_name):
"""Actually fetch and parse a .well-known, without checking the cache """Actually fetch and parse a .well-known, without checking the cache
Args: Args:
server_name (bytes): name of the server, from the requested url server_name: name of the server, from the requested url
Raises: Raises:
_FetchWellKnownFailure if we fail to lookup a result _FetchWellKnownFailure if we fail to lookup a result
Returns: Returns:
Deferred[Tuple[bytes,int]]: The lookup result and cache period. The lookup result and cache period.
""" """
had_valid_well_known = self._had_valid_well_known_cache.get(server_name, False) had_valid_well_known = self._had_valid_well_known_cache.get(server_name, False)
@ -172,7 +174,7 @@ class WellKnownResolver(object):
# We do this in two steps to differentiate between possibly transient # We do this in two steps to differentiate between possibly transient
# errors (e.g. can't connect to host, 503 response) and more permenant # errors (e.g. can't connect to host, 503 response) and more permenant
# errors (such as getting a 404 response). # errors (such as getting a 404 response).
response, body = yield self._make_well_known_request( response, body = await self._make_well_known_request(
server_name, retry=had_valid_well_known server_name, retry=had_valid_well_known
) )
@ -215,20 +217,20 @@ class WellKnownResolver(object):
return result, cache_period return result, cache_period
@defer.inlineCallbacks async def _make_well_known_request(
def _make_well_known_request(self, server_name, retry): self, server_name: bytes, retry: bool
) -> Tuple[IResponse, bytes]:
"""Make the well known request. """Make the well known request.
This will retry the request if requested and it fails (with unable This will retry the request if requested and it fails (with unable
to connect or receives a 5xx error). to connect or receives a 5xx error).
Args: Args:
server_name (bytes) server_name: name of the server, from the requested url
retry (bool): Whether to retry the request if it fails. retry: Whether to retry the request if it fails.
Returns: Returns:
Deferred[tuple[IResponse, bytes]] Returns the response object and Returns the response object and body. Response may be a non-200 response.
body. Response may be a non-200 response.
""" """
uri = b"https://%s/.well-known/matrix/server" % (server_name,) uri = b"https://%s/.well-known/matrix/server" % (server_name,)
uri_str = uri.decode("ascii") uri_str = uri.decode("ascii")
@ -243,12 +245,12 @@ class WellKnownResolver(object):
logger.info("Fetching %s", uri_str) logger.info("Fetching %s", uri_str)
try: try:
response = yield make_deferred_yieldable( response = await make_deferred_yieldable(
self._well_known_agent.request( self._well_known_agent.request(
b"GET", uri, headers=Headers(headers) b"GET", uri, headers=Headers(headers)
) )
) )
body = yield make_deferred_yieldable(readBody(response)) body = await make_deferred_yieldable(readBody(response))
if 500 <= response.code < 600: if 500 <= response.code < 600:
raise Exception("Non-200 response %s" % (response.code,)) raise Exception("Non-200 response %s" % (response.code,))
@ -265,21 +267,24 @@ class WellKnownResolver(object):
logger.info("Error fetching %s: %s. Retrying", uri_str, e) logger.info("Error fetching %s: %s. Retrying", uri_str, e)
# Sleep briefly in the hopes that they come back up # Sleep briefly in the hopes that they come back up
yield self._clock.sleep(0.5) await self._clock.sleep(0.5)
def _cache_period_from_headers(headers, time_now=time.time): def _cache_period_from_headers(
headers: Headers, time_now: Callable[[], float] = time.time
) -> Optional[float]:
cache_controls = _parse_cache_control(headers) cache_controls = _parse_cache_control(headers)
if b"no-store" in cache_controls: if b"no-store" in cache_controls:
return 0 return 0
if b"max-age" in cache_controls: if b"max-age" in cache_controls:
try: max_age = cache_controls[b"max-age"]
max_age = int(cache_controls[b"max-age"]) if max_age:
return max_age try:
except ValueError: return int(max_age)
pass except ValueError:
pass
expires = headers.getRawHeaders(b"expires") expires = headers.getRawHeaders(b"expires")
if expires is not None: if expires is not None:
@ -295,7 +300,7 @@ def _cache_period_from_headers(headers, time_now=time.time):
return None return None
def _parse_cache_control(headers): def _parse_cache_control(headers: Headers) -> Dict[bytes, Optional[bytes]]:
cache_controls = {} cache_controls = {}
for hdr in headers.getRawHeaders(b"cache-control", []): for hdr in headers.getRawHeaders(b"cache-control", []):
for directive in hdr.split(b","): for directive in hdr.split(b","):

View file

@ -972,7 +972,9 @@ class MatrixFederationAgentTests(unittest.TestCase):
def test_well_known_cache(self): def test_well_known_cache(self):
self.reactor.lookups["testserv"] = "1.2.3.4" self.reactor.lookups["testserv"] = "1.2.3.4"
fetch_d = self.well_known_resolver.get_well_known(b"testserv") fetch_d = defer.ensureDeferred(
self.well_known_resolver.get_well_known(b"testserv")
)
# there should be an attempt to connect on port 443 for the .well-known # there should be an attempt to connect on port 443 for the .well-known
clients = self.reactor.tcpClients clients = self.reactor.tcpClients
@ -995,7 +997,9 @@ class MatrixFederationAgentTests(unittest.TestCase):
well_known_server.loseConnection() well_known_server.loseConnection()
# repeat the request: it should hit the cache # repeat the request: it should hit the cache
fetch_d = self.well_known_resolver.get_well_known(b"testserv") fetch_d = defer.ensureDeferred(
self.well_known_resolver.get_well_known(b"testserv")
)
r = self.successResultOf(fetch_d) r = self.successResultOf(fetch_d)
self.assertEqual(r.delegated_server, b"target-server") self.assertEqual(r.delegated_server, b"target-server")
@ -1003,7 +1007,9 @@ class MatrixFederationAgentTests(unittest.TestCase):
self.reactor.pump((1000.0,)) self.reactor.pump((1000.0,))
# now it should connect again # now it should connect again
fetch_d = self.well_known_resolver.get_well_known(b"testserv") fetch_d = defer.ensureDeferred(
self.well_known_resolver.get_well_known(b"testserv")
)
self.assertEqual(len(clients), 1) self.assertEqual(len(clients), 1)
(host, port, client_factory, _timeout, _bindAddress) = clients.pop(0) (host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
@ -1026,7 +1032,9 @@ class MatrixFederationAgentTests(unittest.TestCase):
self.reactor.lookups["testserv"] = "1.2.3.4" self.reactor.lookups["testserv"] = "1.2.3.4"
fetch_d = self.well_known_resolver.get_well_known(b"testserv") fetch_d = defer.ensureDeferred(
self.well_known_resolver.get_well_known(b"testserv")
)
# there should be an attempt to connect on port 443 for the .well-known # there should be an attempt to connect on port 443 for the .well-known
clients = self.reactor.tcpClients clients = self.reactor.tcpClients
@ -1052,7 +1060,9 @@ class MatrixFederationAgentTests(unittest.TestCase):
# another lookup. # another lookup.
self.reactor.pump((900.0,)) self.reactor.pump((900.0,))
fetch_d = self.well_known_resolver.get_well_known(b"testserv") fetch_d = defer.ensureDeferred(
self.well_known_resolver.get_well_known(b"testserv")
)
# The resolver may retry a few times, so fonx all requests that come along # The resolver may retry a few times, so fonx all requests that come along
attempts = 0 attempts = 0
@ -1082,7 +1092,9 @@ class MatrixFederationAgentTests(unittest.TestCase):
self.reactor.pump((10000.0,)) self.reactor.pump((10000.0,))
# Repated the request, this time it should fail if the lookup fails. # Repated the request, this time it should fail if the lookup fails.
fetch_d = self.well_known_resolver.get_well_known(b"testserv") fetch_d = defer.ensureDeferred(
self.well_known_resolver.get_well_known(b"testserv")
)
clients = self.reactor.tcpClients clients = self.reactor.tcpClients
(host, port, client_factory, _timeout, _bindAddress) = clients.pop(0) (host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)