Wait for events if the incremental sync is empty and a timeout is given

This commit is contained in:
Mark Haines 2015-01-27 20:09:52 +00:00
parent a56008842b
commit b19cf6a105
3 changed files with 61 additions and 8 deletions

View file

@ -16,7 +16,7 @@ if [ $# -eq 1 ]; then
fi fi
fi fi
for port in 8080 8081 8082; do for port in 8080; do
echo "Starting server on port $port... " echo "Starting server on port $port... "
https_port=$((port + 400)) https_port=$((port + 400))

View file

@ -72,6 +72,7 @@ class SyncHandler(BaseHandler):
self.event_sources = hs.get_event_sources() self.event_sources = hs.get_event_sources()
self.clock = hs.get_clock() self.clock = hs.get_clock()
@defer.inlineCallbacks
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0): def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0):
"""Get the sync for a client if we have new data for it now. Otherwise """Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then wait for new data to arrive on the server. If the timeout expires, then
@ -80,15 +81,19 @@ class SyncHandler(BaseHandler):
A Deferred SyncResult. A Deferred SyncResult.
""" """
if timeout == 0 or since_token is None: if timeout == 0 or since_token is None:
return self.current_sync_for_user(sync_config, since_token) result = yield self.current_sync_for_user(sync_config, since_token)
defer.returnValue(result)
else: else:
def current_sync_callback(since_token): def current_sync_callback():
return self.current_sync_for_user( return self.current_sync_for_user(sync_config, since_token)
self, since_token, sync_config
) rm_handler = self.hs.get_handlers().room_member_handler
return self.notifier.wait_for_events( room_ids = yield rm_handler.get_rooms_for_user(sync_config.user)
sync_config.filter, since_token, current_sync_callback result = yield self.notifier.wait_for_events(
sync_config.user, room_ids,
sync_config.filter, timeout, current_sync_callback
) )
defer.returnValue(result)
def current_sync_for_user(self, sync_config, since_token=None): def current_sync_for_user(self, sync_config, since_token=None):
"""Get the sync for client needed to match what the server has now. """Get the sync for client needed to match what the server has now.

View file

@ -18,6 +18,7 @@ from twisted.internet import defer
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
from synapse.util.logcontext import PreserveLoggingContext from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.async import run_on_reactor from synapse.util.async import run_on_reactor
from synapse.types import StreamToken
import logging import logging
@ -205,6 +206,53 @@ class Notifier(object):
[notify(l).addErrback(eb) for l in listeners] [notify(l).addErrback(eb) for l in listeners]
) )
@defer.inlineCallbacks
def wait_for_events(self, user, rooms, filter, timeout, callback):
"""Wait until the callback returns a non empty response or the
timeout fires.
"""
deferred = defer.Deferred()
from_token=StreamToken("s0","0","0")
listener = [_NotificationListener(
user=user,
rooms=rooms,
from_token=from_token,
limit=1,
timeout=timeout,
deferred=deferred,
)]
if timeout:
self._register_with_keys(listener[0])
result = yield callback()
if timeout:
timed_out = [False]
def _timeout_listener():
timed_out[0] = True
listener[0].notify(self, [], from_token, from_token)
self.clock.call_later(timeout/1000., _timeout_listener)
while not result and not timed_out[0]:
yield deferred
deferred = defer.Deferred()
listener[0] = _NotificationListener(
user=user,
rooms=rooms,
from_token=from_token,
limit=1,
timeout=timeout,
deferred=deferred,
)
self._register_with_keys(listener[0])
result = yield callback()
defer.returnValue(result)
def get_events_for(self, user, rooms, pagination_config, timeout): def get_events_for(self, user, rooms, pagination_config, timeout):
""" For the given user and rooms, return any new events for them. If """ For the given user and rooms, return any new events for them. If
there are no new events wait for up to `timeout` milliseconds for any there are no new events wait for up to `timeout` milliseconds for any