From 08e60476d5b71d0c13a250a84f12cb1ea7960a41 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 2 Jun 2016 13:59:24 +0100 Subject: [PATCH] Replicate the presence into the synchrotron --- synapse/app/synchrotron.py | 44 ++++++++++---- .../replication/slave/storage/account_data.py | 2 + synapse/replication/slave/storage/presence.py | 59 +++++++++++++++++++ synapse/storage/__init__.py | 6 +- 4 files changed, 97 insertions(+), 14 deletions(-) create mode 100644 synapse/replication/slave/storage/presence.py diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 5ea157a4ef..c1338e8e36 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -22,6 +22,7 @@ from synapse.config.database import DatabaseConfig from synapse.config.logger import LoggingConfig from synapse.config.appservice import AppServiceConfig from synapse.events import FrozenEvent +from synapse.handlers.presence import PresenceHandler from synapse.http.site import SynapseSite from synapse.http.server import JsonResource from synapse.metrics.resource import MetricsResource, METRICS_PREFIX @@ -33,8 +34,10 @@ from synapse.replication.slave.storage.appservice import SlavedApplicationServic from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.filtering import SlavedFilteringStore from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore +from synapse.replication.slave.storage.presence import SlavedPresenceStore from synapse.server import HomeServer from synapse.storage.engines import create_engine +from synapse.storage.presence import UserPresenceState from synapse.storage.roommember import RoomMemberStore from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree @@ -115,12 +118,8 @@ class SynchrotronSlavedStore( SlavedApplicationServiceStore, SlavedRegistrationStore, SlavedFilteringStore, + SlavedPresenceStore, ): - def get_current_presence_token(self): - return 0 - - presence_stream_cache = () - def get_presence_list_accepted(self, user_localpart): return () @@ -140,19 +139,26 @@ class SynchrotronSlavedStore( class SynchrotronPresence(object): def __init__(self, hs): self.http_client = hs.get_simple_http_client() + self.store = hs.get_datastore() self.user_to_num_current_syncs = {} - self.process_id = random_string(16) self.syncing_users_url = hs.config.replication_url + "/syncing_users" + self.clock = hs.get_clock() + + active_presence = self.store.take_presence_startup_info() + self.user_to_current_state = { + state.user_id: state + for state in active_presence + } + + self.process_id = random_string(16) logger.info("Presence process_id is %r", self.process_id) def set_state(self, user, state): + # TODO Hows this supposed to work? pass - def get_states(self, user_ids, as_event=False): - return {} - - def current_state_for_users(self, user_ids): - return {} + get_states = PresenceHandler.get_states.__func__ + current_state_for_users = PresenceHandler.current_state_for_users.__func__ @defer.inlineCallbacks def user_syncing(self, user_id, affect_presence): @@ -188,6 +194,20 @@ class SynchrotronPresence(object): ], }) + def process_replication(self, result): + stream = result.get("presence", {"rows": []}) + for row in stream["rows"]: + ( + position, user_id, state, last_active_ts, + last_federation_update_ts, last_user_sync_ts, status_msg, + currently_active + ) = row + self.user_to_current_state[user_id] = UserPresenceState( + user_id, state, last_active_ts, + last_federation_update_ts, last_user_sync_ts, status_msg, + currently_active + ) + class SynchrotronTyping(object): _latest_room_serial = 0 @@ -273,6 +293,7 @@ class SynchrotronServer(HomeServer): replication_url = self.config.replication_url clock = self.get_clock() notifier = self.get_notifier() + presence_handler = self.get_presence_handler() def expire_broken_caches(): store.who_forgot_in_room.invalidate_all() @@ -307,6 +328,7 @@ class SynchrotronServer(HomeServer): now_ms + store.BROKEN_CACHE_EXPIRY_MS ) yield store.process_replication(result) + presence_handler.process_replication(result) notify(result) except: logger.exception("Error replicating from %r", replication_url) diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py index 5a44d314a3..735c03c7eb 100644 --- a/synapse/replication/slave/storage/account_data.py +++ b/synapse/replication/slave/storage/account_data.py @@ -96,3 +96,5 @@ class SlavedAccountDataStore(BaseSlavedStore): self._account_data_stream_cache.entity_has_changed( user_id, position ) + + return super(SlavedAccountDataStore, self).process_replication(result) diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py new file mode 100644 index 0000000000..703f4a49bf --- /dev/null +++ b/synapse/replication/slave/storage/presence.py @@ -0,0 +1,59 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from ._slaved_id_tracker import SlavedIdTracker + +from synapse.util.caches.stream_change_cache import StreamChangeCache +from synapse.storage import DataStore + + +class SlavedPresenceStore(BaseSlavedStore): + def __init__(self, db_conn, hs): + super(SlavedPresenceStore, self).__init__(db_conn, hs) + self._presence_id_gen = SlavedIdTracker( + db_conn, "presence_stream", "stream_id", + ) + + self._presence_on_startup = self._get_active_presence(db_conn) + + self.presence_stream_cache = self.presence_stream_cache = StreamChangeCache( + "PresenceStreamChangeCache", self._presence_id_gen.get_current_token() + ) + + _get_active_presence = DataStore._get_active_presence.__func__ + take_presence_startup_info = DataStore.take_presence_startup_info.__func__ + get_presence_for_users = DataStore.get_presence_for_users.__func__ + + def get_current_presence_token(self): + return self._presence_id_gen.get_current_token() + + def stream_positions(self): + result = super(SlavedPresenceStore, self).stream_positions() + position = self._presence_id_gen.get_current_token() + result["presence"] = position + return result + + def process_replication(self, result): + stream = result.get("presence") + if stream: + self._presence_id_gen.advance(int(stream["position"])) + for row in stream["rows"]: + position, user_id = row[:2] + self.presence_stream_cache.entity_has_changed( + user_id, position + ) + + return super(SlavedPresenceStore, self).process_replication(result) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 8581796b7e..6928a213e8 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -149,7 +149,7 @@ class DataStore(RoomMemberStore, RoomStore, "AccountDataAndTagsChangeCache", account_max, ) - self.__presence_on_startup = self._get_active_presence(db_conn) + self._presence_on_startup = self._get_active_presence(db_conn) presence_cache_prefill, min_presence_val = self._get_cache_dict( db_conn, "presence_stream", @@ -190,8 +190,8 @@ class DataStore(RoomMemberStore, RoomStore, super(DataStore, self).__init__(hs) def take_presence_startup_info(self): - active_on_startup = self.__presence_on_startup - self.__presence_on_startup = None + active_on_startup = self._presence_on_startup + self._presence_on_startup = None return active_on_startup def _get_active_presence(self, db_conn):