diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index a8b0c95636..80f7ee3f12 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -566,7 +566,9 @@ class RoomEventSource(object): to_key = yield self.get_current_key() - app_service = self.store.get_app_service_by_user_id(user.to_string()) + app_service = yield self.store.get_app_service_by_user_id( + user.to_string() + ) if app_service: events, end_key = yield self.store.get_appservice_room_stream( service=app_service, diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 3c8f3320f1..6946e9fe70 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -128,25 +128,73 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")): class StreamStore(SQLBaseStore): + @defer.inlineCallbacks def get_appservice_room_stream(self, service, from_key, to_key, limit=0): # NB this lives here instead of appservice.py so we can reuse the # 'private' StreamToken class in this file. - logger.info("get_appservice_room_stream -> %s", service) - if limit: limit = max(limit, MAX_STREAM_SIZE) else: limit = MAX_STREAM_SIZE # From and to keys should be integers from ordering. - # from_id = _StreamToken.parse_stream_token(from_key) - # to_id = _StreamToken.parse_stream_token(to_key) + from_id = _StreamToken.parse_stream_token(from_key) + to_id = _StreamToken.parse_stream_token(to_key) if from_key == to_key: - return defer.succeed(([], to_key)) + defer.returnValue(([], to_key)) + return - # TODO stub - return defer.succeed(([], to_key)) + # Logic: + # - We want ALL events which match the AS room_id regex + # - We want ALL events which match the rooms represented by the AS + # room_alias regex + # - We want ALL events for rooms that AS users have joined. + # This is currently supported via get_app_service_rooms (which is used + # for the Notifier listener rooms). We can't reasonably make a SQL + # query for these room IDs, so we'll pull all the events between from/to + # and filter in python. + rooms_for_as = yield self.get_app_service_rooms(service) + room_ids_for_as = [r.room_id for r in rooms_for_as] + + # select all the events between from/to with a sensible limit + sql = ( + "SELECT e.event_id, e.room_id, e.stream_ordering FROM events AS e " + "WHERE e.stream_ordering > ? AND e.stream_ordering <= ? " + "ORDER BY stream_ordering ASC LIMIT %(limit)d " + ) % { + "limit": limit + } + + def f(txn): + txn.execute(sql, (from_id.stream, to_id.stream,)) + + rows = self.cursor_to_dict(txn) + + ret = self._get_events_txn( + txn, + # apply the filter on the room id list + [ + r["event_id"] for r in rows + if r["room_id"] in room_ids_for_as + ], + get_prev_content=True + ) + + self._set_before_and_after(ret, rows) + + if rows: + key = "s%d" % max([r["stream_ordering"] for r in rows]) + + else: + # Assume we didn't get anything because there was nothing to + # get. + key = to_key + + return ret, key + + results = yield self.runInteraction("get_appservice_room_stream", f) + defer.returnValue(results) @log_function def get_room_events_stream(self, user_id, from_key, to_key, room_id,