diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 56304999dc..f18fb63c5e 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -40,7 +40,7 @@ from synapse.util.caches.descriptors import cached from synapse.api.constants import EventTypes from synapse.types import RoomStreamToken from synapse.util.logcontext import preserve_fn -from synapse.storage.engines import PostgresEngine +from synapse.storage.engines import PostgresEngine, Sqlite3Engine import logging @@ -614,19 +614,55 @@ class StreamStore(SQLBaseStore): results["stream_ordering"], ) - query_before = ( - "SELECT topological_ordering, stream_ordering, event_id FROM events" - " WHERE room_id = ? AND %s" - " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?" - ) % (upper_bound(token, self.database_engine, inclusive=False),) + if isinstance(self.database_engine, Sqlite3Engine): + # SQLite3 doesn't optimise ``(x < a) OR (x = a AND y < b)`` + # So we give pass it to SQLite3 as the UNION ALL of the two queries. - query_after = ( - "SELECT topological_ordering, stream_ordering, event_id FROM events" - " WHERE room_id = ? AND %s" - " ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?" - ) % (lower_bound(token, self.database_engine, inclusive=False),) + query_before = ( + "SELECT topological_ordering, stream_ordering, event_id FROM events" + " WHERE room_id = ? AND topological_ordering < ?" + " UNION ALL" + " SELECT topological_ordering, stream_ordering, event_id FROM events" + " WHERE room_id = ? AND topological_ordering = ? AND stream_ordering < ?" + " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?" + ) + before_args = ( + room_id, token.topological, + room_id, token.topological, token.stream, + before_limit, + ) - txn.execute(query_before, (room_id, before_limit)) + query_after = ( + "SELECT topological_ordering, stream_ordering, event_id FROM events" + " WHERE room_id = ? AND topological_ordering > ?" + " UNION ALL" + " SELECT topological_ordering, stream_ordering, event_id FROM events" + " WHERE room_id = ? AND topological_ordering = ? AND stream_ordering > ?" + " ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?" + ) + after_args = ( + room_id, token.topological, + room_id, token.topological, token.stream, + after_limit, + ) + else: + query_before = ( + "SELECT topological_ordering, stream_ordering, event_id FROM events" + " WHERE room_id = ? AND %s" + " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?" + ) % (upper_bound(token, self.database_engine, inclusive=False),) + + before_args = (room_id, before_limit), + + query_after = ( + "SELECT topological_ordering, stream_ordering, event_id FROM events" + " WHERE room_id = ? AND %s" + " ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?" + ) % (lower_bound(token, self.database_engine, inclusive=False),) + + after_args = (room_id, after_limit) + + txn.execute(query_before, before_args) rows = self.cursor_to_dict(txn) events_before = [r["event_id"] for r in rows] @@ -642,7 +678,7 @@ class StreamStore(SQLBaseStore): token.stream - 1, )) - txn.execute(query_after, (room_id, after_limit)) + txn.execute(query_after, after_args) rows = self.cursor_to_dict(txn) events_after = [r["event_id"] for r in rows]