From aede7248ab04118b83d7787547b9cf3fd615e7ad Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 Jul 2016 17:37:44 +0100 Subject: [PATCH 1/7] Split out a FederationReader process --- synapse/app/federation_reader.py | 200 ++++++++++++++++++++ synapse/replication/slave/storage/events.py | 5 + synapse/replication/slave/storage/keys.py | 29 +++ synapse/storage/keys.py | 4 + 4 files changed, 238 insertions(+) create mode 100644 synapse/app/federation_reader.py create mode 100644 synapse/replication/slave/storage/keys.py diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py new file mode 100644 index 0000000000..98a18f9b3d --- /dev/null +++ b/synapse/app/federation_reader.py @@ -0,0 +1,200 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import synapse + +from synapse.config._base import ConfigError +from synapse.config.homeserver import HomeServerConfig +from synapse.config.logger import setup_logging +from synapse.http.site import SynapseSite +from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.keys import SlavedKeyStore +from synapse.server import HomeServer +from synapse.storage.engines import create_engine +from synapse.util.async import sleep +from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.logcontext import LoggingContext +from synapse.util.manhole import manhole +from synapse.util.rlimit import change_resource_limit +from synapse.util.versionstring import get_version_string +from synapse.api.urls import FEDERATION_PREFIX +from synapse.federation.transport.server import TransportLayerServer +from synapse.crypto import context_factory + + +from twisted.internet import reactor, defer +from twisted.web.resource import Resource + +from daemonize import Daemonize + +import sys +import logging +import gc + +logger = logging.getLogger("synapse.app.federation_reader") + + +class FederationReaderSlavedStore( + SlavedEventStore, + SlavedKeyStore, + BaseSlavedStore, +): + pass + + +class FederationReaderServer(HomeServer): + def get_db_conn(self, run_new_connection=True): + # Any param beginning with cp_ is a parameter for adbapi, and should + # not be passed to the database engine. + db_params = { + k: v for k, v in self.db_config.get("args", {}).items() + if not k.startswith("cp_") + } + db_conn = self.database_engine.module.connect(**db_params) + + if run_new_connection: + self.database_engine.on_new_connection(db_conn) + return db_conn + + def setup(self): + logger.info("Setting up.") + self.datastore = FederationReaderSlavedStore(self.get_db_conn(), self) + logger.info("Finished setting up.") + + def _listen_http(self, listener_config): + port = listener_config["port"] + bind_address = listener_config.get("bind_address", "") + site_tag = listener_config.get("tag", port) + resources = {} + for res in listener_config["resources"]: + for name in res["names"]: + if name == "metrics": + resources[METRICS_PREFIX] = MetricsResource(self) + elif name == "federation": + resources.update({ + FEDERATION_PREFIX: TransportLayerServer(self), + }) + + root_resource = create_resource_tree(resources, Resource()) + reactor.listenTCP( + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + ), + interface=bind_address + ) + logger.info("Synapse federation reader now listening on port %d", port) + + def start_listening(self, listeners): + for listener in listeners: + if listener["type"] == "http": + self._listen_http(listener) + elif listener["type"] == "manhole": + reactor.listenTCP( + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ), + interface=listener.get("bind_address", '127.0.0.1') + ) + else: + logger.warn("Unrecognized listener type: %s", listener["type"]) + + @defer.inlineCallbacks + def replicate(self): + http_client = self.get_simple_http_client() + store = self.get_datastore() + replication_url = self.config.worker_replication_url + + while True: + try: + args = store.stream_positions() + args["timeout"] = 30000 + result = yield http_client.get_json(replication_url, args=args) + yield store.process_replication(result) + except: + logger.exception("Error replicating from %r", replication_url) + yield sleep(5) + + +def start(config_options): + try: + config = HomeServerConfig.load_config( + "Synapse federation reader", config_options + ) + except ConfigError as e: + sys.stderr.write("\n" + e.message + "\n") + sys.exit(1) + + assert config.worker_app == "synapse.app.federation_reader" + + setup_logging(config.worker_log_config, config.worker_log_file) + + database_engine = create_engine(config.database_config) + + tls_server_context_factory = context_factory.ServerContextFactory(config) + + ss = FederationReaderServer( + config.server_name, + db_config=config.database_config, + tls_server_context_factory=tls_server_context_factory, + config=config, + version_string=get_version_string("Synapse", synapse), + database_engine=database_engine, + ) + + ss.setup() + ss.get_handlers() + ss.start_listening(config.worker_listeners) + + def run(): + with LoggingContext("run"): + logger.info("Running") + change_resource_limit(config.soft_file_limit) + if config.gc_thresholds: + gc.set_threshold(*config.gc_thresholds) + reactor.run() + + def start(): + ss.get_datastore().start_profiling() + ss.replicate() + + reactor.callWhenRunning(start) + + if config.worker_daemonize: + daemon = Daemonize( + app="synapse-federation-reader", + pid=config.worker_pid_file, + action=run, + auto_close_fds=False, + verbose=True, + logger=logger, + ) + daemon.start() + else: + run() + + +if __name__ == '__main__': + with LoggingContext("main"): + start(sys.argv[1:]) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 369d839464..2ba1e6b803 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -142,6 +142,11 @@ class SlavedEventStore(BaseSlavedStore): _get_events_around_txn = DataStore._get_events_around_txn.__func__ _get_some_state_from_cache = DataStore._get_some_state_from_cache.__func__ + get_backfill_events = DataStore.get_backfill_events.__func__ + _get_backfill_events = DataStore._get_backfill_events.__func__ + get_missing_events = DataStore.get_missing_events.__func__ + _get_missing_events = DataStore._get_missing_events.__func__ + def stream_positions(self): result = super(SlavedEventStore, self).stream_positions() result["events"] = self._stream_id_gen.get_current_token() diff --git a/synapse/replication/slave/storage/keys.py b/synapse/replication/slave/storage/keys.py new file mode 100644 index 0000000000..c1c895439d --- /dev/null +++ b/synapse/replication/slave/storage/keys.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from ._base import BaseSlavedStore +from synapse.storage import DataStore +from synapse.storage.keys import KeyStore + + +class SlavedKeyStore(BaseSlavedStore): + # TODO: use the cached version and invalidate deleted tokens + get_all_server_verify_keys = defer.inlineCallbacks(KeyStore.__dict__[ + "get_all_server_verify_keys" + ].orig) + + get_server_verify_keys = DataStore.get_server_verify_keys.__func__ diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py index a495a8a7d9..1195efec08 100644 --- a/synapse/storage/keys.py +++ b/synapse/storage/keys.py @@ -22,6 +22,10 @@ import OpenSSL from signedjson.key import decode_verify_key_bytes import hashlib +import logging + +logger = logging.getLogger(__name__) + class KeyStore(SQLBaseStore): """Persistence for signature verification keys and tls X.509 certificates From 6ede23ff1b956e72b3a2864e85accb8c05fff6f0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 27 Jul 2016 15:51:43 +0100 Subject: [PATCH 2/7] Add more key storage funcs into slave store --- synapse/replication/slave/storage/keys.py | 16 +++++++---- synapse/storage/keys.py | 34 +++++++++++------------ 2 files changed, 26 insertions(+), 24 deletions(-) diff --git a/synapse/replication/slave/storage/keys.py b/synapse/replication/slave/storage/keys.py index c1c895439d..dd2ae49e48 100644 --- a/synapse/replication/slave/storage/keys.py +++ b/synapse/replication/slave/storage/keys.py @@ -13,17 +13,21 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer - from ._base import BaseSlavedStore from synapse.storage import DataStore from synapse.storage.keys import KeyStore class SlavedKeyStore(BaseSlavedStore): - # TODO: use the cached version and invalidate deleted tokens - get_all_server_verify_keys = defer.inlineCallbacks(KeyStore.__dict__[ - "get_all_server_verify_keys" - ].orig) + _get_server_verify_key = KeyStore.__dict__[ + "_get_server_verify_key" + ] get_server_verify_keys = DataStore.get_server_verify_keys.__func__ + store_server_verify_key = DataStore.store_server_verify_key.__func__ + + get_server_certificate = DataStore.get_server_certificate.__func__ + store_server_certificate = DataStore.store_server_certificate.__func__ + + get_server_keys_json = DataStore.get_server_keys_json.__func__ + store_server_keys_json = DataStore.store_server_keys_json.__func__ diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py index 1195efec08..86b37b9ddd 100644 --- a/synapse/storage/keys.py +++ b/synapse/storage/keys.py @@ -78,22 +78,22 @@ class KeyStore(SQLBaseStore): ) @cachedInlineCallbacks() - def get_all_server_verify_keys(self, server_name): - rows = yield self._simple_select_list( + def _get_server_verify_key(self, server_name, key_id): + verify_key_bytes = yield self._simple_select_one_onecol( table="server_signature_keys", keyvalues={ "server_name": server_name, + "key_id": key_id, }, - retcols=["key_id", "verify_key"], - desc="get_all_server_verify_keys", + retcol="verify_key", + desc="_get_server_verify_key", + allow_none=True, ) - defer.returnValue({ - row["key_id"]: decode_verify_key_bytes( - row["key_id"], str(row["verify_key"]) - ) - for row in rows - }) + if verify_key_bytes: + defer.returnValue(decode_verify_key_bytes( + key_id, str(verify_key_bytes) + )) @defer.inlineCallbacks def get_server_verify_keys(self, server_name, key_ids): @@ -105,12 +105,12 @@ class KeyStore(SQLBaseStore): Returns: (list of VerifyKey): The verification keys. """ - keys = yield self.get_all_server_verify_keys(server_name) - defer.returnValue({ - k: keys[k] - for k in key_ids - if k in keys and keys[k] - }) + keys = {} + for key_id in key_ids: + key = yield self._get_server_verify_key(server_name, key_id) + if key: + keys[key_id] = key + defer.returnValue(keys) @defer.inlineCallbacks def store_server_verify_key(self, server_name, from_server, time_now_ms, @@ -137,8 +137,6 @@ class KeyStore(SQLBaseStore): desc="store_server_verify_key", ) - self.get_all_server_verify_keys.invalidate((server_name,)) - def store_server_keys_json(self, server_name, key_id, from_server, ts_now_ms, ts_expires_ms, key_json_bytes): """Stores the JSON bytes for a set of keys from a server From 0fcbca531f448e3cef50074404cbf7af457105f3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jul 2016 16:36:28 +0100 Subject: [PATCH 3/7] Add get_auth_chain to slave store --- synapse/replication/slave/storage/events.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 2ba1e6b803..fcd0f14a6c 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -147,6 +147,10 @@ class SlavedEventStore(BaseSlavedStore): get_missing_events = DataStore.get_missing_events.__func__ _get_missing_events = DataStore._get_missing_events.__func__ + get_auth_chain = DataStore.get_auth_chain.__func__ + get_auth_chain_ids = DataStore.get_auth_chain_ids.__func__ + _get_auth_chain_ids_txn = DataStore._get_auth_chain_ids_txn.__func__ + def stream_positions(self): result = super(SlavedEventStore, self).stream_positions() result["events"] = self._stream_id_gen.get_current_token() From 76b89d0edb9df7c5d8b595b85ff895367631fdf2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jul 2016 17:03:40 +0100 Subject: [PATCH 4/7] Add slace storage functions for public room list --- synapse/app/federation_reader.py | 4 ++++ .../replication/slave/storage/directory.py | 23 +++++++++++++++++++ synapse/replication/slave/storage/room.py | 21 +++++++++++++++++ 3 files changed, 48 insertions(+) create mode 100644 synapse/replication/slave/storage/directory.py create mode 100644 synapse/replication/slave/storage/room.py diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 98a18f9b3d..2e5ba09014 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -24,6 +24,8 @@ from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.keys import SlavedKeyStore +from synapse.replication.slave.storage.room import RoomStore +from synapse.replication.slave.storage.directory import DirectoryStore from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.async import sleep @@ -52,6 +54,8 @@ logger = logging.getLogger("synapse.app.federation_reader") class FederationReaderSlavedStore( SlavedEventStore, SlavedKeyStore, + RoomStore, + DirectoryStore, BaseSlavedStore, ): pass diff --git a/synapse/replication/slave/storage/directory.py b/synapse/replication/slave/storage/directory.py new file mode 100644 index 0000000000..5fbe3a303a --- /dev/null +++ b/synapse/replication/slave/storage/directory.py @@ -0,0 +1,23 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from synapse.storage.directory import DirectoryStore + + +class DirectoryStore(BaseSlavedStore): + get_aliases_for_room = DirectoryStore.__dict__[ + "get_aliases_for_room" + ].orig diff --git a/synapse/replication/slave/storage/room.py b/synapse/replication/slave/storage/room.py new file mode 100644 index 0000000000..d5bb0f98ea --- /dev/null +++ b/synapse/replication/slave/storage/room.py @@ -0,0 +1,21 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from synapse.storage import DataStore + + +class RoomStore(BaseSlavedStore): + get_public_room_ids = DataStore.get_public_room_ids.__func__ From ec8b217722be15fe110be77c7c7909a7758202cb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jul 2016 17:35:53 +0100 Subject: [PATCH 5/7] Add destination retry to slave store --- synapse/app/federation_reader.py | 2 ++ .../replication/slave/storage/transactions.py | 30 +++++++++++++++++++ 2 files changed, 32 insertions(+) create mode 100644 synapse/replication/slave/storage/transactions.py diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 2e5ba09014..58d425f9ac 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -25,6 +25,7 @@ from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.keys import SlavedKeyStore from synapse.replication.slave.storage.room import RoomStore +from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.slave.storage.directory import DirectoryStore from synapse.server import HomeServer from synapse.storage.engines import create_engine @@ -56,6 +57,7 @@ class FederationReaderSlavedStore( SlavedKeyStore, RoomStore, DirectoryStore, + TransactionStore, BaseSlavedStore, ): pass diff --git a/synapse/replication/slave/storage/transactions.py b/synapse/replication/slave/storage/transactions.py new file mode 100644 index 0000000000..6f2ba98af5 --- /dev/null +++ b/synapse/replication/slave/storage/transactions.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer +from ._base import BaseSlavedStore +from synapse.storage import DataStore +from synapse.storage.transactions import TransactionStore + + +class TransactionStore(BaseSlavedStore): + get_destination_retry_timings = TransactionStore.__dict__[ + "get_destination_retry_timings" + ].orig + _get_destination_retry_timings = DataStore._get_destination_retry_timings.__func__ + + # For now, don't record the destination rety timings + def set_destination_retry_timings(*args, **kwargs): + return defer.succeed(None) From 74106ba17177db837bea06c35b39dbf1adc75648 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 29 Jul 2016 11:45:03 +0100 Subject: [PATCH 6/7] Make jenkins dendron test federation read apis --- jenkins-dendron-postgres.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/jenkins-dendron-postgres.sh b/jenkins-dendron-postgres.sh index 50268e0982..9e3b2df9cf 100755 --- a/jenkins-dendron-postgres.sh +++ b/jenkins-dendron-postgres.sh @@ -82,6 +82,7 @@ echo >&2 "Running sytest with PostgreSQL"; --dendron $WORKSPACE/dendron/bin/dendron \ --pusher \ --synchrotron \ + --federation-reader \ --port-range ${PORT_BASE}:$((PORT_BASE+PORT_COUNT-1)) cd .. From 55e8a8788895b0c6b6b5a27d153f6d9e7e21d68b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 2 Aug 2016 13:41:17 +0100 Subject: [PATCH 7/7] Change default jenkins port base and count --- jenkins-dendron-postgres.sh | 4 ++-- jenkins-postgres.sh | 4 ++-- jenkins-sqlite.sh | 5 +++-- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/jenkins-dendron-postgres.sh b/jenkins-dendron-postgres.sh index 9e3b2df9cf..f715cd559a 100755 --- a/jenkins-dendron-postgres.sh +++ b/jenkins-dendron-postgres.sh @@ -69,8 +69,8 @@ cd sytest git checkout "${GIT_BRANCH}" || (echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" ; git checkout develop) -: ${PORT_BASE:=8000} -: ${PORT_COUNT=20} +: ${PORT_BASE:=20000} +: ${PORT_COUNT=100} ./jenkins/prep_sytest_for_postgres.sh diff --git a/jenkins-postgres.sh b/jenkins-postgres.sh index 2f0768fcb7..7a43df0d58 100755 --- a/jenkins-postgres.sh +++ b/jenkins-postgres.sh @@ -43,8 +43,8 @@ cd sytest git checkout "${GIT_BRANCH}" || (echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" ; git checkout develop) -: ${PORT_BASE:=8000} -: ${PORT_COUNT=20} +: ${PORT_BASE:=20000} +: ${PORT_COUNT=100} ./jenkins/prep_sytest_for_postgres.sh diff --git a/jenkins-sqlite.sh b/jenkins-sqlite.sh index da603c5af8..27e61af6ee 100755 --- a/jenkins-sqlite.sh +++ b/jenkins-sqlite.sh @@ -41,8 +41,9 @@ cd sytest git checkout "${GIT_BRANCH}" || (echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" ; git checkout develop) -: ${PORT_COUNT=20} -: ${PORT_BASE:=8000} +: ${PORT_BASE:=20000} +: ${PORT_COUNT=100} + ./jenkins/install_and_run.sh --coverage \ --python $TOX_BIN/python \ --synapse-directory $WORKSPACE \