Split replication layer into two

This commit is contained in:
Erik Johnston 2018-03-12 14:34:31 +00:00
parent e05bf34117
commit 265b993b8a
12 changed files with 19 additions and 27 deletions

View file

@ -348,7 +348,7 @@ def setup(config_options):
hs.get_state_handler().start_caching() hs.get_state_handler().start_caching()
hs.get_datastore().start_profiling() hs.get_datastore().start_profiling()
hs.get_datastore().start_doing_background_updates() hs.get_datastore().start_doing_background_updates()
hs.get_replication_layer().start_get_pdu_cache() hs.get_replication_client().start_get_pdu_cache()
register_memory_metrics(hs) register_memory_metrics(hs)

View file

@ -54,27 +54,19 @@ class FederationServer(FederationBase):
super(FederationServer, self).__init__(hs) super(FederationServer, self).__init__(hs)
self.auth = hs.get_auth() self.auth = hs.get_auth()
self.handler = hs.get_handlers().federation_handler
self._server_linearizer = async.Linearizer("fed_server") self._server_linearizer = async.Linearizer("fed_server")
self._transaction_linearizer = async.Linearizer("fed_txn_handler") self._transaction_linearizer = async.Linearizer("fed_txn_handler")
self.transaction_actions = TransactionActions(self.store) self.transaction_actions = TransactionActions(self.store)
self.handler = None
self.registry = hs.get_federation_registry() self.registry = hs.get_federation_registry()
# We cache responses to state queries, as they take a while and often # We cache responses to state queries, as they take a while and often
# come in waves. # come in waves.
self._state_resp_cache = ResponseCache(hs, timeout_ms=30000) self._state_resp_cache = ResponseCache(hs, timeout_ms=30000)
def set_handler(self, handler):
"""Sets the handler that the replication layer will use to communicate
receipt of new PDUs from other home servers. The required methods are
documented on :py:class:`.ReplicationHandler`.
"""
self.handler = handler
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def on_backfill_request(self, origin, room_id, versions, limit): def on_backfill_request(self, origin, room_id, versions, limit):

View file

