From 7bec359408023dcec8fd85f3d175cc6e72a15756 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 Aug 2014 10:55:54 +0100 Subject: [PATCH 01/10] Add in StreamToken type --- synapse/types.py | 74 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/synapse/types.py b/synapse/types.py index fd6a3d1d72..baec8a6002 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -92,3 +92,77 @@ class RoomAlias(DomainSpecificString): class RoomID(DomainSpecificString): """Structure representing a room id. """ SIGIL = "!" + + +class StreamToken( + namedtuple( + "Token", + ("events_type", "topological_key", "stream_key", "presence_key") + ) +): + _SEPARATOR = "_" + + _TOPOLOGICAL_PREFIX = "t" + _STREAM_PREFIX = "s" + + _TOPOLOGICAL_SEPERATOR = "-" + + TOPOLOGICAL_TYPE = "topo" + STREAM_TYPE = "stream" + + @classmethod + def from_string(cls, string): + try: + events_part, presence_part = string.split(cls._SEPARATOR) + + presence_key = int(presence_part) + + topo_length = len(cls._TOPOLOGICAL_PREFIX) + stream_length = len(cls._STREAM_PREFIX) + if events_part[:topo_length] == cls._TOPOLOGICAL_PREFIX: + # topological event token + topo_tok = events_part[topo_length:] + topo_key, stream_key = topo_tok.split( + cls._TOPOLOGICAL_SEPERATOR, 1 + ) + + topo_key = int(topo_key) + stream_key = int(stream_key) + + events_type = cls.TOPOLOGICAL_TYPE + elif events_part[:stream_length] == cls._STREAM_PREFIX: + topo_key = None + stream_key = int(events_part[stream_length:]) + + events_type = cls.STREAM_TYPE + else: + raise + + return cls( + events_type=events_type, + topological_key=topo_key, + stream_key=stream_key, + presence_key=presence_key, + ) + except: + raise SynapseError(400, "Invalid Token") + + def to_string(self): + if self.events_type == self.TOPOLOGICAL_TYPE: + return "".join([ + self._TOPOLOGICAL_PREFIX, + str(self.topological_key), + self._TOPOLOGICAL_SEPERATOR, + str(self.stream_key), + self._SEPARATOR, + str(self.presence_key), + 
]) + elif self.events_type == self.STREAM_TYPE: + return "".join([ + self._STREAM_PREFIX, + str(self.stream_key), + self._SEPARATOR, + str(self.presence_key), + ]) + + raise RuntimeError("Unrecognized event type: %s", self.events_type) From 81a95937de91078d4a9d46ab6f7a565e659b0560 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 Aug 2014 10:56:31 +0100 Subject: [PATCH 02/10] Use new StreamToken in pagination config --- synapse/api/streams/__init__.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/synapse/api/streams/__init__.py b/synapse/api/streams/__init__.py index d831eafbab..0ba4783ea2 100644 --- a/synapse/api/streams/__init__.py +++ b/synapse/api/streams/__init__.py @@ -14,6 +14,7 @@ # limitations under the License. from synapse.api.errors import SynapseError +from synapse.types import StreamToken class PaginationConfig(object): @@ -21,10 +22,10 @@ class PaginationConfig(object): """A configuration object which stores pagination parameters.""" def __init__(self, from_tok=None, to_tok=None, direction='f', limit=0): - self.from_tok = from_tok - self.to_tok = to_tok - self.direction = direction - self.limit = limit + self.from_tok = StreamToken(from_tok) if from_tok else None + self.to_tok = StreamToken(to_tok) if to_tok else None + self.direction = 'f' if direction == 'f' else 'b' + self.limit = int(limit) @classmethod def from_request(cls, request, raise_invalid_params=True): @@ -47,7 +48,10 @@ class PaginationConfig(object): elif raise_invalid_params: raise SynapseError(400, "%s parameter is invalid." % qp) - return PaginationConfig(**params) + try: + return PaginationConfig(**params) + except: + raise SynapseError(400, "Invalid request.") def __str__(self): return ( From 3a2a5b959cb1f56b26af32e1ad4c1db424279eb7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 26 Aug 2014 18:57:46 +0100 Subject: [PATCH 03/10] WIP: Completely change how event streaming and pagination work. 
This reflects the change in the underlying storage model. --- .gitignore | 7 + synapse/api/notifier.py | 196 ------------------ synapse/api/streams/event.py | 194 ----------------- synapse/handlers/events.py | 116 ++--------- synapse/handlers/presence.py | 32 --- synapse/handlers/room.py | 46 ++-- synapse/notifier.py | 184 ++++++++++++++++ synapse/rest/events.py | 4 +- synapse/rest/initial_sync.py | 2 +- synapse/rest/room.py | 2 +- synapse/server.py | 7 +- synapse/storage/stream.py | 4 +- synapse/streams/__init__.py | 14 ++ .../streams/__init__.py => streams/config.py} | 59 ++---- synapse/streams/events.py | 149 +++++++++++++ synapse/types.py | 66 ++---- 16 files changed, 432 insertions(+), 650 deletions(-) delete mode 100644 synapse/api/notifier.py delete mode 100644 synapse/api/streams/event.py create mode 100644 synapse/notifier.py create mode 100644 synapse/streams/__init__.py rename synapse/{api/streams/__init__.py => streams/config.py} (58%) create mode 100644 synapse/streams/events.py diff --git a/.gitignore b/.gitignore index 14bd9078f3..ab1a3e1aae 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,10 @@ htmlcov demo/*.db demo/*.log +demo/*.pid + +graph/*.svg +graph/*.png +graph/*.dot + +uploads diff --git a/synapse/api/notifier.py b/synapse/api/notifier.py deleted file mode 100644 index ec9c4e513d..0000000000 --- a/synapse/api/notifier.py +++ /dev/null @@ -1,196 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014 matrix.org -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and -# limitations under the License. - -from synapse.api.constants import Membership -from synapse.api.events.room import RoomMemberEvent -from synapse.api.streams.event import EventsStreamData - -from twisted.internet import defer -from twisted.internet import reactor - -import logging - -logger = logging.getLogger(__name__) - - -class Notifier(object): - - def __init__(self, hs): - self.store = hs.get_datastore() - self.hs = hs - self.stored_event_listeners = {} - - @defer.inlineCallbacks - def on_new_room_event(self, event, store_id): - """Called when there is a new room event which may potentially be sent - down listening users' event streams. - - This function looks for interested *users* who may want to be notified - for this event. This is different to users requesting from the event - stream which looks for interested *events* for this user. - - Args: - event (SynapseEvent): The new event, which must have a room_id - store_id (int): The ID of this event after it was stored with the - data store. - '""" - member_list = yield self.store.get_room_members(room_id=event.room_id, - membership="join") - if not member_list: - member_list = [] - - member_list = [u.user_id for u in member_list] - - # invites MUST prod the person being invited, who won't be in the room. 
- if (event.type == RoomMemberEvent.TYPE and - event.content["membership"] == Membership.INVITE): - member_list.append(event.state_key) - # similarly, LEAVEs must be sent to the person leaving - if (event.type == RoomMemberEvent.TYPE and - event.content["membership"] == Membership.LEAVE): - member_list.append(event.state_key) - - for user_id in member_list: - if user_id in self.stored_event_listeners: - self._notify_and_callback( - user_id=user_id, - event_data=event.get_dict(), - stream_type=EventsStreamData.EVENT_TYPE, - store_id=store_id) - - def on_new_user_event(self, user_id, event_data, stream_type, store_id): - if user_id in self.stored_event_listeners: - self._notify_and_callback( - user_id=user_id, - event_data=event_data, - stream_type=stream_type, - store_id=store_id - ) - - def _notify_and_callback(self, user_id, event_data, stream_type, store_id): - logger.debug( - "Notifying %s of a new event.", - user_id - ) - - stream_ids = list(self.stored_event_listeners[user_id]) - for stream_id in stream_ids: - self._notify_and_callback_stream(user_id, stream_id, event_data, - stream_type, store_id) - - if not self.stored_event_listeners[user_id]: - del self.stored_event_listeners[user_id] - - def _notify_and_callback_stream(self, user_id, stream_id, event_data, - stream_type, store_id): - - event_listener = self.stored_event_listeners[user_id].pop(stream_id) - return_event_object = { - k: event_listener[k] for k in ["start", "chunk", "end"] - } - - # work out the new end token - token = event_listener["start"] - end = self._next_token(stream_type, store_id, token) - return_event_object["end"] = end - - # add the event to the chunk - chunk = event_listener["chunk"] - chunk.append(event_data) - - # callback the defer. We know this can't have been resolved before as - # we always remove the event_listener from the map before resolving. 
- event_listener["defer"].callback(return_event_object) - - def _next_token(self, stream_type, store_id, current_token): - stream_handler = self.hs.get_handlers().event_stream_handler - return stream_handler.get_event_stream_token( - stream_type, - store_id, - current_token - ) - - def store_events_for(self, user_id=None, stream_id=None, from_tok=None): - """Store all incoming events for this user. This should be paired with - get_events_for to return chunked data. - - Args: - user_id (str): The user to monitor incoming events for. - stream (object): The stream that is receiving events - from_tok (str): The token to monitor incoming events from. - """ - event_listener = { - "start": from_tok, - "chunk": [], - "end": from_tok, - "defer": defer.Deferred(), - } - - if user_id not in self.stored_event_listeners: - self.stored_event_listeners[user_id] = {stream_id: event_listener} - else: - self.stored_event_listeners[user_id][stream_id] = event_listener - - def purge_events_for(self, user_id=None, stream_id=None): - """Purges any stored events for this user. - - Args: - user_id (str): The user to purge stored events for. - """ - try: - del self.stored_event_listeners[user_id][stream_id] - if not self.stored_event_listeners[user_id]: - del self.stored_event_listeners[user_id] - except KeyError: - pass - - def get_events_for(self, user_id=None, stream_id=None, timeout=0): - """Retrieve stored events for this user, waiting if necessary. - - It is advisable to wrap this call in a maybeDeferred. - - Args: - user_id (str): The user to get events for. - timeout (int): The time in seconds to wait before giving up. - Returns: - A Deferred or a dict containing the chunk data, depending on if - there was data to return yet. The Deferred callback may be None if - there were no events before the timeout expired. 
- """ - logger.debug("%s is listening for events.", user_id) - - try: - streams = self.stored_event_listeners[user_id][stream_id]["chunk"] - if streams: - logger.debug("%s returning existing chunk.", user_id) - return streams - except KeyError: - return None - - reactor.callLater( - (timeout / 1000.0), self._timeout, user_id, stream_id - ) - return self.stored_event_listeners[user_id][stream_id]["defer"] - - def _timeout(self, user_id, stream_id): - try: - # We remove the event_listener from the map so that we can't - # resolve the deferred twice. - event_listeners = self.stored_event_listeners[user_id] - event_listener = event_listeners.pop(stream_id) - event_listener["defer"].callback(None) - logger.debug("%s event listening timed out.", user_id) - except KeyError: - pass diff --git a/synapse/api/streams/event.py b/synapse/api/streams/event.py deleted file mode 100644 index fe44a488bc..0000000000 --- a/synapse/api/streams/event.py +++ /dev/null @@ -1,194 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014 matrix.org -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""This module contains classes for streaming from the event stream: /events. 
-""" -from twisted.internet import defer - -from synapse.api.errors import EventStreamError -from synapse.api.events import SynapseEvent -from synapse.api.streams import PaginationStream, StreamData - -import logging - -logger = logging.getLogger(__name__) - - -class EventsStreamData(StreamData): - EVENT_TYPE = "EventsStream" - - def __init__(self, hs, room_id=None, feedback=False): - super(EventsStreamData, self).__init__(hs) - self.room_id = room_id - self.with_feedback = feedback - - @defer.inlineCallbacks - def get_rows(self, user_id, from_key, to_key, limit, direction): - data, latest_ver = yield self.store.get_room_events( - user_id=user_id, - from_key=from_key, - to_key=to_key, - limit=limit, - room_id=self.room_id, - with_feedback=self.with_feedback - ) - defer.returnValue((data, latest_ver)) - - @defer.inlineCallbacks - def max_token(self): - val = yield self.store.get_room_events_max_id() - defer.returnValue(val) - - -class EventStream(PaginationStream): - - SEPARATOR = '_' - - def __init__(self, user_id, stream_data_list): - super(EventStream, self).__init__() - self.user_id = user_id - self.stream_data = stream_data_list - - @defer.inlineCallbacks - def fix_tokens(self, pagination_config): - pagination_config.from_tok = yield self.fix_token( - pagination_config.from_tok) - pagination_config.to_tok = yield self.fix_token( - pagination_config.to_tok) - - if ( - not pagination_config.to_tok - and pagination_config.direction == 'f' - ): - pagination_config.to_tok = yield self.get_current_max_token() - - logger.debug("pagination_config: %s", pagination_config) - - defer.returnValue(pagination_config) - - @defer.inlineCallbacks - def fix_token(self, token): - """Fixes unknown values in a token to known values. - - Args: - token (str): The token to fix up. - Returns: - The fixed-up token, which may == token. 
- """ - if token == PaginationStream.TOK_END: - new_token = yield self.get_current_max_token() - - logger.debug("fix_token: From %s to %s", token, new_token) - - token = new_token - - defer.returnValue(token) - - @defer.inlineCallbacks - def get_current_max_token(self): - new_token_parts = [] - for s in self.stream_data: - mx = yield s.max_token() - new_token_parts.append(str(mx)) - - new_token = EventStream.SEPARATOR.join(new_token_parts) - - logger.debug("get_current_max_token: %s", new_token) - - defer.returnValue(new_token) - - @defer.inlineCallbacks - def get_chunk(self, config): - # no support for limit on >1 streams, makes no sense. - if config.limit and len(self.stream_data) > 1: - raise EventStreamError( - 400, "Limit not supported on multiplexed streams." - ) - - chunk_data, next_tok = yield self._get_chunk_data( - config.from_tok, - config.to_tok, - config.limit, - config.direction, - ) - - defer.returnValue({ - "chunk": chunk_data, - "start": config.from_tok, - "end": next_tok - }) - - @defer.inlineCallbacks - def _get_chunk_data(self, from_tok, to_tok, limit, direction): - """ Get event data between the two tokens. - - Tokens are SEPARATOR separated values representing pkey values of - certain tables, and the position determines the StreamData invoked - according to the STREAM_DATA list. - - The magic value '-1' can be used to get the latest value. - - Args: - from_tok - The token to start from. - to_tok - The token to end at. Must have values > from_tok or be -1. - Returns: - A list of event data. - Raises: - EventStreamError if something went wrong. 
- """ - # sanity check - if to_tok is not None: - if (from_tok.count(EventStream.SEPARATOR) != - to_tok.count(EventStream.SEPARATOR) or - (from_tok.count(EventStream.SEPARATOR) + 1) != - len(self.stream_data)): - raise EventStreamError(400, "Token lengths don't match.") - - chunk = [] - next_ver = [] - for i, (from_pkey, to_pkey) in enumerate(zip( - self._split_token(from_tok), - self._split_token(to_tok) - )): - if from_pkey == to_pkey: - # tokens are the same, we have nothing to do. - next_ver.append(str(to_pkey)) - continue - - (event_chunk, max_pkey) = yield self.stream_data[i].get_rows( - self.user_id, from_pkey, to_pkey, limit, direction, - ) - - chunk.extend([ - e.get_dict() if isinstance(e, SynapseEvent) else e - for e in event_chunk - ]) - next_ver.append(str(max_pkey)) - - defer.returnValue((chunk, EventStream.SEPARATOR.join(next_ver))) - - def _split_token(self, token): - """Splits the given token into a list of pkeys. - - Args: - token (str): The token with SEPARATOR values. - Returns: - A list of ints. 
- """ - if token: - segments = token.split(EventStream.SEPARATOR) - else: - segments = [None] * len(self.stream_data) - return segments diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 6bb797caf2..2d7bd5083b 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -16,19 +16,15 @@ from twisted.internet import defer from ._base import BaseHandler -from synapse.api.streams.event import ( - EventStream, EventsStreamData -) -from synapse.handlers.presence import PresenceStreamData + +import logging + + +logger = logging.getLogger(__name__) class EventStreamHandler(BaseHandler): - stream_data_classes = [ - EventsStreamData, - PresenceStreamData, - ] - def __init__(self, hs): super(EventStreamHandler, self).__init__(hs) @@ -43,104 +39,22 @@ class EventStreamHandler(BaseHandler): self.clock = hs.get_clock() - def get_event_stream_token(self, stream_type, store_id, start_token): - """Return the next token after this event. - - Args: - stream_type (str): The StreamData.EVENT_TYPE - store_id (int): The new storage ID assigned from the data store. - start_token (str): The token the user started with. - Returns: - str: The end token. - """ - for i, stream_cls in enumerate(EventStreamHandler.stream_data_classes): - if stream_cls.EVENT_TYPE == stream_type: - # this is the stream for this event, so replace this part of - # the token - store_ids = start_token.split(EventStream.SEPARATOR) - store_ids[i] = str(store_id) - return EventStream.SEPARATOR.join(store_ids) - raise RuntimeError("Didn't find a stream type %s" % stream_type) + self.notifier = hs.get_notifier() @defer.inlineCallbacks def get_stream(self, auth_user_id, pagin_config, timeout=0): - """Gets events as an event stream for this user. - - This function looks for interesting *events* for this user. This is - different from the notifier, which looks for interested *users* who may - want to know about a single event. 
- - Args: - auth_user_id (str): The user requesting their event stream. - pagin_config (synapse.api.streams.PaginationConfig): The config to - use when obtaining the stream. - timeout (int): The max time to wait for an incoming event in ms. - Returns: - A pagination stream API dict - """ auth_user = self.hs.parse_userid(auth_user_id) - stream_id = object() + if pagin_config.from_token is None: + pagin_config.from_token = None - try: - if auth_user not in self._streams_per_user: - self._streams_per_user[auth_user] = 0 - if auth_user in self._stop_timer_per_user: - self.clock.cancel_call_later( - self._stop_timer_per_user.pop(auth_user)) - else: - self.distributor.fire( - "started_user_eventstream", auth_user - ) - self._streams_per_user[auth_user] += 1 + events, tokens = yield self.notifier.get_events_for(auth_user, pagin_config, timeout) - # construct an event stream with the correct data ordering - stream_data_list = [] - for stream_class in EventStreamHandler.stream_data_classes: - stream_data_list.append(stream_class(self.hs)) - event_stream = EventStream(auth_user_id, stream_data_list) + chunk = { + "chunk": [e.get_dict() for e in events], + "start_token": tokens[0].to_string(), + "end_token": tokens[1].to_string(), + } - # fix unknown tokens to known tokens - pagin_config = yield event_stream.fix_tokens(pagin_config) + defer.returnValue(chunk) - # register interest in receiving new events - self.notifier.store_events_for(user_id=auth_user_id, - stream_id=stream_id, - from_tok=pagin_config.from_tok) - - # see if we can grab a chunk now - data_chunk = yield event_stream.get_chunk(config=pagin_config) - - # if there are previous events, return those. If not, wait on the - # new events for 'timeout' seconds. 
- if len(data_chunk["chunk"]) == 0 and timeout != 0: - results = yield defer.maybeDeferred( - self.notifier.get_events_for, - user_id=auth_user_id, - stream_id=stream_id, - timeout=timeout - ) - if results: - defer.returnValue(results) - - defer.returnValue(data_chunk) - finally: - # cleanup - self.notifier.purge_events_for(user_id=auth_user_id, - stream_id=stream_id) - - self._streams_per_user[auth_user] -= 1 - if not self._streams_per_user[auth_user]: - del self._streams_per_user[auth_user] - - # 10 seconds of grace to allow the client to reconnect again - # before we think they're gone - def _later(): - self.distributor.fire( - "stopped_user_eventstream", auth_user - ) - del self._stop_timer_per_user[auth_user] - - self._stop_timer_per_user[auth_user] = ( - self.clock.call_later(5, _later) - ) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index be10162db5..30d6269e2e 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -17,7 +17,6 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError from synapse.api.constants import PresenceState -from synapse.api.streams import StreamData from ._base import BaseHandler @@ -682,41 +681,10 @@ class PresenceHandler(BaseHandler): user=observed_user, clock=self.clock ), - stream_type=PresenceStreamData, store_id=statuscache.serial ) -class PresenceStreamData(StreamData): - def __init__(self, hs): - super(PresenceStreamData, self).__init__(hs) - self.presence = hs.get_handlers().presence_handler - - def get_rows(self, user_id, from_key, to_key, limit, direction): - from_key = int(from_key) - to_key = int(to_key) - - cachemap = self.presence._user_cachemap - - # TODO(paul): limit, and filter by visibility - updates = [(k, cachemap[k]) for k in cachemap - if from_key < cachemap[k].serial <= to_key] - - if updates: - clock = self.presence.clock - - latest_serial = max([x[1].serial for x in updates]) - data = [x[1].make_event(user=x[0], 
clock=clock) for x in updates] - return ((data, latest_serial)) - else: - return (([], self.presence._user_cachemap_latest_serial)) - - def max_token(self): - return self.presence._user_cachemap_latest_serial - -PresenceStreamData.EVENT_TYPE = PresenceStreamData - - class UserPresenceCache(object): """Store an observed user's state and status message. diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 5a4569ac95..20b4bbb665 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -22,8 +22,6 @@ from synapse.api.errors import RoomError, StoreError, SynapseError from synapse.api.events.room import ( RoomTopicEvent, RoomMemberEvent, RoomConfigEvent ) -from synapse.api.streams.event import EventStream, EventsStreamData -from synapse.handlers.presence import PresenceStreamData from synapse.util import stringutils from ._base import BaseHandler @@ -115,13 +113,24 @@ class MessageHandler(BaseHandler): """ yield self.auth.check_joined_room(room_id, user_id) - data_source = [ - EventsStreamData(self.hs, room_id=room_id, feedback=feedback) - ] - event_stream = EventStream(user_id, data_source) - pagin_config = yield event_stream.fix_tokens(pagin_config) - data_chunk = yield event_stream.get_chunk(config=pagin_config) - defer.returnValue(data_chunk) + data_source = self.hs.get_event_sources().sources[0] + + if pagin_config.from_token: + from_token = pagin_config.from_token + else: + from_token = yield self.hs.get_event_sources().get_current_token() + + events, next_token = yield data_source.get_pagination_rows( + from_token, pagin_config.to_token, pagin_config.limit, room_id + ) + + chunk = { + "chunk": [e.get_dict() for e in events], + "start_token": from_token.to_string(), + "end_token": next_token.to_string(), + } + + defer.returnValue(chunk) @defer.inlineCallbacks def store_room_data(self, event=None, stamp_event=True): @@ -258,18 +267,15 @@ class MessageHandler(BaseHandler): rooms_ret = [] - now_rooms_token = yield 
self.store.get_room_events_max_id() + # FIXME (erikj): We need to not generate this token, + now_token = yield self.hs.get_event_sources().get_current_token() # FIXME (erikj): Fix this. - presence_stream = PresenceStreamData(self.hs) - now_presence_token = yield presence_stream.max_token() - presence = yield presence_stream.get_rows( - user_id, 0, now_presence_token, None, None + presence_stream = self.hs.get_event_sources().sources[1] + presence = yield presence_stream.get_new_events_for_user( + user_id, now_token, None, None ) - # FIXME (erikj): We need to not generate this token, - now_token = "%s_%s" % (now_rooms_token, now_presence_token) - limit = pagin_config.limit if not limit: limit = 10 @@ -291,7 +297,7 @@ class MessageHandler(BaseHandler): messages, token = yield self.store.get_recent_events_for_room( event.room_id, limit=limit, - end_token=now_rooms_token, + end_token=now_token.events_key.to_string(), ) d["messages"] = { @@ -305,9 +311,7 @@ class MessageHandler(BaseHandler): except: logger.exception("Failed to get snapshot") - ret = {"rooms": rooms_ret, "presence": presence[0], "end": now_token} - - # logger.debug("snapshot_all_rooms returning: %s", ret) + ret = {"rooms": rooms_ret, "presence": presence[0], "end": now_token.to_string()} defer.returnValue(ret) diff --git a/synapse/notifier.py b/synapse/notifier.py new file mode 100644 index 0000000000..1911fd20ae --- /dev/null +++ b/synapse/notifier.py @@ -0,0 +1,184 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer, reactor + +from synapse.util.logutils import log_function + +import logging + + +logger = logging.getLogger(__name__) + + +class _NotificationListener(object): + def __init__(self, user, from_token, limit, timeout, deferred): + self.user = user + self.from_token = from_token + self.limit = limit + self.timeout = timeout + self.deferred = deferred + + self.signal_key_list = [] + + self.pending_notifications = [] + + def notify(self, notifier, events, start_token, end_token): + result = (events, (start_token, end_token)) + + try: + self.deferred.callback(result) + except defer.AlreadyCalledError: + pass + + for signal, key in self.signal_key_list: + lst = notifier.signal_keys_to_users.get((signal, key), []) + + try: + lst.remove(self) + except: + pass + +class Notifier(object): + + def __init__(self, hs): + self.hs = hs + + self.signal_keys_to_users = {} + + self.event_sources = hs.get_event_sources() + + @log_function + @defer.inlineCallbacks + def on_new_room_event(self, event, store_id): + room_id = event.room_id + + source = self.event_sources.sources[0] + + listeners = self.signal_keys_to_users.get( + (source.SIGNAL_NAME, room_id), + [] + ) + + logger.debug("on_new_room_event self.signal_keys_to_users %s", listeners) + logger.debug("on_new_room_event listeners %s", listeners) + + # TODO (erikj): Can we make this more efficient by hitting the + # db once? 
+ for listener in listeners: + events, end_token = yield source.get_new_events_for_user( + listener.user, + listener.from_token, + listener.limit, + key=room_id, + ) + + if events: + listener.notify( + self, events, listener.from_token, end_token + ) + + def on_new_user_event(self, *args, **kwargs): + pass + + def get_events_for(self, user, pagination_config, timeout): + deferred = defer.Deferred() + + self._get_events( + deferred, user, pagination_config.from_token, + pagination_config.limit, timeout + ).addErrback(deferred.errback) + + return deferred + + @defer.inlineCallbacks + def _get_events(self, deferred, user, from_token, limit, timeout): + if not from_token: + from_token = yield self.event_sources.get_current_token() + + listener = _NotificationListener( + user, + from_token, + limit, + timeout, + deferred, + ) + + if timeout: + reactor.callLater(timeout/1000, self._timeout_listener, listener) + + yield self._register_with_keys(listener) + yield self._check_for_updates(listener) + + return + + def _timeout_listener(self, listener): + # TODO (erikj): We should probably set to_token to the current max + # rather than reusing from_token. 
+ listener.notify( + self, + [], + listener.from_token, + listener.from_token, + ) + + @defer.inlineCallbacks + @log_function + def _register_with_keys(self, listener): + signals_keys = {} + + # TODO (erikj): This can probably be replaced by a DeferredList + for source in self.event_sources.sources: + keys = yield source.get_keys_for_user(listener.user) + signals_keys.setdefault(source.SIGNAL_NAME, []).extend(keys) + + for signal, keys in signals_keys.items(): + for key in keys: + s = self.signal_keys_to_users.setdefault((signal, key), []) + s.append(listener) + listener.signal_key_list.append((signal, key)) + + logger.debug("New signal_keys_to_users: %s", self.signal_keys_to_users) + + defer.returnValue(listener) + + @defer.inlineCallbacks + @log_function + def _check_for_updates(self, listener): + # TODO (erikj): We need to think about limits across multiple sources + events = [] + + from_token = listener.from_token + limit = listener.limit + + # TODO (erikj): DeferredList? + for source in self.event_sources.sources: + stuff, new_token = yield source.get_new_events_for_user( + listener.user, + from_token, + limit, + ) + + events.extend(stuff) + + from_token = new_token + + end_token = from_token + + + if events: + listener.notify(self, events, listener.from_token, end_token) + + defer.returnValue(listener) diff --git a/synapse/rest/events.py b/synapse/rest/events.py index ca2f6978e5..28da418498 100644 --- a/synapse/rest/events.py +++ b/synapse/rest/events.py @@ -17,7 +17,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError -from synapse.api.streams import PaginationConfig +from synapse.streams.config import PaginationConfig from synapse.rest.base import RestServlet, client_path_pattern @@ -41,11 +41,13 @@ class EventStreamRestServlet(RestServlet): chunk = yield handler.get_stream(auth_user.to_string(), pagin_config, timeout=timeout) + defer.returnValue((200, chunk)) def on_OPTIONS(self, request): return (200, {}) + def 
register_servlets(hs, http_server): EventStreamRestServlet(hs).register(http_server) diff --git a/synapse/rest/initial_sync.py b/synapse/rest/initial_sync.py index fab748f562..e5059ff78c 100644 --- a/synapse/rest/initial_sync.py +++ b/synapse/rest/initial_sync.py @@ -15,7 +15,7 @@ from twisted.internet import defer -from synapse.api.streams import PaginationConfig +from synapse.streams.config import PaginationConfig from base import RestServlet, client_path_pattern diff --git a/synapse/rest/room.py b/synapse/rest/room.py index b031ad1bc4..ab05a7438d 100644 --- a/synapse/rest/room.py +++ b/synapse/rest/room.py @@ -22,7 +22,7 @@ from synapse.api.events.room import ( MessageEvent, RoomMemberEvent, FeedbackEvent ) from synapse.api.constants import Feedback -from synapse.api.streams import PaginationConfig +from synapse.streams.config import PaginationConfig import json import logging diff --git a/synapse/server.py b/synapse/server.py index c5b0a32757..5e9b76603a 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -22,7 +22,7 @@ from synapse.federation import initialize_http_replication from synapse.federation.handler import FederationEventHandler from synapse.api.events.factory import EventFactory -from synapse.api.notifier import Notifier +from synapse.notifier import Notifier from synapse.api.auth import Auth from synapse.handlers import Handlers from synapse.rest import RestServletFactory @@ -32,6 +32,7 @@ from synapse.types import UserID, RoomAlias from synapse.util import Clock from synapse.util.distributor import Distributor from synapse.util.lockutils import LockManager +from synapse.streams.events import EventSources class BaseHomeServer(object): @@ -73,6 +74,7 @@ class BaseHomeServer(object): 'resource_for_federation', 'resource_for_web_client', 'resource_for_content_repo', + 'event_sources', ] def __init__(self, hostname, **kwargs): @@ -182,6 +184,9 @@ class HomeServer(BaseHomeServer): def build_distributor(self): return Distributor() + def 
build_event_sources(self): + return EventSources(self) + def register_servlets(self): """ Register all servlets associated with this HomeServer. """ diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index cae80563b4..6a22d5aead 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -174,7 +174,7 @@ class StreamStore(SQLBaseStore): "SELECT * FROM events as e WHERE " "((room_id IN (%(current)s)) OR " "(event_id IN (%(invites)s))) " - "AND e.stream_ordering > ? AND e.stream_ordering < ? " + "AND e.stream_ordering > ? AND e.stream_ordering <= ? " "AND e.outlier = 0 " "ORDER BY stream_ordering ASC LIMIT %(limit)d " ) % { @@ -293,5 +293,5 @@ class StreamStore(SQLBaseStore): defer.returnValue("s1") return - key = res[0]["m"] + 1 + key = res[0]["m"] defer.returnValue("s%d" % (key,)) diff --git a/synapse/streams/__init__.py b/synapse/streams/__init__.py new file mode 100644 index 0000000000..fe8a073cd3 --- /dev/null +++ b/synapse/streams/__init__.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
diff --git a/synapse/api/streams/__init__.py b/synapse/streams/config.py similarity index 58% rename from synapse/api/streams/__init__.py rename to synapse/streams/config.py index 0ba4783ea2..b6ffbab1e7 100644 --- a/synapse/api/streams/__init__.py +++ b/synapse/streams/config.py @@ -16,21 +16,25 @@ from synapse.api.errors import SynapseError from synapse.types import StreamToken +import logging + + +logger = logging.getLogger(__name__) + class PaginationConfig(object): """A configuration object which stores pagination parameters.""" def __init__(self, from_tok=None, to_tok=None, direction='f', limit=0): - self.from_tok = StreamToken(from_tok) if from_tok else None - self.to_tok = StreamToken(to_tok) if to_tok else None + self.from_token = StreamToken.from_string(from_tok) if from_tok else None + self.to_token = StreamToken.from_string(to_tok) if to_tok else None self.direction = 'f' if direction == 'f' else 'b' self.limit = int(limit) @classmethod def from_request(cls, request, raise_invalid_params=True): params = { - "from_tok": "END", "direction": 'f', } @@ -48,9 +52,14 @@ class PaginationConfig(object): elif raise_invalid_params: raise SynapseError(400, "%s parameter is invalid." % qp) + if "from_tok" in params and params["from_tok"] == "END": + # TODO (erikj): This is for compatibility only. + del params["from_tok"] + try: return PaginationConfig(**params) except: + logger.exception("Failed to create pagination config") raise SynapseError(400, "Invalid request.") def __str__(self): @@ -60,48 +69,4 @@ class PaginationConfig(object): ) % (self.from_tok, self.to_tok, self.direction, self.limit) -class PaginationStream(object): - """ An interface for streaming data as chunks. """ - - TOK_END = "END" - - def get_chunk(self, config=None): - """ Return the next chunk in the stream. - - Args: - config (PaginationConfig): The config to aid which chunk to get. 
- Returns: - A dict containing the new start token "start", the new end token - "end" and the data "chunk" as a list. - """ - raise NotImplementedError() - - -class StreamData(object): - - """ An interface for obtaining streaming data from a table. """ - - def __init__(self, hs): - self.hs = hs - self.store = hs.get_datastore() - - def get_rows(self, user_id, from_pkey, to_pkey, limit, direction): - """ Get event stream data between the specified pkeys. - - Args: - user_id : The user's ID - from_pkey : The starting pkey. - to_pkey : The end pkey. May be -1 to mean "latest". - limit: The max number of results to return. - Returns: - A tuple containing the list of event stream data and the last pkey. - """ - raise NotImplementedError() - - def max_token(self): - """ Get the latest currently-valid token. - - Returns: - The latest token.""" - raise NotImplementedError() diff --git a/synapse/streams/events.py b/synapse/streams/events.py new file mode 100644 index 0000000000..1d4467af44 --- /dev/null +++ b/synapse/streams/events.py @@ -0,0 +1,149 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from twisted.internet import defer + +from synapse.api.constants import Membership +from synapse.types import StreamToken + + +class RoomEventSource(object): + SIGNAL_NAME = "EventStreamSourceSignal" + + def __init__(self, hs): + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def get_keys_for_user(self, user): + events = yield self.store.get_rooms_for_user_where_membership_is( + user.to_string(), + (Membership.JOIN,), + ) + + defer.returnValue(set([e.room_id for e in events])) + + @defer.inlineCallbacks + def get_new_events_for_user(self, user, from_token, limit, key=None): + # We just ignore the key for now. + + to_key = yield self.get_current_token_part() + + events, end_key = yield self.store.get_room_events_stream( + user_id=user.to_string(), + from_key=from_token.events_key, + to_key=to_key, + room_id=None, + limit=limit, + ) + + end_token = from_token.copy_and_replace("events_key", end_key) + + defer.returnValue((events, end_token)) + + def get_current_token_part(self): + return self.store.get_room_events_max_id() + + @defer.inlineCallbacks + def get_pagination_rows(self, from_token, to_token, limit, key): + to_key = to_token.events_key if to_token else None + + events, next_key = yield self.store.paginate_room_events( + room_id=key, + from_key=from_token.events_key, + to_key=to_key, + direction='b', + limit=limit, + with_feedback=True + ) + + next_token = from_token.copy_and_replace("events_key", next_key) + + defer.returnValue((events, next_token)) + + +class PresenceStreamSource(object): + def __init__(self, hs): + self.hs = hs + self.clock = hs.get_clock() + + def get_new_events_for_user(self, user, from_token, limit, key=None): + from_key = int(from_token.presence_key) + + presence = self.hs.get_handlers().presence_handler + cachemap = presence._user_cachemap + + # TODO(paul): limit, and filter by visibility + updates = [(k, cachemap[k]) for k in cachemap + if from_key < cachemap[k].serial] + + if updates: + clock = self.clock + + 
class StreamToken(
    namedtuple(
        "Token",
        ("events_key", "presence_key")
    )
):
    """A position in each of the event streams, serialized as
    "<events_key>_<presence_key>".

    Instances are immutable; use `copy_and_replace` to derive a new token
    with one component changed.
    """

    _SEPARATOR = "_"

    @classmethod
    def from_string(cls, string):
        """Parse a serialized token.

        Args:
            string: the "<events>_<presence>" form produced by `to_string`.
        Raises:
            SynapseError: (400) if the string is not a valid token.
        """
        try:
            events_key, presence_key = string.split(cls._SEPARATOR)

            return cls(
                events_key=events_key,
                presence_key=presence_key,
            )
        except Exception:
            # Narrowed from a bare `except:` so that SystemExit /
            # KeyboardInterrupt are no longer swallowed; any parse
            # failure still surfaces as a client error.
            raise SynapseError(400, "Invalid Token")

    def to_string(self):
        """Serialize this token back to its wire form."""
        return self._SEPARATOR.join(
            [str(self.events_key), str(self.presence_key)]
        )

    def copy_and_replace(self, key, new_value):
        """Return a new StreamToken with field `key` set to `new_value`."""
        d = self._asdict()
        d[key] = new_value
        return StreamToken(**d)
--- synapse/streams/events.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 1d4467af44..887c792104 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -20,7 +20,7 @@ from synapse.types import StreamToken class RoomEventSource(object): - SIGNAL_NAME = "EventStreamSourceSignal" + SIGNAL_NAME = "RoomEventSource" def __init__(self, hs): self.store = hs.get_datastore() @@ -74,6 +74,8 @@ class RoomEventSource(object): class PresenceStreamSource(object): + SIGNAL_NAME = "PresenceStreamSource" + def __init__(self, hs): self.hs = hs self.clock = hs.get_clock() @@ -105,7 +107,7 @@ class PresenceStreamSource(object): return (([], end_token)) def get_keys_for_user(self, user): - raise NotImplementedError("get_keys_for_user") + return defer.succeed([]) def get_current_token_part(self): presence = self.hs.get_handlers().presence_handler From 67c5f89244b9ff5f1deca199f35ef7240d0549cc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 26 Aug 2014 19:40:29 +0100 Subject: [PATCH 05/10] Enable presence again. Fix up api to match old api. 
--- synapse/handlers/events.py | 13 ++++++++++--- synapse/handlers/presence.py | 4 ++-- synapse/handlers/room.py | 4 ++-- synapse/notifier.py | 20 +++++++++++++++++++- synapse/streams/events.py | 4 ++-- 5 files changed, 35 insertions(+), 10 deletions(-) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 2d7bd5083b..8c34776245 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -15,6 +15,8 @@ from twisted.internet import defer +from synapse.api.events import SynapseEvent + from ._base import BaseHandler import logging @@ -50,10 +52,15 @@ class EventStreamHandler(BaseHandler): events, tokens = yield self.notifier.get_events_for(auth_user, pagin_config, timeout) + chunks = [ + e.get_dict() if isinstance(e, SynapseEvent) else e + for e in events + ] + chunk = { - "chunk": [e.get_dict() for e in events], - "start_token": tokens[0].to_string(), - "end_token": tokens[1].to_string(), + "chunk": chunks, + "start": tokens[0].to_string(), + "end": tokens[1].to_string(), } defer.returnValue(chunk) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 30d6269e2e..8408266da0 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -142,7 +142,7 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def is_presence_visible(self, observer_user, observed_user): defer.returnValue(True) - return + #return # FIXME (erikj): This code path absolutely kills the database. assert(observed_user.is_mine) @@ -189,7 +189,7 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def set_state(self, target_user, auth_user, state): - return + # return # TODO (erikj): Turn this back on. Why did we end up sending EDUs # everywhere? 
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 20b4bbb665..6fbe84ea40 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -126,8 +126,8 @@ class MessageHandler(BaseHandler): chunk = { "chunk": [e.get_dict() for e in events], - "start_token": from_token.to_string(), - "end_token": next_token.to_string(), + "start": from_token.to_string(), + "end": next_token.to_string(), } defer.returnValue(chunk) diff --git a/synapse/notifier.py b/synapse/notifier.py index 1911fd20ae..df9be29f3d 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -91,7 +91,25 @@ class Notifier(object): ) def on_new_user_event(self, *args, **kwargs): - pass + source = self.event_sources.sources[1] + + listeners = self.signal_keys_to_users.get( + (source.SIGNAL_NAME, "moose"), + [] + ) + + for listener in listeners: + events, end_token = yield source.get_new_events_for_user( + listener.user, + listener.from_token, + listener.limit, + key="moose", + ) + + if events: + listener.notify( + self, events, listener.from_token, end_token + ) def get_events_for(self, user, pagination_config, timeout): deferred = defer.Deferred() diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 887c792104..27c7734b36 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -97,7 +97,7 @@ class PresenceStreamSource(object): data = [x[1].make_event(user=x[0], clock=clock) for x in updates] end_token = from_token.copy_and_replace( - "presence_key", latest_serial + "presence_key", latest_serial + 1 ) return ((data, end_token)) else: @@ -107,7 +107,7 @@ class PresenceStreamSource(object): return (([], end_token)) def get_keys_for_user(self, user): - return defer.succeed([]) + return defer.succeed(["moose"]) def get_current_token_part(self): presence = self.hs.get_handlers().presence_handler From bd16b93e8f7143b4d2e98794a01aa62a060505d8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 27 Aug 2014 14:03:27 +0100 Subject: [PATCH 
06/10] Implement presence event source. Change the way the notifier indexes listeners --- synapse/handlers/events.py | 7 ++- synapse/handlers/presence.py | 7 +-- synapse/handlers/room.py | 15 ++++--- synapse/notifier.py | 85 ++++++++++++++++++------------------ synapse/streams/events.py | 73 ++++++++++++++++++++----------- 5 files changed, 107 insertions(+), 80 deletions(-) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 8c34776245..aabec37fc0 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -50,7 +50,12 @@ class EventStreamHandler(BaseHandler): if pagin_config.from_token is None: pagin_config.from_token = None - events, tokens = yield self.notifier.get_events_for(auth_user, pagin_config, timeout) + rm_handler = self.hs.get_handlers().room_member_handler + room_ids = yield rm_handler.get_rooms_for_user(auth_user) + + events, tokens = yield self.notifier.get_events_for( + auth_user, room_ids, pagin_config, timeout + ) chunks = [ e.get_dict() if isinstance(e, SynapseEvent) else e diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 8408266da0..9a690258de 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -676,12 +676,7 @@ class PresenceHandler(BaseHandler): statuscache.make_event(user=observed_user, clock=self.clock) self.notifier.on_new_user_event( - observer_user.to_string(), - event_data=statuscache.make_event( - user=observed_user, - clock=self.clock - ), - store_id=statuscache.serial + [observer_user], ) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 6fbe84ea40..19ade10a91 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -120,8 +120,11 @@ class MessageHandler(BaseHandler): else: from_token = yield self.hs.get_event_sources().get_current_token() + user = self.hs.parse_userid(user_id) + events, next_token = yield data_source.get_pagination_rows( - from_token, pagin_config.to_token, pagin_config.limit, room_id + 
user, from_token, pagin_config.to_token, pagin_config.limit, + room_id ) chunk = { @@ -265,6 +268,8 @@ class MessageHandler(BaseHandler): membership_list=[Membership.INVITE, Membership.JOIN] ) + user = self.hs.parse_userid(user_id) + rooms_ret = [] # FIXME (erikj): We need to not generate this token, @@ -272,8 +277,8 @@ class MessageHandler(BaseHandler): # FIXME (erikj): Fix this. presence_stream = self.hs.get_event_sources().sources[1] - presence = yield presence_stream.get_new_events_for_user( - user_id, now_token, None, None + presence, _ = yield presence_stream.get_pagination_rows( + user, now_token, None, None, None ) limit = pagin_config.limit @@ -297,7 +302,7 @@ class MessageHandler(BaseHandler): messages, token = yield self.store.get_recent_events_for_room( event.room_id, limit=limit, - end_token=now_token.events_key.to_string(), + end_token=now_token.events_key, ) d["messages"] = { @@ -311,7 +316,7 @@ class MessageHandler(BaseHandler): except: logger.exception("Failed to get snapshot") - ret = {"rooms": rooms_ret, "presence": presence[0], "end": now_token.to_string()} + ret = {"rooms": rooms_ret, "presence": presence, "end": now_token.to_string()} defer.returnValue(ret) diff --git a/synapse/notifier.py b/synapse/notifier.py index df9be29f3d..a69d5343cb 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -24,14 +24,14 @@ logger = logging.getLogger(__name__) class _NotificationListener(object): - def __init__(self, user, from_token, limit, timeout, deferred): + def __init__(self, user, rooms, from_token, limit, timeout, deferred): self.user = user self.from_token = from_token self.limit = limit self.timeout = timeout self.deferred = deferred - self.signal_key_list = [] + self.rooms = rooms self.pending_notifications = [] @@ -43,36 +43,39 @@ class _NotificationListener(object): except defer.AlreadyCalledError: pass - for signal, key in self.signal_key_list: - lst = notifier.signal_keys_to_users.get((signal, key), []) + for room in self.rooms: + lst 
= notifier.rooms_to_listeners.get(room, set()) + lst.discard(self) + + notifier.user_to_listeners.get(self.user, set()).discard(self) - try: - lst.remove(self) - except: - pass class Notifier(object): def __init__(self, hs): self.hs = hs - self.signal_keys_to_users = {} + self.rooms_to_listeners = {} + self.user_to_listeners = {} self.event_sources = hs.get_event_sources() + hs.get_distributor().observe( + "user_joined_room", self._user_joined_room + ) + @log_function @defer.inlineCallbacks - def on_new_room_event(self, event, store_id): + def on_new_room_event(self, event, extra_users=[]): room_id = event.room_id source = self.event_sources.sources[0] - listeners = self.signal_keys_to_users.get( - (source.SIGNAL_NAME, room_id), - [] - ) + listeners = self.rooms_to_listeners.get(room_id, set()).copy() + + for user in extra_users: + listeners |= self.user_to_listeners.get(user, set()).copy() - logger.debug("on_new_room_event self.signal_keys_to_users %s", listeners) logger.debug("on_new_room_event listeners %s", listeners) # TODO (erikj): Can we make this more efficient by hitting the @@ -82,7 +85,6 @@ class Notifier(object): listener.user, listener.from_token, listener.limit, - key=room_id, ) if events: @@ -90,20 +92,23 @@ class Notifier(object): self, events, listener.from_token, end_token ) - def on_new_user_event(self, *args, **kwargs): + @defer.inlineCallbacks + def on_new_user_event(self, users=[], rooms=[]): source = self.event_sources.sources[1] - listeners = self.signal_keys_to_users.get( - (source.SIGNAL_NAME, "moose"), - [] - ) + listeners = set() + + for user in users: + listeners |= self.user_to_listeners.get(user, set()).copy() + + for room in rooms: + listeners |= self.rooms_to_listeners.get(room, set()).copy() for listener in listeners: events, end_token = yield source.get_new_events_for_user( listener.user, listener.from_token, listener.limit, - key="moose", ) if events: @@ -111,23 +116,24 @@ class Notifier(object): self, events, 
listener.from_token, end_token ) - def get_events_for(self, user, pagination_config, timeout): + def get_events_for(self, user, rooms, pagination_config, timeout): deferred = defer.Deferred() self._get_events( - deferred, user, pagination_config.from_token, + deferred, user, rooms, pagination_config.from_token, pagination_config.limit, timeout ).addErrback(deferred.errback) return deferred @defer.inlineCallbacks - def _get_events(self, deferred, user, from_token, limit, timeout): + def _get_events(self, deferred, user, rooms, from_token, limit, timeout): if not from_token: from_token = yield self.event_sources.get_current_token() listener = _NotificationListener( user, + rooms, from_token, limit, timeout, @@ -137,7 +143,7 @@ class Notifier(object): if timeout: reactor.callLater(timeout/1000, self._timeout_listener, listener) - yield self._register_with_keys(listener) + self._register_with_keys(listener) yield self._check_for_updates(listener) return @@ -152,25 +158,13 @@ class Notifier(object): listener.from_token, ) - @defer.inlineCallbacks @log_function def _register_with_keys(self, listener): - signals_keys = {} + for room in listener.rooms: + s = self.rooms_to_listeners.setdefault(room, set()) + s.add(listener) - # TODO (erikj): This can probably be replaced by a DeferredList - for source in self.event_sources.sources: - keys = yield source.get_keys_for_user(listener.user) - signals_keys.setdefault(source.SIGNAL_NAME, []).extend(keys) - - for signal, keys in signals_keys.items(): - for key in keys: - s = self.signal_keys_to_users.setdefault((signal, key), []) - s.append(listener) - listener.signal_key_list.append((signal, key)) - - logger.debug("New signal_keys_to_users: %s", self.signal_keys_to_users) - - defer.returnValue(listener) + self.user_to_listeners.setdefault(listener.user, set()).add(listener) @defer.inlineCallbacks @log_function @@ -195,8 +189,13 @@ class Notifier(object): end_token = from_token - if events: listener.notify(self, events, 
listener.from_token, end_token) defer.returnValue(listener) + + def _user_joined_room(self, user, room_id): + new_listeners = self.user_to_listeners.get(user, set()) + + listeners = self.rooms_to_listeners.setdefault(room_id, set()) + listeners |= new_listeners diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 27c7734b36..36174a811b 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -26,16 +26,7 @@ class RoomEventSource(object): self.store = hs.get_datastore() @defer.inlineCallbacks - def get_keys_for_user(self, user): - events = yield self.store.get_rooms_for_user_where_membership_is( - user.to_string(), - (Membership.JOIN,), - ) - - defer.returnValue(set([e.room_id for e in events])) - - @defer.inlineCallbacks - def get_new_events_for_user(self, user, from_token, limit, key=None): + def get_new_events_for_user(self, user, from_token, limit): # We just ignore the key for now. to_key = yield self.get_current_token_part() @@ -56,7 +47,7 @@ class RoomEventSource(object): return self.store.get_room_events_max_id() @defer.inlineCallbacks - def get_pagination_rows(self, from_token, to_token, limit, key): + def get_pagination_rows(self, user, from_token, to_token, limit, key): to_key = to_token.events_key if to_token else None events, next_key = yield self.store.paginate_room_events( @@ -73,14 +64,14 @@ class RoomEventSource(object): defer.returnValue((events, next_token)) -class PresenceStreamSource(object): - SIGNAL_NAME = "PresenceStreamSource" +class PresenceSource(object): + SIGNAL_NAME = "PresenceSource" def __init__(self, hs): self.hs = hs self.clock = hs.get_clock() - def get_new_events_for_user(self, user, from_token, limit, key=None): + def get_new_events_for_user(self, user, from_token, limit): from_key = int(from_token.presence_key) presence = self.hs.get_handlers().presence_handler @@ -97,7 +88,7 @@ class PresenceStreamSource(object): data = [x[1].make_event(user=x[0], clock=clock) for x in updates] end_token = 
class EventSources(object):
    """Aggregates the per-category event sources (rooms, presence) and
    builds StreamTokens spanning all of them."""

    SOURCE_TYPES = [
        RoomEventSource,
        PresenceSource,
    ]

    def __init__(self, hs):
        # Instantiate one source per registered type, in declaration order:
        # index 0 is rooms, index 1 is presence.
        self.sources = [
            source_type(hs) for source_type in EventSources.SOURCE_TYPES
        ]

    @staticmethod
    def create_token(events_key, presence_key):
        """Assemble a StreamToken from its per-source components."""
        return StreamToken(events_key=events_key, presence_key=presence_key)

    @defer.inlineCallbacks
    def get_current_token(self):
        """Return a StreamToken for the current head of every stream."""
        room_source = self.sources[0]
        presence_source = self.sources[1]

        events_key = yield room_source.get_current_token_part()
        presence_key = yield presence_source.get_current_token_part()

        defer.returnValue(
            EventSources.create_token(events_key, presence_key)
        )
"start": "START", # FIXME (erikj): START is no longer valid "end": "END", "chunk": event_list } diff --git a/synapse/streams/config.py b/synapse/streams/config.py index b6ffbab1e7..69c7145a36 100644 --- a/synapse/streams/config.py +++ b/synapse/streams/config.py @@ -27,7 +27,9 @@ class PaginationConfig(object): """A configuration object which stores pagination parameters.""" def __init__(self, from_tok=None, to_tok=None, direction='f', limit=0): - self.from_token = StreamToken.from_string(from_tok) if from_tok else None + self.from_token = ( + StreamToken.from_string(from_tok) if from_tok else None + ) self.to_token = StreamToken.from_string(to_tok) if to_tok else None self.direction = 'f' if direction == 'f' else 'b' self.limit = int(limit) @@ -67,6 +69,3 @@ class PaginationConfig(object): "" ) % (self.from_tok, self.to_tok, self.direction, self.limit) - - - diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 36174a811b..bf48df5b79 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -171,4 +171,3 @@ class StreamSource(object): class PaginationSource(object): def get_pagination_rows(self, user, from_token, to_token, limit, key): raise NotImplementedError("get_rows") - diff --git a/synapse/types.py b/synapse/types.py index c8936b5758..63154855dd 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -121,7 +121,6 @@ class StreamToken( str(self.presence_key), ]) - def copy_and_replace(self, key, new_value): d = self._asdict() d[key] = new_value From 05672a6a8ca66bd2165217c06c5478a06b0cd952 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 27 Aug 2014 15:25:21 +0100 Subject: [PATCH 08/10] Convert get_paginat_rows to use PaginationConfig. This allows people to supply directions. 
--- synapse/handlers/room.py | 15 +++++----- synapse/streams/config.py | 63 +++++++++++++++++++++++---------------- synapse/streams/events.py | 20 +++++++++---- 3 files changed, 59 insertions(+), 39 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index bf66d74548..a32c22db33 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -22,6 +22,7 @@ from synapse.api.errors import RoomError, StoreError, SynapseError from synapse.api.events.room import ( RoomTopicEvent, RoomMemberEvent, RoomConfigEvent ) +from synapse.streams.config import PaginationConfig from synapse.util import stringutils from ._base import BaseHandler @@ -115,21 +116,18 @@ class MessageHandler(BaseHandler): data_source = self.hs.get_event_sources().sources[0] - if pagin_config.from_token: - from_token = pagin_config.from_token - else: - from_token = yield self.hs.get_event_sources().get_current_token() + if not pagin_config.from_token: + pagin_config.from_token = yield self.hs.get_event_sources().get_current_token() user = self.hs.parse_userid(user_id) events, next_token = yield data_source.get_pagination_rows( - user, from_token, pagin_config.to_token, pagin_config.limit, - room_id + user, pagin_config, room_id ) chunk = { "chunk": [e.get_dict() for e in events], - "start": from_token.to_string(), + "start": pagin_config.from_token.to_string(), "end": next_token.to_string(), } @@ -277,8 +275,9 @@ class MessageHandler(BaseHandler): # FIXME (erikj): Fix this. 
presence_stream = self.hs.get_event_sources().sources[1] + pagination_config = PaginationConfig(from_token=now_token) presence, _ = yield presence_stream.get_pagination_rows( - user, now_token, None, None, None + user, pagination_config, None ) limit = pagin_config.limit diff --git a/synapse/streams/config.py b/synapse/streams/config.py index 69c7145a36..2434844d80 100644 --- a/synapse/streams/config.py +++ b/synapse/streams/config.py @@ -26,40 +26,53 @@ class PaginationConfig(object): """A configuration object which stores pagination parameters.""" - def __init__(self, from_tok=None, to_tok=None, direction='f', limit=0): - self.from_token = ( - StreamToken.from_string(from_tok) if from_tok else None - ) - self.to_token = StreamToken.from_string(to_tok) if to_tok else None + def __init__(self, from_token=None, to_token=None, direction='f', + limit=0): + self.from_token = from_token + self.to_token = to_token self.direction = 'f' if direction == 'f' else 'b' self.limit = int(limit) @classmethod def from_request(cls, request, raise_invalid_params=True): - params = { - "direction": 'f', - } + def get_param(name, default=None): + lst = request.args.get(name, []) + if len(lst) > 1: + raise SynapseError( + 400, "%s must be specified only once" % (name,) + ) + elif len(lst) == 1: + return lst[0] + else: + return default - query_param_mappings = [ # 3-tuple of qp_key, attribute, rules - ("from", "from_tok", lambda x: type(x) == str), - ("to", "to_tok", lambda x: type(x) == str), - ("limit", "limit", lambda x: x.isdigit()), - ("dir", "direction", lambda x: x == 'f' or x == 'b'), - ] + direction = get_param("dir", 'f') + if direction not in ['f', 'b']: + raise SynapseError(400, "'dir' parameter is invalid.") - for qp, attr, is_valid in query_param_mappings: - if qp in request.args: - if is_valid(request.args[qp][0]): - params[attr] = request.args[qp][0] - elif raise_invalid_params: - raise SynapseError(400, "%s parameter is invalid." 
% qp) - - if "from_tok" in params and params["from_tok"] == "END": - # TODO (erikj): This is for compatibility only. - del params["from_tok"] + from_tok = get_param("from") + to_tok = get_param("to") try: - return PaginationConfig(**params) + if from_tok == "END": + from_tok = None # For backwards compat. + elif from_tok: + from_tok = StreamToken.from_string(from_tok) + except: + raise SynapseError(400, "'from' paramater is invalid") + + try: + if to_tok: + to_tok = StreamToken.from_string(to_tok) + except: + raise SynapseError(400, "'to' paramater is invalid") + + limit = get_param("limit", "0") + if not limit.isdigit(): + raise SynapseError(400, "'limit' parameter must be an integer.") + + try: + return PaginationConfig(from_tok, to_tok, direction, limit) except: logger.exception("Failed to create pagination config") raise SynapseError(400, "Invalid request.") diff --git a/synapse/streams/events.py b/synapse/streams/events.py index bf48df5b79..8a84a9d392 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -47,14 +47,19 @@ class RoomEventSource(object): return self.store.get_room_events_max_id() @defer.inlineCallbacks - def get_pagination_rows(self, user, from_token, to_token, limit, key): + def get_pagination_rows(self, user, pagination_config, key): + from_token = pagination_config.from_token + to_token = pagination_config.to_token + limit = pagination_config.limit + direction = pagination_config.direction + to_key = to_token.events_key if to_token else None events, next_key = yield self.store.paginate_room_events( room_id=key, from_key=from_token.events_key, to_key=to_key, - direction='b', + direction=direction, limit=limit, with_feedback=True ) @@ -101,7 +106,12 @@ class PresenceSource(object): presence = self.hs.get_handlers().presence_handler return presence._user_cachemap_latest_serial - def get_pagination_rows(self, user, from_token, to_token, limit, key): + def get_pagination_rows(self, user, pagination_config, key): + # TODO 
(erikj): Does this make sense? Ordering? + + from_token = pagination_config.from_token + to_token = pagination_config.to_token + from_key = int(from_token.presence_key) if to_token: @@ -167,7 +177,5 @@ class StreamSource(object): def get_current_token_part(self): raise NotImplementedError("get_current_token_part") - -class PaginationSource(object): - def get_pagination_rows(self, user, from_token, to_token, limit, key): + def get_pagination_rows(self, user, pagination_config, key): raise NotImplementedError("get_rows") From bfe9faad5abf429b7023aaaeb3ba3200a75bf485 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 27 Aug 2014 15:33:52 +0100 Subject: [PATCH 09/10] Index sources in a nicer fashion. --- synapse/handlers/room.py | 4 ++-- synapse/notifier.py | 6 +++--- synapse/streams/events.py | 21 ++++++++++----------- 3 files changed, 15 insertions(+), 16 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index a32c22db33..faea30b44e 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -114,7 +114,7 @@ class MessageHandler(BaseHandler): """ yield self.auth.check_joined_room(room_id, user_id) - data_source = self.hs.get_event_sources().sources[0] + data_source = self.hs.get_event_sources().sources["room"] if not pagin_config.from_token: pagin_config.from_token = yield self.hs.get_event_sources().get_current_token() @@ -274,7 +274,7 @@ class MessageHandler(BaseHandler): now_token = yield self.hs.get_event_sources().get_current_token() # FIXME (erikj): Fix this. 
- presence_stream = self.hs.get_event_sources().sources[1] + presence_stream = self.hs.get_event_sources().sources["presence"] pagination_config = PaginationConfig(from_token=now_token) presence, _ = yield presence_stream.get_pagination_rows( user, pagination_config, None diff --git a/synapse/notifier.py b/synapse/notifier.py index a69d5343cb..b969011b32 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -69,7 +69,7 @@ class Notifier(object): def on_new_room_event(self, event, extra_users=[]): room_id = event.room_id - source = self.event_sources.sources[0] + source = self.event_sources.sources["room"] listeners = self.rooms_to_listeners.get(room_id, set()).copy() @@ -94,7 +94,7 @@ class Notifier(object): @defer.inlineCallbacks def on_new_user_event(self, users=[], rooms=[]): - source = self.event_sources.sources[1] + source = self.event_sources.sources["presence"] listeners = set() @@ -176,7 +176,7 @@ class Notifier(object): limit = listener.limit # TODO (erikj): DeferredList? 
- for source in self.event_sources.sources: + for source in self.event_sources.sources.values(): stuff, new_token = yield source.get_new_events_for_user( listener.user, from_token, diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 8a84a9d392..2e6ea6ca26 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -20,8 +20,6 @@ from synapse.types import StreamToken class RoomEventSource(object): - SIGNAL_NAME = "RoomEventSource" - def __init__(self, hs): self.store = hs.get_datastore() @@ -70,8 +68,6 @@ class RoomEventSource(object): class PresenceSource(object): - SIGNAL_NAME = "PresenceSource" - def __init__(self, hs): self.hs = hs self.clock = hs.get_clock() @@ -150,13 +146,16 @@ class PresenceSource(object): class EventSources(object): - SOURCE_TYPES = [ - RoomEventSource, - PresenceSource, - ] + SOURCE_TYPES = { + "room": RoomEventSource, + "presence": PresenceSource, + } def __init__(self, hs): - self.sources = [t(hs) for t in EventSources.SOURCE_TYPES] + self.sources = { + name: cls(hs) + for name, cls in EventSources.SOURCE_TYPES.items() + } @staticmethod def create_token(events_key, presence_key): @@ -164,8 +163,8 @@ class EventSources(object): @defer.inlineCallbacks def get_current_token(self): - events_key = yield self.sources[0].get_current_token_part() - presence_key = yield self.sources[1].get_current_token_part() + events_key = yield self.sources["room"].get_current_token_part() + presence_key = yield self.sources["presence"].get_current_token_part() token = EventSources.create_token(events_key, presence_key) defer.returnValue(token) From 7917ff1271b12aa2e7b11f714d63e8288d4fe733 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 27 Aug 2014 16:09:48 +0100 Subject: [PATCH 10/10] Turn off presence again. 
--- synapse/handlers/presence.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 9a690258de..c479908f61 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -142,7 +142,7 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def is_presence_visible(self, observer_user, observed_user): defer.returnValue(True) - #return + return # FIXME (erikj): This code path absolutely kills the database. assert(observed_user.is_mine) @@ -189,7 +189,7 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def set_state(self, target_user, auth_user, state): - # return + return # TODO (erikj): Turn this back on. Why did we end up sending EDUs # everywhere?