Use federation handler function rather than duplicate

This involves renaming _persist_events to be a public function.
This commit is contained in:
Erik Johnston 2018-08-15 14:17:18 +01:00
parent 773db62a22
commit 488ffe6fdb
2 changed files with 10 additions and 48 deletions

View file

@ -1175,7 +1175,7 @@ class FederationHandler(BaseHandler):
) )
context = yield self.state_handler.compute_event_context(event) context = yield self.state_handler.compute_event_context(event)
yield self._persist_events([(event, context)]) yield self.persist_events_and_notify([(event, context)])
defer.returnValue(event) defer.returnValue(event)
@ -1206,7 +1206,7 @@ class FederationHandler(BaseHandler):
) )
context = yield self.state_handler.compute_event_context(event) context = yield self.state_handler.compute_event_context(event)
yield self._persist_events([(event, context)]) yield self.persist_events_and_notify([(event, context)])
defer.returnValue(event) defer.returnValue(event)
@ -1449,7 +1449,7 @@ class FederationHandler(BaseHandler):
event, context event, context
) )
yield self._persist_events( yield self.persist_events_and_notify(
[(event, context)], [(event, context)],
backfilled=backfilled, backfilled=backfilled,
) )
@ -1487,7 +1487,7 @@ class FederationHandler(BaseHandler):
], consumeErrors=True, ], consumeErrors=True,
)) ))
yield self._persist_events( yield self.persist_events_and_notify(
[ [
(ev_info["event"], context) (ev_info["event"], context)
for ev_info, context in zip(event_infos, contexts) for ev_info, context in zip(event_infos, contexts)
@ -1575,7 +1575,7 @@ class FederationHandler(BaseHandler):
raise raise
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
yield self._persist_events( yield self.persist_events_and_notify(
[ [
(e, events_to_context[e.event_id]) (e, events_to_context[e.event_id])
for e in itertools.chain(auth_events, state) for e in itertools.chain(auth_events, state)
@ -1586,7 +1586,7 @@ class FederationHandler(BaseHandler):
event, old_state=state event, old_state=state
) )
yield self._persist_events( yield self.persist_events_and_notify(
[(event, new_event_context)], [(event, new_event_context)],
) )
@ -2327,7 +2327,7 @@ class FederationHandler(BaseHandler):
raise AuthError(403, "Third party certificate was invalid") raise AuthError(403, "Third party certificate was invalid")
@defer.inlineCallbacks @defer.inlineCallbacks
def _persist_events(self, event_and_contexts, backfilled=False): def persist_events_and_notify(self, event_and_contexts, backfilled=False):
"""Persists events and tells the notifier/pushers about them, if """Persists events and tells the notifier/pushers about them, if
necessary. necessary.

View file

@ -17,13 +17,10 @@ import logging
from twisted.internet import defer from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.events import FrozenEvent from synapse.events import FrozenEvent
from synapse.events.snapshot import EventContext from synapse.events.snapshot import EventContext
from synapse.http.servlet import parse_json_object_from_request from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint from synapse.replication.http._base import ReplicationEndpoint
from synapse.types import UserID
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -55,9 +52,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.is_mine_id = hs.is_mine_id self.federation_handler = hs.get_handlers().federation_handler
self.notifier = hs.get_notifier()
self.pusher_pool = hs.get_pusherpool()
@staticmethod @staticmethod
@defer.inlineCallbacks @defer.inlineCallbacks
@ -114,45 +109,12 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
len(event_and_contexts), len(event_and_contexts),
) )
max_stream_id = yield self.store.persist_events( yield self.federation_handler.persist_events_and_notify(
event_and_contexts, event_and_contexts, backfilled,
backfilled=backfilled
) )
if not backfilled:
for event, _ in event_and_contexts:
self._notify_persisted_event(event, max_stream_id)
defer.returnValue((200, {})) defer.returnValue((200, {}))
def _notify_persisted_event(self, event, max_stream_id):
extra_users = []
if event.type == EventTypes.Member:
target_user_id = event.state_key
# We notify for memberships if its an invite for one of our
# users
if event.internal_metadata.is_outlier():
if event.membership != Membership.INVITE:
if not self.is_mine_id(target_user_id):
return
target_user = UserID.from_string(target_user_id)
extra_users.append(target_user)
elif event.internal_metadata.is_outlier():
return
event_stream_id = event.internal_metadata.stream_ordering
self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id,
extra_users=extra_users
)
run_in_background(
self.pusher_pool.on_new_notifications,
event_stream_id, max_stream_id,
)
class ReplicationFederationSendEduRestServlet(ReplicationEndpoint): class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):
"""Handles EDUs newly received from federation, including persisting and """Handles EDUs newly received from federation, including persisting and