diff --git a/synapse/notifier.py b/synapse/notifier.py index 78eb28e4b2..e16a4608e9 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -305,14 +305,16 @@ class Notifier(object): ) @defer.inlineCallbacks - def wait_for_events(self, user, rooms, filter, timeout, callback): + def wait_for_events(self, user, rooms, timeout, callback, + from_token=StreamToken("s0", "0", "0")): """Wait until the callback returns a non empty response or the timeout fires. """ deferred = defer.Deferred() - - from_token = StreamToken("s0", "0", "0") + appservice = yield self.hs.get_datastore().get_app_service_by_user_id( + user.to_string() + ) listener = [_NotificationListener( user=user, @@ -321,6 +323,7 @@ class Notifier(object): limit=1, timeout=timeout, deferred=deferred, + appservice=appservice, )] if timeout: @@ -363,65 +366,43 @@ class Notifier(object): defer.returnValue(result) + @defer.inlineCallbacks def get_events_for(self, user, rooms, pagination_config, timeout): """ For the given user and rooms, return any new events for them. If there are no new events wait for up to `timeout` milliseconds for any new events to happen before returning. """ - deferred = defer.Deferred() - - self._get_events( - deferred, user, rooms, pagination_config.from_token, - pagination_config.limit, timeout - ).addErrback(deferred.errback) - - return deferred - - @defer.inlineCallbacks - def _get_events(self, deferred, user, rooms, from_token, limit, timeout): + from_token = pagination_config.from_token if not from_token: from_token = yield self.event_sources.get_current_token() - appservice = yield self.hs.get_datastore().get_app_service_by_user_id( - user.to_string() - ) + limit = pagination_config.limit - listener = _NotificationListener( - user, - rooms, - from_token, - limit, - timeout, - deferred, - appservice=appservice - ) - - def _timeout_listener(): - # TODO (erikj): We should probably set to_token to the current - # max rather than reusing from_token. - # Remove the timer from the listener so we don't try to cancel it. - listener.timer = None - listener.notify( - self, - [], - listener.from_token, - listener.from_token, - ) - - if timeout: - self._register_with_keys(listener) - - yield self._check_for_updates(listener) - - if not timeout: - _timeout_listener() - else: - # Only add the timer if the listener hasn't been notified - if not listener.notified(): - listener.timer = self.clock.call_later( - timeout/1000.0, _timeout_listener + @defer.inlineCallbacks + def check_for_updates(): + events = [] + end_token = from_token + for name, source in self.event_sources.sources.items(): + keyname = "%s_key" % name + stuff, new_key = yield source.get_new_events_for_user( + user, getattr(from_token, keyname), limit, ) - return + events.extend(stuff) + end_token = from_token.copy_and_replace(keyname, new_key) + + if events: + defer.returnValue((events, (from_token, end_token))) + else: + defer.returnValue(None) + + result = yield self.wait_for_events( + user, rooms, timeout, check_for_updates, from_token=from_token + ) + + if result is None: + result = ([], (from_token, from_token)) + + defer.returnValue(result) @log_function def _register_with_keys(self, listener): @@ -436,36 +417,6 @@ class Notifier(object): listener.appservice, set() ).add(listener) - @defer.inlineCallbacks - @log_function - def _check_for_updates(self, listener): - # TODO (erikj): We need to think about limits across multiple sources - events = [] - - from_token = listener.from_token - limit = listener.limit - - # TODO (erikj): DeferredList? - for name, source in self.event_sources.sources.items(): - keyname = "%s_key" % name - - stuff, new_key = yield source.get_new_events_for_user( - listener.user, - getattr(from_token, keyname), - limit, - ) - - events.extend(stuff) - - from_token = from_token.copy_and_replace(keyname, new_key) - - end_token = from_token - - if events: - listener.notify(self, events, listener.from_token, end_token) - - defer.returnValue(listener) - def _user_joined_room(self, user, room_id): new_listeners = self.user_to_listeners.get(user, set())