diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 95fd20e434..748548bbe2 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -477,12 +477,12 @@ ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", ( )) -def process_rows_for_federation(federation_sender, rows): +def process_rows_for_federation(transaction_queue, rows): """Parse a list of rows from the federation stream and put them in the transaction queue ready for sending to the relevant homeservers. Args: - federation_sender (TransactionQueue) + transaction_queue (TransactionQueue) rows (list(synapse.replication.tcp.streams.FederationStreamRow)) """ @@ -509,23 +509,23 @@ def process_rows_for_federation(federation_sender, rows): parsed_row.add_to_buffer(buff) for destination, states in buff.presence.iteritems(): - federation_sender.send_presence(destination, states) + transaction_queue.send_presence(destination, states) for destination, edu_map in buff.keyed_edus.iteritems(): for key, edu in edu_map.items(): - federation_sender.send_edu( + transaction_queue.send_edu( edu.destination, edu.edu_type, edu.content, key=key, ) for destination, edu_list in buff.edus.iteritems(): for edu in edu_list: - federation_sender.send_edu( + transaction_queue.send_edu( edu.destination, edu.edu_type, edu.content, key=None, ) for destination, failure_list in buff.failures.iteritems(): for failure in failure_list: - federation_sender.send_failure(destination, failure) + transaction_queue.send_failure(destination, failure) for destination in buff.device_destinations: - federation_sender.send_device_messages(destination) + transaction_queue.send_device_messages(destination)