diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 80f27f8c53..c4bc4f4d31 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -15,7 +15,6 @@ from synapse.crypto.keyclient import fetch_server_key from synapse.api.errors import SynapseError, Codes -from synapse.util.retryutils import get_retry_limiter from synapse.util import unwrapFirstError from synapse.util.async import ObservableDeferred from synapse.util.logcontext import ( @@ -363,30 +362,24 @@ class Keyring(object): def get_keys_from_server(self, server_name_and_key_ids): @defer.inlineCallbacks def get_key(server_name, key_ids): - limiter = yield get_retry_limiter( - server_name, - self.clock, - self.store, - ) - with limiter: - keys = None - try: - keys = yield self.get_server_verify_key_v2_direct( - server_name, key_ids - ) - except Exception as e: - logger.info( - "Unable to get key %r for %r directly: %s %s", - key_ids, server_name, - type(e).__name__, str(e.message), - ) + keys = None + try: + keys = yield self.get_server_verify_key_v2_direct( + server_name, key_ids + ) + except Exception as e: + logger.info( + "Unable to get key %r for %r directly: %s %s", + key_ids, server_name, + type(e).__name__, str(e.message), + ) - if not keys: - keys = yield self.get_server_verify_key_v1_direct( - server_name, key_ids - ) + if not keys: + keys = yield self.get_server_verify_key_v1_direct( + server_name, key_ids + ) - keys = {server_name: keys} + keys = {server_name: keys} defer.returnValue(keys) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 5dcd4eecce..dc44727b36 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -29,7 +29,7 @@ from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred from synapse.events import FrozenEvent, builder import synapse.metrics -from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination +from synapse.util.retryutils import NotRetryingDestination import copy import itertools @@ -234,31 +234,24 @@ class FederationClient(FederationBase): continue try: - limiter = yield get_retry_limiter( - destination, - self._clock, - self.store, + transaction_data = yield self.transport_layer.get_event( + destination, event_id, timeout=timeout, ) - with limiter: - transaction_data = yield self.transport_layer.get_event( - destination, event_id, timeout=timeout, - ) + logger.debug("transaction_data %r", transaction_data) - logger.debug("transaction_data %r", transaction_data) + pdu_list = [ + self.event_from_pdu_json(p, outlier=outlier) + for p in transaction_data["pdus"] + ] - pdu_list = [ - self.event_from_pdu_json(p, outlier=outlier) - for p in transaction_data["pdus"] - ] + if pdu_list and pdu_list[0]: + pdu = pdu_list[0] - if pdu_list and pdu_list[0]: - pdu = pdu_list[0] + # Check signatures are correct. + signed_pdu = yield self._check_sigs_and_hashes([pdu])[0] - # Check signatures are correct. - signed_pdu = yield self._check_sigs_and_hashes([pdu])[0] - - break + break pdu_attempts[destination] = now diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index c802dd67a3..d7ecefcc64 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -12,7 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import datetime from twisted.internet import defer @@ -22,9 +22,7 @@ from .units import Transaction, Edu from synapse.api.errors import HttpResponseException from synapse.util.async import run_on_reactor from synapse.util.logcontext import preserve_context_over_fn -from synapse.util.retryutils import ( - get_retry_limiter, NotRetryingDestination, -) +from synapse.util.retryutils import NotRetryingDestination from synapse.util.metrics import measure_func from synapse.types import get_domain_from_id from synapse.handlers.presence import format_user_presence_state @@ -312,13 +310,6 @@ class TransactionQueue(object): yield run_on_reactor() while True: - limiter = yield get_retry_limiter( - destination, - self.clock, - self.store, - backoff_on_404=True, # If we get a 404 the other side has gone - ) - device_message_edus, device_stream_id, dev_list_id = ( yield self._get_new_device_messages(destination) ) @@ -374,7 +365,6 @@ class TransactionQueue(object): success = yield self._send_new_transaction( destination, pending_pdus, pending_edus, pending_failures, - limiter=limiter, ) if success: # Remove the acknowledged device messages from the database @@ -392,12 +382,24 @@ class TransactionQueue(object): self.last_device_list_stream_id_by_dest[destination] = dev_list_id else: break - except NotRetryingDestination: + except NotRetryingDestination as e: logger.debug( - "TX [%s] not ready for retry yet - " + "TX [%s] not ready for retry yet (next retry at %s) - " "dropping transaction for now", destination, + datetime.datetime.fromtimestamp( + (e.retry_last_ts + e.retry_interval) / 1000.0 + ), ) + except Exception as e: + logger.warn( + "TX [%s] Failed to send transaction: %s", + destination, + e, + ) + for p in pending_pdus: + logger.info("Failed to send event %s to %s", p.event_id, + destination) finally: # We want to be *very* sure we delete this after we stop processing self.pending_transactions.pop(destination, None) @@ -437,7 +439,7 @@ class TransactionQueue(object): @measure_func("_send_new_transaction") @defer.inlineCallbacks def _send_new_transaction(self, destination, pending_pdus, pending_edus, - pending_failures, limiter): + pending_failures): # Sort based on the order field pending_pdus.sort(key=lambda t: t[1]) @@ -447,132 +449,104 @@ class TransactionQueue(object): success = True + logger.debug("TX [%s] _attempt_new_transaction", destination) + + txn_id = str(self._next_txn_id) + + logger.debug( + "TX [%s] {%s} Attempting new transaction" + " (pdus: %d, edus: %d, failures: %d)", + destination, txn_id, + len(pdus), + len(edus), + len(failures) + ) + + logger.debug("TX [%s] Persisting transaction...", destination) + + transaction = Transaction.create_new( + origin_server_ts=int(self.clock.time_msec()), + transaction_id=txn_id, + origin=self.server_name, + destination=destination, + pdus=pdus, + edus=edus, + pdu_failures=failures, + ) + + self._next_txn_id += 1 + + yield self.transaction_actions.prepare_to_send(transaction) + + logger.debug("TX [%s] Persisted transaction", destination) + logger.info( + "TX [%s] {%s} Sending transaction [%s]," + " (PDUs: %d, EDUs: %d, failures: %d)", + destination, txn_id, + transaction.transaction_id, + len(pdus), + len(edus), + len(failures), + ) + + # Actually send the transaction + + # FIXME (erikj): This is a bit of a hack to make the Pdu age + # keys work + def json_data_cb(): + data = transaction.get_dict() + now = int(self.clock.time_msec()) + if "pdus" in data: + for p in data["pdus"]: + if "age_ts" in p: + unsigned = p.setdefault("unsigned", {}) + unsigned["age"] = now - int(p["age_ts"]) + del p["age_ts"] + return data + try: - logger.debug("TX [%s] _attempt_new_transaction", destination) - - txn_id = str(self._next_txn_id) - - logger.debug( - "TX [%s] {%s} Attempting new transaction" - " (pdus: %d, edus: %d, failures: %d)", - destination, txn_id, - len(pdus), - len(edus), - len(failures) + response = yield self.transport_layer.send_transaction( + transaction, json_data_cb ) + code = 200 - logger.debug("TX [%s] Persisting transaction...", destination) - - transaction = Transaction.create_new( - origin_server_ts=int(self.clock.time_msec()), - transaction_id=txn_id, - origin=self.server_name, - destination=destination, - pdus=pdus, - edus=edus, - pdu_failures=failures, - ) - - self._next_txn_id += 1 - - yield self.transaction_actions.prepare_to_send(transaction) - - logger.debug("TX [%s] Persisted transaction", destination) - logger.info( - "TX [%s] {%s} Sending transaction [%s]," - " (PDUs: %d, EDUs: %d, failures: %d)", - destination, txn_id, - transaction.transaction_id, - len(pdus), - len(edus), - len(failures), - ) - - with limiter: - # Actually send the transaction - - # FIXME (erikj): This is a bit of a hack to make the Pdu age - # keys work - def json_data_cb(): - data = transaction.get_dict() - now = int(self.clock.time_msec()) - if "pdus" in data: - for p in data["pdus"]: - if "age_ts" in p: - unsigned = p.setdefault("unsigned", {}) - unsigned["age"] = now - int(p["age_ts"]) - del p["age_ts"] - return data - - try: - response = yield self.transport_layer.send_transaction( - transaction, json_data_cb - ) - code = 200 - - if response: - for e_id, r in response.get("pdus", {}).items(): - if "error" in r: - logger.warn( - "Transaction returned error for %s: %s", - e_id, r, - ) - except HttpResponseException as e: - code = e.code - response = e.response - - if e.code in (401, 404, 429) or 500 <= e.code: - logger.info( - "TX [%s] {%s} got %d response", - destination, txn_id, code + if response: + for e_id, r in response.get("pdus", {}).items(): + if "error" in r: + logger.warn( + "Transaction returned error for %s: %s", + e_id, r, ) - raise e + except HttpResponseException as e: + code = e.code + response = e.response + if e.code in (401, 404, 429) or 500 <= e.code: logger.info( "TX [%s] {%s} got %d response", destination, txn_id, code ) + raise e - logger.debug("TX [%s] Sent transaction", destination) - logger.debug("TX [%s] Marking as delivered...", destination) + logger.info( + "TX [%s] {%s} got %d response", + destination, txn_id, code + ) - yield self.transaction_actions.delivered( - transaction, code, response - ) + logger.debug("TX [%s] Sent transaction", destination) + logger.debug("TX [%s] Marking as delivered...", destination) - logger.debug("TX [%s] Marked as delivered", destination) + yield self.transaction_actions.delivered( + transaction, code, response + ) - if code != 200: - for p in pdus: - logger.info( - "Failed to send event %s to %s", p.event_id, destination - ) - success = False - except RuntimeError as e: - # We capture this here as there as nothing actually listens - # for this finishing functions deferred. - logger.warn( - "TX [%s] Problem in _attempt_transaction: %s", - destination, - e, - ) - - success = False + logger.debug("TX [%s] Marked as delivered", destination) + if code != 200: for p in pdus: - logger.info("Failed to send event %s to %s", p.event_id, destination) - except Exception as e: - # We capture this here as there as nothing actually listens - # for this finishing functions deferred. - logger.warn( - "TX [%s] Problem in _attempt_transaction: %s", - destination, - e, - ) - + logger.info( + "Failed to send event %s to %s", p.event_id, destination + ) success = False - for p in pdus: - logger.info("Failed to send event %s to %s", p.event_id, destination) - defer.returnValue(success) diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index f49e8a2cc4..cc9bc7f14b 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -163,6 +163,7 @@ class TransportLayerClient(object): data=json_data, json_data_callback=json_data_callback, long_retries=True, + backoff_on_404=True, # If we get a 404 the other side has gone ) logger.debug( diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index e40495d1ab..a33135de67 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -22,7 +22,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, CodeMessageException from synapse.types import get_domain_from_id from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred -from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination +from synapse.util.retryutils import NotRetryingDestination logger = logging.getLogger(__name__) @@ -121,15 +121,11 @@ class E2eKeysHandler(object): def do_remote_query(destination): destination_query = remote_queries_not_in_cache[destination] try: - limiter = yield get_retry_limiter( - destination, self.clock, self.store + remote_result = yield self.federation.query_client_keys( + destination, + {"device_keys": destination_query}, + timeout=timeout ) - with limiter: - remote_result = yield self.federation.query_client_keys( - destination, - {"device_keys": destination_query}, - timeout=timeout - ) for user_id, keys in remote_result["device_keys"].items(): if user_id in destination_query: @@ -239,18 +235,14 @@ class E2eKeysHandler(object): def claim_client_keys(destination): device_keys = remote_queries[destination] try: - limiter = yield get_retry_limiter( - destination, self.clock, self.store + remote_result = yield self.federation.claim_client_keys( + destination, + {"one_time_keys": device_keys}, + timeout=timeout ) - with limiter: - remote_result = yield self.federation.claim_client_keys( - destination, - {"one_time_keys": device_keys}, - timeout=timeout - ) - for user_id, keys in remote_result["one_time_keys"].items(): - if user_id in device_keys: - json_result[user_id] = keys + for user_id, keys in remote_result["one_time_keys"].items(): + if user_id in device_keys: + json_result[user_id] = keys except CodeMessageException as e: failures[destination] = { "status": e.code, "message": e.message diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index f15903f862..b0885dc979 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -12,8 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - - +import synapse.util.retryutils from twisted.internet import defer, reactor, protocol from twisted.internet.error import DNSLookupError from twisted.web.client import readBody, HTTPConnectionPool, Agent @@ -94,6 +93,7 @@ class MatrixFederationHttpClient(object): reactor, MatrixFederationEndpointFactory(hs), pool=pool ) self.clock = hs.get_clock() + self._store = hs.get_datastore() self.version_string = hs.version_string self._next_id = 1 @@ -106,133 +106,143 @@ class MatrixFederationHttpClient(object): def _request(self, destination, method, path, body_callback, headers_dict={}, param_bytes=b"", query_bytes=b"", retry_on_dns_fail=True, - timeout=None, long_retries=False): + timeout=None, long_retries=False, backoff_on_404=False): """ Creates and sends a request to the given server Args: destination (str): The remote server to send the HTTP request to. method (str): HTTP method path (str): The HTTP path + backoff_on_404 (bool): Back off if we get a 404 Returns: Deferred: resolves with the http response object on success. Fails with ``HTTPRequestException``: if we get an HTTP response - code >= 300. + code >= 300. + Fails with ``NotRetryingDestination`` if we are not yet ready + to retry this server. """ + limiter = yield synapse.util.retryutils.get_retry_limiter( + destination, + self.clock, + self._store, + backoff_on_404=backoff_on_404, + ) + destination = destination.encode("ascii") path_bytes = path.encode("ascii") + with limiter: + headers_dict[b"User-Agent"] = [self.version_string] + headers_dict[b"Host"] = [destination] - headers_dict[b"User-Agent"] = [self.version_string] - headers_dict[b"Host"] = [destination] + url_bytes = self._create_url( + destination, path_bytes, param_bytes, query_bytes + ) - url_bytes = self._create_url( - destination, path_bytes, param_bytes, query_bytes - ) + txn_id = "%s-O-%s" % (method, self._next_id) + self._next_id = (self._next_id + 1) % (sys.maxint - 1) - txn_id = "%s-O-%s" % (method, self._next_id) - self._next_id = (self._next_id + 1) % (sys.maxint - 1) + outbound_logger.info( + "{%s} [%s] Sending request: %s %s", + txn_id, destination, method, url_bytes + ) - outbound_logger.info( - "{%s} [%s] Sending request: %s %s", - txn_id, destination, method, url_bytes - ) + # XXX: Would be much nicer to retry only at the transaction-layer + # (once we have reliable transactions in place) + if long_retries: + retries_left = MAX_LONG_RETRIES + else: + retries_left = MAX_SHORT_RETRIES - # XXX: Would be much nicer to retry only at the transaction-layer - # (once we have reliable transactions in place) - if long_retries: - retries_left = MAX_LONG_RETRIES - else: - retries_left = MAX_SHORT_RETRIES + http_url_bytes = urlparse.urlunparse( + ("", "", path_bytes, param_bytes, query_bytes, "") + ) - http_url_bytes = urlparse.urlunparse( - ("", "", path_bytes, param_bytes, query_bytes, "") - ) + log_result = None + try: + while True: + producer = None + if body_callback: + producer = body_callback(method, http_url_bytes, headers_dict) - log_result = None - try: - while True: - producer = None - if body_callback: - producer = body_callback(method, http_url_bytes, headers_dict) + try: + def send_request(): + request_deferred = preserve_context_over_fn( + self.agent.request, + method, + url_bytes, + Headers(headers_dict), + producer + ) - try: - def send_request(): - request_deferred = preserve_context_over_fn( - self.agent.request, + return self.clock.time_bound_deferred( + request_deferred, + time_out=timeout / 1000. if timeout else 60, + ) + + response = yield preserve_context_over_fn(send_request) + + log_result = "%d %s" % (response.code, response.phrase,) + break + except Exception as e: + if not retry_on_dns_fail and isinstance(e, DNSLookupError): + logger.warn( + "DNS Lookup failed to %s with %s", + destination, + e + ) + log_result = "DNS Lookup failed to %s with %s" % ( + destination, e + ) + raise + + logger.warn( + "{%s} Sending request failed to %s: %s %s: %s - %s", + txn_id, + destination, method, url_bytes, - Headers(headers_dict), - producer + type(e).__name__, + _flatten_response_never_received(e), ) - return self.clock.time_bound_deferred( - request_deferred, - time_out=timeout / 1000. if timeout else 60, + log_result = "%s - %s" % ( + type(e).__name__, _flatten_response_never_received(e), ) - response = yield preserve_context_over_fn(send_request) + if retries_left and not timeout: + if long_retries: + delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left) + delay = min(delay, 60) + delay *= random.uniform(0.8, 1.4) + else: + delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left) + delay = min(delay, 2) + delay *= random.uniform(0.8, 1.4) - log_result = "%d %s" % (response.code, response.phrase,) - break - except Exception as e: - if not retry_on_dns_fail and isinstance(e, DNSLookupError): - logger.warn( - "DNS Lookup failed to %s with %s", - destination, - e - ) - log_result = "DNS Lookup failed to %s with %s" % ( - destination, e - ) - raise - - logger.warn( - "{%s} Sending request failed to %s: %s %s: %s - %s", - txn_id, - destination, - method, - url_bytes, - type(e).__name__, - _flatten_response_never_received(e), - ) - - log_result = "%s - %s" % ( - type(e).__name__, _flatten_response_never_received(e), - ) - - if retries_left and not timeout: - if long_retries: - delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left) - delay = min(delay, 60) - delay *= random.uniform(0.8, 1.4) + yield sleep(delay) + retries_left -= 1 else: - delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left) - delay = min(delay, 2) - delay *= random.uniform(0.8, 1.4) + raise + finally: + outbound_logger.info( + "{%s} [%s] Result: %s", + txn_id, + destination, + log_result, + ) - yield sleep(delay) - retries_left -= 1 - else: - raise - finally: - outbound_logger.info( - "{%s} [%s] Result: %s", - txn_id, - destination, - log_result, - ) + if 200 <= response.code < 300: + pass + else: + # :'( + # Update transactions table? + body = yield preserve_context_over_fn(readBody, response) + raise HttpResponseException( + response.code, response.phrase, body + ) - if 200 <= response.code < 300: - pass - else: - # :'( - # Update transactions table? - body = yield preserve_context_over_fn(readBody, response) - raise HttpResponseException( - response.code, response.phrase, body - ) - - defer.returnValue(response) + defer.returnValue(response) def sign_request(self, destination, method, url_bytes, headers_dict, content=None): @@ -261,7 +271,7 @@ class MatrixFederationHttpClient(object): @defer.inlineCallbacks def put_json(self, destination, path, data={}, json_data_callback=None, - long_retries=False, timeout=None): + long_retries=False, timeout=None, backoff_on_404=False): """ Sends the specifed json data using PUT Args: @@ -276,11 +286,17 @@ class MatrixFederationHttpClient(object): retry for a short or long time. timeout(int): How long to try (in ms) the destination for before giving up. None indicates no timeout. + backoff_on_404 (bool): True if we should count a 404 response as + a failure of the server (and should therefore back off future + requests) Returns: Deferred: Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. On a 4xx or 5xx error response a CodeMessageException is raised. + + Fails with ``NotRetryingDestination`` if we are not yet ready + to retry this server. """ if not json_data_callback: @@ -303,6 +319,7 @@ class MatrixFederationHttpClient(object): headers_dict={"Content-Type": ["application/json"]}, long_retries=long_retries, timeout=timeout, + backoff_on_404=backoff_on_404, ) if 200 <= response.code < 300: @@ -332,6 +349,9 @@ class MatrixFederationHttpClient(object): Deferred: Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. On a 4xx or 5xx error response a CodeMessageException is raised. + + Fails with ``NotRetryingDestination`` if we are not yet ready + to retry this server. """ def body_callback(method, url_bytes, headers_dict): @@ -377,6 +397,9 @@ class MatrixFederationHttpClient(object): The result of the deferred is a tuple of `(code, response)`, where `response` is a dict representing the decoded JSON body. + + Fails with ``NotRetryingDestination`` if we are not yet ready + to retry this server. """ logger.debug("get_json args: %s", args) @@ -426,6 +449,9 @@ class MatrixFederationHttpClient(object): Fails with ``HTTPRequestException`` if we get an HTTP response code >= 300 + + Fails with ``NotRetryingDestination`` if we are not yet ready + to retry this server. """ encoded_args = {} diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 153ef001ad..7e5a952584 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -124,7 +124,13 @@ class RetryDestinationLimiter(object): def __exit__(self, exc_type, exc_val, exc_tb): valid_err_code = False - if exc_type is not None and issubclass(exc_type, CodeMessageException): + if exc_type is None: + valid_err_code = True + elif not issubclass(exc_type, Exception): + # avoid treating exceptions which don't derive from Exception as + # failures; this is mostly so as not to catch defer._DefGen. + valid_err_code = True + elif issubclass(exc_type, CodeMessageException): # Some error codes are perfectly fine for some APIs, whereas other # APIs may expect to never received e.g. a 404. It's important to # handle 404 as some remote servers will return a 404 when the HS @@ -142,11 +148,13 @@ class RetryDestinationLimiter(object): else: valid_err_code = False - if exc_type is None or valid_err_code: + if valid_err_code: # We connected successfully. if not self.retry_interval: return + logger.debug("Connection to %s was successful; clearing backoff", + self.destination) retry_last_ts = 0 self.retry_interval = 0 else: @@ -160,6 +168,10 @@ class RetryDestinationLimiter(object): else: self.retry_interval = self.min_retry_interval + logger.debug( + "Connection to %s was unsuccessful (%s(%s)); backoff now %i", + self.destination, exc_type, exc_val, self.retry_interval + ) retry_last_ts = int(self.clock.time_msec()) @defer.inlineCallbacks diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index f88d2be7c5..dbe50383da 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -192,6 +192,7 @@ class TypingNotificationsTestCase(unittest.TestCase): ), json_data_callback=ANY, long_retries=True, + backoff_on_404=True, ), defer.succeed((200, "OK")) ) @@ -263,6 +264,7 @@ class TypingNotificationsTestCase(unittest.TestCase): ), json_data_callback=ANY, long_retries=True, + backoff_on_404=True, ), defer.succeed((200, "OK")) )