Define the concept of a 'federation Query'; creating API for making and handling Queries on the Federation's increasingly-inaccurately-named ReplicationLayer

This commit is contained in:
Paul "LeoNerd" Evans 2014-08-13 16:55:53 +01:00
parent 1ddae5f40b
commit 827de7cee9
3 changed files with 113 additions and 0 deletions

View file

@ -66,6 +66,7 @@ class ReplicationLayer(object):
self.handler = None self.handler = None
self.edu_handlers = {} self.edu_handlers = {}
self.query_handlers = {}
self._order = 0 self._order = 0
@ -84,6 +85,27 @@ class ReplicationLayer(object):
self.edu_handlers[edu_type] = handler self.edu_handlers[edu_type] = handler
def register_query_handler(self, query_type, handler):
"""Sets the handler callable that will be used to handle an incoming
federation Query of the given type.
Args:
query_type (str): Category name of the query, which should match
the string used by make_query.
handler (callable): Invoked to handle incoming queries of this type
handler is invoked as:
result = handler(args)
where 'args' is a dict mapping strings to strings of the query
arguments. It should return a Deferred that will eventually yield an
object to encode as JSON.
"""
if query_type in self.query_handlers:
raise KeyError("Already have a Query handler for %s" % (query_type))
self.query_handlers[query_type] = handler
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def send_pdu(self, pdu): def send_pdu(self, pdu):
@ -137,6 +159,24 @@ class ReplicationLayer(object):
# TODO, add errback, etc. # TODO, add errback, etc.
self._transaction_queue.enqueue_edu(edu) self._transaction_queue.enqueue_edu(edu)
@log_function
def make_query(self, destination, query_type, args):
"""Sends a federation Query to a remote homeserver of the given type
and arguments.
Args:
destination (str): Domain name of the remote homeserver
query_type (str): Category of the query type; should match the
handler name used in register_query_handler().
args (dict): Mapping of strings to strings containing the details
of the query request.
Returns:
a Deferred which will eventually yield a JSON object from the
response
"""
return self.transport_layer.make_query(destination, query_type, args)
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def paginate(self, dest, context, limit): def paginate(self, dest, context, limit):
@ -340,6 +380,16 @@ class ReplicationLayer(object):
(200, self._transaction_from_pdus(response).get_dict()) (200, self._transaction_from_pdus(response).get_dict())
) )
@defer.inlineCallbacks
def on_query_request(self, query_type, args):
if query_type in self.query_handlers:
response = yield self.query_handlers[query_type](args)
defer.returnValue((200, response))
else:
defer.returnValue((404, "No handler for Query type '%s'"
% (query_type)
))
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def _get_persisted_pdu(self, pdu_id, pdu_origin): def _get_persisted_pdu(self, pdu_id, pdu_origin):

View file

@ -180,6 +180,19 @@ class TransportLayer(object):
defer.returnValue((code, response)) defer.returnValue((code, response))
@defer.inlineCallbacks
@log_function
def make_query(self, destination, query_type, args):
path = PREFIX + "/query/%s" % query_type
response = yield self.client.get_json(
destination=destination,
path=path,
args=args
)
defer.returnValue(response)
@log_function @log_function
def register_received_handler(self, handler): def register_received_handler(self, handler):
""" Register a handler that will be fired when we receive data. """ Register a handler that will be fired when we receive data.
@ -251,6 +264,15 @@ class TransportLayer(object):
lambda request, context: handler.on_context_pdus_request(context) lambda request, context: handler.on_context_pdus_request(context)
) )
# This is when we receive a server-server Query
self.server.register_path(
"GET",
re.compile("^" + PREFIX + "/query/([^/]*)$"),
lambda request, query_type: handler.on_query_request(
query_type, {k: v[0] for k, v in request.args.items()}
)
)
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def _on_send_request(self, request, transaction_id): def _on_send_request(self, request, transaction_id):
@ -456,3 +478,6 @@ class TransportRequestHandler(object):
of what went wrong. of what went wrong.
""" """
pass pass
def on_query_request(self):
""" Called on a GET /query/<query_type> request. """

View file

@ -62,6 +62,7 @@ class FederationTestCase(unittest.TestCase):
def setUp(self): def setUp(self):
self.mock_http_server = MockHttpServer() self.mock_http_server = MockHttpServer()
self.mock_http_client = Mock(spec=[ self.mock_http_client = Mock(spec=[
"get_json",
"put_json", "put_json",
]) ])
self.mock_persistence = Mock(spec=[ self.mock_persistence = Mock(spec=[
@ -253,3 +254,40 @@ class FederationTestCase(unittest.TestCase):
recv_observer.assert_called_with( recv_observer.assert_called_with(
"remote", {"testing": "reply here"} "remote", {"testing": "reply here"}
) )
@defer.inlineCallbacks
def test_send_query(self):
self.mock_http_client.get_json.return_value = defer.succeed(
{"your": "response"}
)
response = yield self.federation.make_query(
destination="remote",
query_type="a-question",
args={"one": "1", "two": "2"}
)
self.assertEquals({"your": "response"}, response)
self.mock_http_client.get_json.assert_called_with(
destination="remote",
path="/matrix/federation/v1/query/a-question",
args={"one": "1", "two": "2"}
)
@defer.inlineCallbacks
def test_recv_query(self):
recv_handler = Mock()
recv_handler.return_value = defer.succeed({"another": "response"})
self.federation.register_query_handler("a-question", recv_handler)
code, response = yield self.mock_http_server.trigger("GET",
"/matrix/federation/v1/query/a-question?three=3&four=4", None)
self.assertEquals(200, code)
self.assertEquals({"another": "response"}, response)
recv_handler.assert_called_with(
{"three": "3", "four": "4"}
)