diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index a671298967..11af5fdea8 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -920,6 +920,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): # minimum stream ordering. We then filter the results against the # token and return the first row that matches. + # We use `union all` because we don't need any of the deduplication logic + # (`union` is really a union + distinct). `UNION ALL`` does preserve the + # ordering of the operand queries but there is no actual gurantee that it + # has this behavior in all scenarios so we need the extra `ORDER BY` at the + # bottom. + # + # We're using the subquery syntax for SQLite compatibility. sql = """ SELECT * FROM ( SELECT instance_name, stream_ordering, topological_ordering, event_id @@ -943,6 +950,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): ORDER BY stream_ordering DESC LIMIT 1 ) AS b + ORDER BY stream_ordering DESC """ txn.execute( sql, diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py index de01107ee1..7488eae0ee 100644 --- a/tests/storage/test_stream.py +++ b/tests/storage/test_stream.py @@ -449,7 +449,7 @@ class GetLastEventInRoomBeforeStreamOrderingTestCase(HomeserverTestCase): # Assemble a token that encompasses event1 -> event4 on worker1 end_token = RoomStreamToken( - stream=event_pos1.stream, + stream=event_pos2.stream, instance_map=immutabledict({"worker1": event_pos4.stream}), )