diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 00a6afc963..3ee6a091c5 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -156,7 +156,7 @@ class PaginationHandler(object): stream_ordering = yield self.store.find_first_stream_ordering_after_ts(ts) - r = yield self.store.get_room_event_after_stream_ordering( + r = yield self.store.get_room_event_before_stream_ordering( room_id, stream_ordering, ) if not r: diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py index 140da8dad6..223ce7fedb 100644 --- a/synapse/storage/data_stores/main/stream.py +++ b/synapse/storage/data_stores/main/stream.py @@ -536,20 +536,55 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): Deferred[(int, int, str)]: (stream ordering, topological ordering, event_id) """ + return self.db.runInteraction( + "get_room_event_after_stream_ordering", + self.get_room_event_around_stream_ordering_txn, + room_id, stream_ordering, "f", + ) - def _f(txn): - sql = ( - "SELECT stream_ordering, topological_ordering, event_id" - " FROM events" - " WHERE room_id = ? AND stream_ordering >= ?" - " AND NOT outlier" - " ORDER BY stream_ordering" - " LIMIT 1" - ) - txn.execute(sql, (room_id, stream_ordering)) - return txn.fetchone() + def get_room_event_before_stream_ordering(self, room_id, stream_ordering): + """Gets details of the first event in a room at or before a stream ordering - return self.db.runInteraction("get_room_event_after_stream_ordering", _f) + Args: + room_id (str): + stream_ordering (int): + + Returns: + Deferred[(int, int, str)]: + (stream ordering, topological ordering, event_id) + """ + return self.db.runInteraction( + "get_room_event_before_stream_ordering", + self.get_room_event_around_stream_ordering_txn, + room_id, stream_ordering, "f", + ) + + def get_room_event_around_stream_ordering_txn( + self, txn, room_id, stream_ordering, dir="f" + ): + """Gets details of the first event in a room at or either after or before a + stream ordering, depending on the provided direction. + + Args: + room_id (str): + stream_ordering (int): + dir (str): Direction in which we're looking towards in the room's history, + either "f" (forward) or "b" (backward). + + Returns: + Deferred[(int, int, str)]: + (stream ordering, topological ordering, event_id) + """ + sql = ( + "SELECT stream_ordering, topological_ordering, event_id" + " FROM events" + " WHERE room_id = ? AND stream_ordering %s ?" + " AND NOT outlier" + " ORDER BY stream_ordering" + " LIMIT 1" + ) % ("<=" if dir == "b" else ">=",) + txn.execute(sql, (room_id, stream_ordering)) + return txn.fetchone() @defer.inlineCallbacks def get_room_events_max_id(self, room_id=None):