@ -1190,7 +1190,7 @@ GROUP_ATTESTATION_SERVLET_CLASSES = (
def register_servlets(hs, resource, authenticator, ratelimiter): def register_servlets(hs, resource, authenticator, ratelimiter):
for servletclass in FEDERATION_SERVLET_CLASSES: for servletclass in FEDERATION_SERVLET_CLASSES:
servletclass( servletclass(
handler=hs.get_replication_layer(), handler=hs.get_replication_server(),
authenticator=authenticator, authenticator=authenticator,
ratelimiter=ratelimiter, ratelimiter=ratelimiter,
server_name=hs.hostname, server_name=hs.hostname,

View file

@ -37,7 +37,6 @@ class DeviceHandler(BaseHandler):
self.state = hs.get_state_handler() self.state = hs.get_state_handler()
self._auth_handler = hs.get_auth_handler() self._auth_handler = hs.get_auth_handler()
self.federation_sender = hs.get_federation_sender() self.federation_sender = hs.get_federation_sender()
self.federation = hs.get_replication_layer()
self._edu_updater = DeviceListEduUpdater(hs, self) self._edu_updater = DeviceListEduUpdater(hs, self)
@ -432,7 +431,7 @@ class DeviceListEduUpdater(object):
def __init__(self, hs, device_handler): def __init__(self, hs, device_handler):
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.federation = hs.get_replication_layer() self.federation = hs.get_replication_client()
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.device_handler = device_handler self.device_handler = device_handler

View file

@ -36,7 +36,7 @@ class DirectoryHandler(BaseHandler):
self.appservice_handler = hs.get_application_service_handler() self.appservice_handler = hs.get_application_service_handler()
self.event_creation_handler = hs.get_event_creation_handler() self.event_creation_handler = hs.get_event_creation_handler()
self.federation = hs.get_replication_layer() self.federation = hs.get_replication_client()
hs.get_federation_registry().register_query_handler( hs.get_federation_registry().register_query_handler(
"directory", self.on_directory_query "directory", self.on_directory_query
) )

View file

@ -32,7 +32,7 @@ logger = logging.getLogger(__name__)
class E2eKeysHandler(object): class E2eKeysHandler(object):
def __init__(self, hs): def __init__(self, hs):
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.federation = hs.get_replication_layer() self.federation = hs.get_replication_client()
self.device_handler = hs.get_device_handler() self.device_handler = hs.get_device_handler()
self.is_mine = hs.is_mine self.is_mine = hs.is_mine
self.clock = hs.get_clock() self.clock = hs.get_clock()

View file

@ -68,7 +68,7 @@ class FederationHandler(BaseHandler):
self.hs = hs self.hs = hs
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.replication_layer = hs.get_replication_layer() self.replication_layer = hs.get_replication_client()
self.state_handler = hs.get_state_handler() self.state_handler = hs.get_state_handler()
self.server_name = hs.hostname self.server_name = hs.hostname
self.keyring = hs.get_keyring() self.keyring = hs.get_keyring()
@ -78,8 +78,6 @@ class FederationHandler(BaseHandler):
self.spam_checker = hs.get_spam_checker() self.spam_checker = hs.get_spam_checker()
self.event_creation_handler = hs.get_event_creation_handler() self.event_creation_handler = hs.get_event_creation_handler()
self.replication_layer.set_handler(self)
# When joining a room we need to queue any events for that room up # When joining a room we need to queue any events for that room up
self.room_queues = {} self.room_queues = {}
self._room_pdu_linearizer = Linearizer("fed_room_pdu") self._room_pdu_linearizer = Linearizer("fed_room_pdu")

View file

@ -93,7 +93,6 @@ class PresenceHandler(object):
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.wheel_timer = WheelTimer() self.wheel_timer = WheelTimer()
self.notifier = hs.get_notifier() self.notifier = hs.get_notifier()
self.replication = hs.get_replication_layer()
self.federation = hs.get_federation_sender() self.federation = hs.get_federation_sender()
self.state = hs.get_state_handler() self.state = hs.get_state_handler()

View file

@ -31,7 +31,7 @@ class ProfileHandler(BaseHandler):
def __init__(self, hs): def __init__(self, hs):
super(ProfileHandler, self).__init__(hs) super(ProfileHandler, self).__init__(hs)
self.federation = hs.get_replication_layer() self.federation = hs.get_replication_client()
hs.get_federation_registry().register_query_handler( hs.get_federation_registry().register_query_handler(
"profile", self.on_profile_query "profile", self.on_profile_query
) )

View file

@ -409,7 +409,7 @@ class RoomListHandler(BaseHandler):
def _get_remote_list_cached(self, server_name, limit=None, since_token=None, def _get_remote_list_cached(self, server_name, limit=None, since_token=None,
search_filter=None, include_all_networks=False, search_filter=None, include_all_networks=False,
third_party_instance_id=None,): third_party_instance_id=None,):
repl_layer = self.hs.get_replication_layer() repl_layer = self.hs.get_replication_client()
if search_filter: if search_filter:
# We can't cache when asking for search # We can't cache when asking for search
return repl_layer.get_public_rooms( return repl_layer.get_public_rooms(

View file

@ -55,7 +55,6 @@ class RoomMemberHandler(object):
self.registration_handler = hs.get_handlers().registration_handler self.registration_handler = hs.get_handlers().registration_handler
self.profile_handler = hs.get_profile_handler() self.profile_handler = hs.get_profile_handler()
self.event_creation_hander = hs.get_event_creation_handler() self.event_creation_hander = hs.get_event_creation_handler()
self.replication_layer = hs.get_replication_layer()
self.member_linearizer = Linearizer(name="member") self.member_linearizer = Linearizer(name="member")
@ -212,7 +211,7 @@ class RoomMemberHandler(object):
# if this is a join with a 3pid signature, we may need to turn a 3pid # if this is a join with a 3pid signature, we may need to turn a 3pid
# invite into a normal invite before we can handle the join. # invite into a normal invite before we can handle the join.
if third_party_signed is not None: if third_party_signed is not None:
yield self.replication_layer.exchange_third_party_invite( yield self.federation_handler.exchange_third_party_invite(
third_party_signed["sender"], third_party_signed["sender"],
target.to_string(), target.to_string(),
room_id, room_id,

View file

@ -32,7 +32,8 @@ from synapse.appservice.scheduler import ApplicationServiceScheduler
from synapse.crypto.keyring import Keyring from synapse.crypto.keyring import Keyring
from synapse.events.builder import EventBuilderFactory from synapse.events.builder import EventBuilderFactory
from synapse.events.spamcheck import SpamChecker from synapse.events.spamcheck import SpamChecker
from synapse.federation import initialize_http_replication from synapse.federation.federation_client import FederationClient
from synapse.federation.federation_server import FederationServer
from synapse.federation.send_queue import FederationRemoteSendQueue from synapse.federation.send_queue import FederationRemoteSendQueue
from synapse.federation.federation_server import FederationHandlerRegistry from synapse.federation.federation_server import FederationHandlerRegistry
from synapse.federation.transport.client import TransportLayerClient from synapse.federation.transport.client import TransportLayerClient
@ -100,7 +101,8 @@ class HomeServer(object):
DEPENDENCIES = [ DEPENDENCIES = [
'http_client', 'http_client',
'db_pool', 'db_pool',
'replication_layer', 'replication_client',
'replication_server',
'handlers', 'handlers',
'v1auth', 'v1auth',
'auth', 'auth',
@ -197,8 +199,11 @@ class HomeServer(object):
def get_ratelimiter(self): def get_ratelimiter(self):
return self.ratelimiter return self.ratelimiter
def build_replication_layer(self): def build_replication_client(self):
return initialize_http_replication(self) return FederationClient(self)
def build_replication_server(self):
return FederationServer(self)
def build_handlers(self): def build_handlers(self):
return Handlers(self) return Handlers(self)