diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 8831d83c56..0a061fe9b2 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -217,55 +217,20 @@ class PresenceHandler(BaseHandler): user_id, UserPresenceState.default(user_id) ) - # If the users are ours then we want to set up a bunch of timers - # to time things out. - if self.hs.is_mine_id(user_id): - if new_state.state == PresenceState.ONLINE: - # Idle timer - self.wheel_timer.insert( - now=now, - obj=user_id, - then=new_state.last_active_ts + IDLE_TIMER - ) - - if new_state.state != PresenceState.OFFLINE: - # User has stopped syncing - self.wheel_timer.insert( - now=now, - obj=user_id, - then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT - ) - - last_federate = new_state.last_federation_update_ts - if now - last_federate > FEDERATION_PING_INTERVAL: - # Been a while since we've poked remote servers - new_state = new_state.copy_and_replace( - last_federation_update_ts=now, - ) - to_federation_ping[user_id] = new_state - - else: - self.wheel_timer.insert( - now=now, - obj=user_id, - then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT - ) - - if new_state.state == PresenceState.ONLINE: - active = now - new_state.last_active_ts < LAST_ACTIVE_GRANULARITY - new_state = new_state.copy_and_replace( - currently_active=active, - ) - - # Check whether the change was something worth notifying about - if should_notify(prev_state, new_state): - new_state.copy_and_replace( - last_federation_update_ts=now, - ) - to_notify[user_id] = new_state + new_state, should_notify, should_ping = handle_update( + prev_state, new_state, + is_mine=self.hs.is_mine_id(user_id), + wheel_timer=self.wheel_timer, + now=now + ) self.user_to_current_state[user_id] = new_state + if should_notify: + to_notify[user_id] = new_state + elif should_ping: + to_federation_ping[user_id] = new_state + # TODO: We should probably ensure there are no races hereafter if to_notify: @@ -296,55 +261,22 @@ class PresenceHandler(BaseHandler): # take any action. users_to_check = self.wheel_timer.fetch(now) - changes = {} # Actual changes we need to notify people about + states = [ + self.user_to_current_state.get( + user_id, UserPresenceState.default(user_id) + ) + for user_id in set(users_to_check) + ] - for user_id in set(users_to_check): - state = self.user_to_current_state.get(user_id, None) - if not state: - continue + changes = handle_timeouts( + states, + is_mine_fn=self.hs.is_mine_id, + user_to_current_state=self.user_to_current_state, + user_to_num_current_syncs=self.user_to_num_current_syncs, + now=now, + ) - if state.state == PresenceState.OFFLINE: - # No timeouts are associated with offline states. - continue - - if self.hs.is_mine_id(user_id): - if state.state == PresenceState.ONLINE: - if now - state.last_active_ts > IDLE_TIMER: - # Currently online, but last activity ages ago so auto - # idle - changes[user_id] = state.copy_and_replace( - state=PresenceState.UNAVAILABLE, - ) - elif now - state.last_active_ts > LAST_ACTIVE_GRANULARITY: - # So that we send down a notification that we've - # stopped updating. - changes[user_id] = state - - if now - state.last_federation_update_ts > FEDERATION_PING_INTERVAL: - # Need to send ping to other servers to ensure they don't - # timeout and set us to offline - changes[user_id] = state - - # If there are have been no sync for a while (and none ongoing), - # set presence to offline - if not self.user_to_num_current_syncs.get(user_id, 0): - if now - state.last_user_sync_ts > SYNC_ONLINE_TIMEOUT: - changes[user_id] = state.copy_and_replace( - state=PresenceState.OFFLINE, - status_msg=None, - ) - else: - # We expect to be poked occaisonally by the other side. - # This is to protect against forgetful/buggy servers, so that - # no one gets stuck online forever. - if now - state.last_federation_update_ts > FEDERATION_TIMEOUT: - # The other side seems to have disappeared. - changes[user_id] = state.copy_and_replace( - state=PresenceState.OFFLINE, - status_msg=None, - ) - - preserve_fn(self._update_states)(changes.values()) + preserve_fn(self._update_states)(changes) @defer.inlineCallbacks def bump_presence_active_time(self, user): @@ -925,3 +857,165 @@ class PresenceEventSource(object): def get_pagination_rows(self, user, pagination_config, key): return self.get_new_events(user, from_key=None, include_offline=False) + + +def handle_timeouts(user_states, is_mine_fn, user_to_num_current_syncs, now): + """Checks the presence of users that have timed out and updates as + appropriate. + + Args: + user_states(list): List of UserPresenceState's to check. + is_mine_fn (fn): Function that returns if a user_id is ours + user_to_num_current_syncs (dict): Mapping of user_id to number of currently + active syncs. + now (int): Current time in ms. + + Returns: + List of UserPresenceState updates + """ + changes = {} # Actual changes we need to notify people about + + for state in user_states: + is_mine = is_mine_fn(state.user_id) + + new_state = handle_timeout(state, is_mine, user_to_num_current_syncs, now) + if new_state: + changes[state.user_id] = new_state + + return changes.values() + + +def handle_timeout(state, is_mine, user_to_num_current_syncs, now): + """Checks the presence of the user to see if any of the timers have elapsed + + Args: + state (UserPresenceState) + is_mine (bool): Whether the user is ours + user_to_num_current_syncs (dict): Mapping of user_id to number of currently + active syncs. + now (int): Current time in ms. + + Returns: + A UserPresenceState update or None if no update. + """ + if state.state == PresenceState.OFFLINE: + # No timeouts are associated with offline states. + return None + + changed = False + user_id = state.user_id + + if is_mine: + if state.state == PresenceState.ONLINE: + if now - state.last_active_ts > IDLE_TIMER: + # Currently online, but last activity ages ago so auto + # idle + state = state.copy_and_replace( + state=PresenceState.UNAVAILABLE, + ) + changed = True + elif now - state.last_active_ts > LAST_ACTIVE_GRANULARITY: + # So that we send down a notification that we've + # stopped updating. + changed = True + + if now - state.last_federation_update_ts > FEDERATION_PING_INTERVAL: + # Need to send ping to other servers to ensure they don't + # timeout and set us to offline + changed = True + + # If there are have been no sync for a while (and none ongoing), + # set presence to offline + if not user_to_num_current_syncs.get(user_id, 0): + if now - state.last_user_sync_ts > SYNC_ONLINE_TIMEOUT: + state = state.copy_and_replace( + state=PresenceState.OFFLINE, + status_msg=None, + ) + changed = True + else: + # We expect to be poked occaisonally by the other side. + # This is to protect against forgetful/buggy servers, so that + # no one gets stuck online forever. + if now - state.last_federation_update_ts > FEDERATION_TIMEOUT: + # The other side seems to have disappeared. + state = state.copy_and_replace( + state=PresenceState.OFFLINE, + status_msg=None, + ) + changed = True + + return state if changed else None + + +def handle_update(prev_state, new_state, is_mine, wheel_timer, now): + """Given a presence update: + 1. Add any appropriate timers. + 2. Check if we should notify anyone. + + Args: + prev_state (UserPresenceState) + new_state (UserPresenceState) + is_mine (bool): Whether the user is ours + wheel_timer (WheelTimer) + now (int): Time now in ms + + Returns: + 3-tuple: `(new_state, persist_and_notify, federation_ping)` where: + - new_state: is the state to actually persist + - persist_and_notify (bool): whether to persist and notify people + - federation_ping (bool): whether we should send a ping over federation + """ + user_id = new_state.user_id + + persist_and_notify = False + federation_ping = False + + # If the users are ours then we want to set up a bunch of timers + # to time things out. + if is_mine: + if new_state.state == PresenceState.ONLINE: + # Idle timer + wheel_timer.insert( + now=now, + obj=user_id, + then=new_state.last_active_ts + IDLE_TIMER + ) + + if new_state.state != PresenceState.OFFLINE: + # User has stopped syncing + wheel_timer.insert( + now=now, + obj=user_id, + then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT + ) + + last_federate = new_state.last_federation_update_ts + if now - last_federate > FEDERATION_PING_INTERVAL: + # Been a while since we've poked remote servers + new_state = new_state.copy_and_replace( + last_federation_update_ts=now, + ) + federation_ping = True + + else: + wheel_timer.insert( + now=now, + obj=user_id, + then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT + ) + + if new_state.state == PresenceState.ONLINE: + active = now - new_state.last_active_ts < LAST_ACTIVE_GRANULARITY + new_state = new_state.copy_and_replace( + currently_active=active, + ) + + # Check whether the change was something worth notifying about + if should_notify(prev_state, new_state): + new_state = new_state.copy_and_replace( + last_federation_update_ts=now, + ) + persist_and_notify = True + + return new_state, persist_and_notify, federation_ping diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py new file mode 100644 index 0000000000..197298db15 --- /dev/null +++ b/tests/handlers/test_presence.py @@ -0,0 +1,373 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from tests import unittest + +from mock import Mock, call + +from synapse.api.constants import PresenceState +from synapse.handlers.presence import ( + handle_update, handle_timeout, + IDLE_TIMER, SYNC_ONLINE_TIMEOUT, LAST_ACTIVE_GRANULARITY, FEDERATION_TIMEOUT, + FEDERATION_PING_INTERVAL, +) +from synapse.storage.presence import UserPresenceState + + +class PresenceUpdateTestCase(unittest.TestCase): + def test_offline_to_online(self): + wheel_timer = Mock() + user_id = "@foo:bar" + now = 5000000 + + prev_state = UserPresenceState.default(user_id) + new_state = prev_state.copy_and_replace( + state=PresenceState.ONLINE, + last_active_ts=now, + ) + + state, persist_and_notify, federation_ping = handle_update( + prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now + ) + + self.assertTrue(persist_and_notify) + self.assertTrue(state.currently_active) + self.assertEquals(new_state.state, state.state) + self.assertEquals(new_state.status_msg, state.status_msg) + self.assertEquals(state.last_federation_update_ts, now) + + self.assertEquals(wheel_timer.insert.call_count, 2) + wheel_timer.insert.assert_has_calls([ + call( + now=now, + obj=user_id, + then=new_state.last_active_ts + IDLE_TIMER + ), + call( + now=now, + obj=user_id, + then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT + ) + ], any_order=True) + + def test_online_to_online(self): + wheel_timer = Mock() + user_id = "@foo:bar" + now = 5000000 + + prev_state = UserPresenceState.default(user_id) + prev_state = prev_state.copy_and_replace( + state=PresenceState.ONLINE, + last_active_ts=now, + currently_active=True, + ) + + new_state = prev_state.copy_and_replace( + state=PresenceState.ONLINE, + last_active_ts=now, + ) + + state, persist_and_notify, federation_ping = handle_update( + prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now + ) + + self.assertFalse(persist_and_notify) + self.assertTrue(federation_ping) + self.assertTrue(state.currently_active) + self.assertEquals(new_state.state, state.state) + self.assertEquals(new_state.status_msg, state.status_msg) + self.assertEquals(state.last_federation_update_ts, now) + + self.assertEquals(wheel_timer.insert.call_count, 2) + wheel_timer.insert.assert_has_calls([ + call( + now=now, + obj=user_id, + then=new_state.last_active_ts + IDLE_TIMER + ), + call( + now=now, + obj=user_id, + then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT + ) + ], any_order=True) + + def test_online_to_online_last_active(self): + wheel_timer = Mock() + user_id = "@foo:bar" + now = 5000000 + + prev_state = UserPresenceState.default(user_id) + prev_state = prev_state.copy_and_replace( + state=PresenceState.ONLINE, + last_active_ts=now - LAST_ACTIVE_GRANULARITY - 1, + currently_active=True, + ) + + new_state = prev_state.copy_and_replace( + state=PresenceState.ONLINE, + ) + + state, persist_and_notify, federation_ping = handle_update( + prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now + ) + + self.assertTrue(persist_and_notify) + self.assertFalse(state.currently_active) + self.assertEquals(new_state.state, state.state) + self.assertEquals(new_state.status_msg, state.status_msg) + self.assertEquals(state.last_federation_update_ts, now) + + self.assertEquals(wheel_timer.insert.call_count, 2) + wheel_timer.insert.assert_has_calls([ + call( + now=now, + obj=user_id, + then=new_state.last_active_ts + IDLE_TIMER + ), + call( + now=now, + obj=user_id, + then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT + ) + ], any_order=True) + + def test_remote_ping_timer(self): + wheel_timer = Mock() + user_id = "@foo:bar" + now = 5000000 + + prev_state = UserPresenceState.default(user_id) + prev_state = prev_state.copy_and_replace( + state=PresenceState.ONLINE, + ) + + new_state = prev_state.copy_and_replace( + state=PresenceState.ONLINE, + ) + + state, persist_and_notify, federation_ping = handle_update( + prev_state, new_state, is_mine=False, wheel_timer=wheel_timer, now=now + ) + + self.assertFalse(persist_and_notify) + self.assertFalse(federation_ping) + self.assertFalse(state.currently_active) + self.assertEquals(new_state.state, state.state) + self.assertEquals(new_state.status_msg, state.status_msg) + + self.assertEquals(wheel_timer.insert.call_count, 1) + wheel_timer.insert.assert_has_calls([ + call( + now=now, + obj=user_id, + then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT + ), + ], any_order=True) + + def test_online_to_offline(self): + wheel_timer = Mock() + user_id = "@foo:bar" + now = 5000000 + + prev_state = UserPresenceState.default(user_id) + prev_state = prev_state.copy_and_replace( + state=PresenceState.ONLINE, + last_active_ts=now, + currently_active=True, + ) + + new_state = prev_state.copy_and_replace( + state=PresenceState.OFFLINE, + ) + + state, persist_and_notify, federation_ping = handle_update( + prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now + ) + + self.assertTrue(persist_and_notify) + self.assertEquals(new_state.state, state.state) + self.assertEquals(state.last_federation_update_ts, now) + + self.assertEquals(wheel_timer.insert.call_count, 0) + + def test_online_to_idle(self): + wheel_timer = Mock() + user_id = "@foo:bar" + now = 5000000 + + prev_state = UserPresenceState.default(user_id) + prev_state = prev_state.copy_and_replace( + state=PresenceState.ONLINE, + last_active_ts=now, + currently_active=True, + ) + + new_state = prev_state.copy_and_replace( + state=PresenceState.UNAVAILABLE, + ) + + state, persist_and_notify, federation_ping = handle_update( + prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now + ) + + self.assertTrue(persist_and_notify) + self.assertEquals(new_state.state, state.state) + self.assertEquals(state.last_federation_update_ts, now) + self.assertEquals(new_state.state, state.state) + self.assertEquals(new_state.status_msg, state.status_msg) + + self.assertEquals(wheel_timer.insert.call_count, 1) + wheel_timer.insert.assert_has_calls([ + call( + now=now, + obj=user_id, + then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT + ) + ], any_order=True) + + +class PresenceTimeoutTestCase(unittest.TestCase): + def test_idle_timer(self): + user_id = "@foo:bar" + now = 5000000 + + state = UserPresenceState.default(user_id) + state = state.copy_and_replace( + state=PresenceState.ONLINE, + last_active_ts=now - IDLE_TIMER - 1, + last_user_sync_ts=now, + ) + + new_state = handle_timeout( + state, is_mine=True, user_to_num_current_syncs={}, now=now + ) + + self.assertIsNotNone(new_state) + self.assertEquals(new_state.state, PresenceState.UNAVAILABLE) + + def test_sync_timeout(self): + user_id = "@foo:bar" + now = 5000000 + + state = UserPresenceState.default(user_id) + state = state.copy_and_replace( + state=PresenceState.ONLINE, + last_active_ts=now, + last_user_sync_ts=now - SYNC_ONLINE_TIMEOUT - 1, + ) + + new_state = handle_timeout( + state, is_mine=True, user_to_num_current_syncs={}, now=now + ) + + self.assertIsNotNone(new_state) + self.assertEquals(new_state.state, PresenceState.OFFLINE) + + def test_sync_online(self): + user_id = "@foo:bar" + now = 5000000 + + state = UserPresenceState.default(user_id) + state = state.copy_and_replace( + state=PresenceState.ONLINE, + last_active_ts=now - SYNC_ONLINE_TIMEOUT - 1, + last_user_sync_ts=now - SYNC_ONLINE_TIMEOUT - 1, + ) + + new_state = handle_timeout( + state, is_mine=True, user_to_num_current_syncs={ + user_id: 1, + }, now=now + ) + + self.assertIsNotNone(new_state) + self.assertEquals(new_state.state, PresenceState.ONLINE) + + def test_federation_ping(self): + user_id = "@foo:bar" + now = 5000000 + + state = UserPresenceState.default(user_id) + state = state.copy_and_replace( + state=PresenceState.ONLINE, + last_active_ts=now, + last_user_sync_ts=now, + last_federation_update_ts=now - FEDERATION_PING_INTERVAL - 1, + ) + + new_state = handle_timeout( + state, is_mine=True, user_to_num_current_syncs={}, now=now + ) + + self.assertIsNotNone(new_state) + self.assertEquals(new_state, new_state) + + def test_no_timeout(self): + user_id = "@foo:bar" + now = 5000000 + + state = UserPresenceState.default(user_id) + state = state.copy_and_replace( + state=PresenceState.ONLINE, + last_active_ts=now, + last_user_sync_ts=now, + last_federation_update_ts=now, + ) + + new_state = handle_timeout( + state, is_mine=True, user_to_num_current_syncs={}, now=now + ) + + self.assertIsNone(new_state) + + def test_federation_timeout(self): + user_id = "@foo:bar" + now = 5000000 + + state = UserPresenceState.default(user_id) + state = state.copy_and_replace( + state=PresenceState.ONLINE, + last_active_ts=now, + last_user_sync_ts=now, + last_federation_update_ts=now - FEDERATION_TIMEOUT - 1, + ) + + new_state = handle_timeout( + state, is_mine=False, user_to_num_current_syncs={}, now=now + ) + + self.assertIsNotNone(new_state) + self.assertEquals(new_state.state, PresenceState.OFFLINE) + + def test_last_active(self): + user_id = "@foo:bar" + now = 5000000 + + state = UserPresenceState.default(user_id) + state = state.copy_and_replace( + state=PresenceState.ONLINE, + last_active_ts=now - LAST_ACTIVE_GRANULARITY - 1, + last_user_sync_ts=now, + last_federation_update_ts=now, + ) + + new_state = handle_timeout( + state, is_mine=True, user_to_num_current_syncs={}, now=now + ) + + self.assertIsNotNone(new_state) + self.assertEquals(state, new_state)