From 40453b3f84c933dddb58cb3d2746091a80272546 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Apr 2017 16:49:51 +0100 Subject: [PATCH 1/3] Dedupe KeyedEdu and Devices federation repl traffic --- synapse/federation/send_queue.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 748548bbe2..7d79c069f1 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -267,9 +267,12 @@ class FederationRemoteSendQueue(object): keys = self.keyed_edu_changed.keys() i = keys.bisect_right(from_token) j = keys.bisect_right(to_token) + 1 - keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:j]) + # We purposefully clobber based on the key here, python dict comprehensions + # always use the last value, so this will correctly point to the last + # stream position. + keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]} - for (pos, (destination, edu_key)) in keyed_edus: + for ((destination, edu_key), pos) in keyed_edus.iteritems(): rows.append((pos, KeyedEduRow( key=edu_key, edu=self.keyed_edu[(destination, edu_key)], @@ -279,7 +282,7 @@ class FederationRemoteSendQueue(object): keys = self.edus.keys() i = keys.bisect_right(from_token) j = keys.bisect_right(to_token) + 1 - edus = set((k, self.edus[k]) for k in keys[i:j]) + edus = [(k, self.edus[k]) for k in keys[i:j]] for (pos, edu) in edus: rows.append((pos, EduRow(edu))) @@ -288,7 +291,7 @@ class FederationRemoteSendQueue(object): keys = self.failures.keys() i = keys.bisect_right(from_token) j = keys.bisect_right(to_token) + 1 - failures = set((k, self.failures[k]) for k in keys[i:j]) + failures = [(k, self.failures[k]) for k in keys[i:j]] for (pos, (destination, failure)) in failures: rows.append((pos, FailureRow( @@ -300,9 +303,9 @@ class FederationRemoteSendQueue(object): keys = self.device_messages.keys() i = keys.bisect_right(from_token) j = keys.bisect_right(to_token) + 1 - device_messages = set((k, self.device_messages[k]) for k in keys[i:j]) + device_messages = {self.device_messages[k]: k for k in keys[i:j]} - for (pos, destination) in device_messages: + for (destination, pos) in device_messages.iteritems(): rows.append((pos, DeviceRow( destination=destination, ))) From 84fbb80c8f6cef2e37195e3fd321c3ddab760c1f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Apr 2017 16:55:56 +0100 Subject: [PATCH 2/3] Use generators --- synapse/federation/send_queue.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 7d79c069f1..fcd9929990 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -282,7 +282,7 @@ class FederationRemoteSendQueue(object): keys = self.edus.keys() i = keys.bisect_right(from_token) j = keys.bisect_right(to_token) + 1 - edus = [(k, self.edus[k]) for k in keys[i:j]] + edus = ((k, self.edus[k]) for k in keys[i:j]) for (pos, edu) in edus: rows.append((pos, EduRow(edu))) @@ -291,7 +291,7 @@ class FederationRemoteSendQueue(object): keys = self.failures.keys() i = keys.bisect_right(from_token) j = keys.bisect_right(to_token) + 1 - failures = [(k, self.failures[k]) for k in keys[i:j]] + failures = ((k, self.failures[k]) for k in keys[i:j]) for (pos, (destination, failure)) in failures: rows.append((pos, FailureRow( From 26ae5178a4705f46f1eef605d6af1cbf8f7b7a85 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 12 Apr 2017 10:36:29 +0100 Subject: [PATCH 3/3] Add some comments --- synapse/federation/send_queue.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index fcd9929990..b952e59518 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -383,6 +383,10 @@ class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", ( "key", # tuple(str) - the edu key passed to send_edu "edu", # Edu ))): + """Streams EDUs that have an associated key that is ued to clobber. For example, + typing EDUs clobber based on room_id. + """ + TypeId = "k" @staticmethod @@ -407,6 +411,8 @@ class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", ( class EduRow(BaseFederationRow, namedtuple("EduRow", ( "edu", # Edu ))): + """Streams EDUs that don't have keys. See KeyedEduRow + """ TypeId = "e" @staticmethod @@ -424,6 +430,11 @@ class FailureRow(BaseFederationRow, namedtuple("FailureRow", ( "destination", # str "failure", ))): + """Streams failures to a remote server. Failures are issued when there was + something wrong with a transaction the remote sent us, e.g. it included + an event that was invalid. + """ + TypeId = "f" @staticmethod @@ -446,6 +457,10 @@ class FailureRow(BaseFederationRow, namedtuple("FailureRow", ( class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", ( "destination", # str ))): + """Streams the fact that either a) there is pending to device messages for + users on the remote, or b) a local users device has changed and needs to + be sent to the remote. + """ TypeId = "d" @staticmethod