From 431b31e0f2529cf58be09d561b59926a508eee0f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 11 Jun 2024 20:29:40 -0500 Subject: [PATCH] Add actual guranteed order for UNION We use `union all` because we don't need any of the deduplication logic (`union` is really a union + distinct). `UNION ALL`` does preserve the ordering of the operand queries but there is no actual gurantee that it has this behavior in all scenarios so we need the extra `ORDER BY` at the bottom. See https://dba.stackexchange.com/questions/316818/are-results-from-union-all-clauses-always-appended-in-order/316835#316835 --- synapse/storage/databases/main/stream.py | 8 ++++++++ tests/storage/test_stream.py | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index a671298967..11af5fdea8 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -920,6 +920,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): # minimum stream ordering. We then filter the results against the # token and return the first row that matches. + # We use `union all` because we don't need any of the deduplication logic + # (`union` is really a union + distinct). `UNION ALL`` does preserve the + # ordering of the operand queries but there is no actual gurantee that it + # has this behavior in all scenarios so we need the extra `ORDER BY` at the + # bottom. + # + # We're using the subquery syntax for SQLite compatibility. sql = """ SELECT * FROM ( SELECT instance_name, stream_ordering, topological_ordering, event_id @@ -943,6 +950,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): ORDER BY stream_ordering DESC LIMIT 1 ) AS b + ORDER BY stream_ordering DESC """ txn.execute( sql, diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py index de01107ee1..7488eae0ee 100644 --- a/tests/storage/test_stream.py +++ b/tests/storage/test_stream.py @@ -449,7 +449,7 @@ class GetLastEventInRoomBeforeStreamOrderingTestCase(HomeserverTestCase): # Assemble a token that encompasses event1 -> event4 on worker1 end_token = RoomStreamToken( - stream=event_pos1.stream, + stream=event_pos2.stream, instance_map=immutabledict({"worker1": event_pos4.stream}), )