From 4fb65a10916481e0600d506d4c7e9bcfbffb7092 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Sep 2016 11:27:04 +0100 Subject: [PATCH] Base public room list off of public_rooms stream --- synapse/handlers/room_list.py | 34 ++++++++++++++++++----- synapse/storage/room.py | 52 +++++++++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+), 7 deletions(-) diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 6a62f3c27e..28bc35f8a3 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -55,16 +55,26 @@ class RoomListHandler(BaseHandler): else: since_token = None - room_ids = yield self.store.get_public_room_ids() - rooms_to_order_value = {} rooms_to_num_joined = {} rooms_to_latest_event_ids = {} + newly_visible = [] + newly_unpublished = [] if since_token: - current_stream_token = since_token.stream_ordering + stream_token = since_token.stream_ordering + current_public_id = yield self.store.get_current_public_room_stream_id() + public_room_stream_id = since_token.public_room_stream_id + newly_visible, newly_unpublished = yield self.store.get_public_room_changes( + public_room_stream_id, current_public_id + ) else: - current_stream_token = yield self.store.get_room_max_stream_ordering() + stream_token = yield self.store.get_room_max_stream_ordering() + public_room_stream_id = yield self.store.get_current_public_room_stream_id() + + room_ids = yield self.store.get_public_room_ids_at_stream_id( + public_room_stream_id + ) # We want to return rooms in a particular order: the number of joined # users. We then arbitrarily use the room_id as a tie breaker. @@ -74,7 +84,7 @@ class RoomListHandler(BaseHandler): latest_event_ids = rooms_to_latest_event_ids.get(room_id, None) if not latest_event_ids: latest_event_ids = yield self.store.get_forward_extremeties_for_room( - room_id, current_stream_token + room_id, stream_token ) rooms_to_latest_event_ids[room_id] = latest_event_ids @@ -125,6 +135,9 @@ class RoomListHandler(BaseHandler): if num_joined_users == 0: return + if room_id in newly_unpublished: + return + result = { "room_id": room_id, "num_joined_members": num_joined_users, @@ -207,10 +220,14 @@ class RoomListHandler(BaseHandler): "chunk": chunk, } + if since_token: + results["new_rooms"] = bool(newly_visible) + if not since_token or since_token.direction_is_forward: if new_limit: results["next_batch"] = RoomListNextBatch( - stream_ordering=current_stream_token, + stream_ordering=stream_token, + public_room_stream_id=public_room_stream_id, current_limit=new_limit, direction_is_forward=True, ).to_token() @@ -222,7 +239,8 @@ class RoomListHandler(BaseHandler): else: if new_limit: results["prev_batch"] = RoomListNextBatch( - stream_ordering=current_stream_token, + stream_ordering=stream_token, + public_room_stream_id=public_room_stream_id, current_limit=new_limit, direction_is_forward=False, ).to_token() @@ -245,12 +263,14 @@ class RoomListHandler(BaseHandler): class RoomListNextBatch(namedtuple("RoomListNextBatch", ( "stream_ordering", # stream_ordering of the first public room list + "public_room_stream_id", # public room stream id for first public room list "current_limit", # The number of previous rooms returned "direction_is_forward", # Bool if this is a next_batch, false if prev_batch ))): KEY_DICT = { "stream_ordering": "s", + "public_room_stream_id": "p", "current_limit": "n", "direction_is_forward": "d", } diff --git a/synapse/storage/room.py b/synapse/storage/room.py index ef0d79891e..8aa4545939 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -255,3 +255,55 @@ class RoomStore(SQLBaseStore): }, desc="add_event_report" ) + + def get_current_public_room_stream_id(self): + return self._public_room_id_gen.get_current_token() + + def get_public_room_ids_at_stream_id(self, stream_id): + return self.runInteraction( + "get_public_room_ids_at_stream_id", + self.get_public_room_ids_at_stream_id_txn, stream_id + ) + + def get_public_room_ids_at_stream_id_txn(self, txn, stream_id): + return { + rm + for rm, vis in self.get_published_at_stream_id_txn(txn, stream_id).items() + if vis + } + + def get_published_at_stream_id_txn(self, txn, stream_id): + sql = (""" + SELECT room_id, visibility FROM public_room_list_stream + INNER JOIN ( + SELECT room_id, max(stream_id) AS stream_id + FROM public_room_list_stream + WHERE stream_id <= ? + GROUP BY room_id + ) grouped USING (room_id, stream_id) + """) + + txn.execute(sql, (stream_id,)) + return dict(txn.fetchall()) + + def get_public_room_changes(self, prev_stream_id, new_stream_id): + def get_public_room_changes_txn(txn): + then_rooms = self.get_public_room_ids_at_stream_id_txn(txn, prev_stream_id) + + now_rooms_dict = self.get_published_at_stream_id_txn(txn, new_stream_id) + + now_rooms_visible = set( + rm for rm, vis in now_rooms_dict.items() if vis + ) + now_rooms_not_visible = set( + rm for rm, vis in now_rooms_dict.items() if not vis + ) + + newly_visible = now_rooms_visible - then_rooms + newly_unpublished = now_rooms_not_visible & then_rooms + + return newly_visible, newly_unpublished + + return self.runInteraction( + "get_public_room_changes", get_public_room_changes_txn + )