From ce45cc1d44dcc9b469b23b4daab8426c5dd2c31f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 10 Jun 2024 17:18:48 -0500 Subject: [PATCH] Sliding sync sort stub --- synapse/handlers/sliding_sync.py | 18 +++++++++++++++++- synapse/storage/databases/main/stream.py | 20 +++++++++++--------- synapse/types/rest/client/__init__.py | 15 --------------- 3 files changed, 28 insertions(+), 25 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 1c37f83a2b..7379a007cd 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -169,7 +169,7 @@ class SlidingSyncHandler: # `["m.room.member", "$LAZY"]` filtered_room_ids = room_id_set # TODO: Apply sorts - sorted_room_ids = sorted(filtered_room_ids) + sorted_room_ids = await self.sort_rooms(filtered_room_ids, to_token) ops: List[SlidingSyncResult.SlidingWindowList.Operation] = [] if list_config.ranges: @@ -439,3 +439,19 @@ class SlidingSyncHandler: sync_room_id_set.add(room_id) return sync_room_id_set + + async def sort_rooms( + self, + room_id_set: AbstractSet[str], + to_token: StreamToken, + ) -> List[str]: + """ + Sort by recency of the last event in the room (stream_ordering). In order to get + a stable sort, we tie-break by room ID. + + Args: + room_id_set: Set of room IDs to sort + to_token: We sort based on the events in the room at this token + """ + # TODO: `get_last_event_in_room_before_stream_ordering()` + pass diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 7ab6003f61..188bba0f1f 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -914,11 +914,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): def get_last_event_in_room_before_stream_ordering_txn( txn: LoggingTransaction, ) -> Optional[str]: - # We need to handle the fact that the stream tokens can be vector - # clocks. We do this by getting all rows between the minimum and - # maximum stream ordering in the token, plus one row less than the - # minimum stream ordering. We then filter the results against the - # token and return the first row that matches. + # We need to handle the fact that the stream tokens can be vector clocks. We + # do this by getting all rows between the minimum and maximum stream + # ordering in the token, plus one row less than the minimum stream ordering + # (TODO: Why?). We then filter the results against the token and return the + # first row that matches. + min_stream = end_token.stream + max_stream = end_token.get_max_stream_pos() sql = """ SELECT * FROM ( @@ -926,7 +928,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): FROM events LEFT JOIN rejections USING (event_id) WHERE room_id = ? - AND ? < stream_ordering AND stream_ordering <= ? + AND stream_ordering > ? AND stream_ordering <= ? AND NOT outlier AND rejections.event_id IS NULL ORDER BY stream_ordering DESC @@ -948,10 +950,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): sql, ( room_id, - end_token.stream, - end_token.get_max_stream_pos(), + min_stream, + max_stream, room_id, - end_token.stream, + min_stream, ), ) diff --git a/synapse/types/rest/client/__init__.py b/synapse/types/rest/client/__init__.py index ef261518a0..4bae162161 100644 --- a/synapse/types/rest/client/__init__.py +++ b/synapse/types/rest/client/__init__.py @@ -175,20 +175,6 @@ class SlidingSyncBody(RequestBodyModel): ranges: Sliding window ranges. If this field is missing, no sliding window is used and all rooms are returned in this list. Integers are *inclusive*. - sort: How the list should be sorted on the server. The first value is - applied first, then tiebreaks are performed with each subsequent sort - listed. - - FIXME: Furthermore, it's not currently defined how servers should behave - if they encounter a filter or sort operation they do not recognise. If - the server rejects the request with an HTTP 400 then that will break - backwards compatibility with new clients vs old servers. However, the - client would be otherwise unaware that only some of the sort/filter - operations have taken effect. We may need to include a "warnings" - section to indicate which sort/filter operations are unrecognised, - allowing for some form of graceful degradation of service. - -- https://github.com/matrix-org/matrix-spec-proposals/blob/kegan/sync-v3/proposals/3575-sync.md#filter-and-sort-extensions - slow_get_all_rooms: Just get all rooms (for clients that don't want to deal with sliding windows). When true, the `ranges` and `sort` fields are ignored. required_state: Required state for each room returned. An array of event @@ -253,7 +239,6 @@ class SlidingSyncBody(RequestBodyModel): ranges: Optional[List[Tuple[int, int]]] = None else: ranges: Optional[List[Tuple[conint(ge=0, strict=True), conint(ge=0, strict=True)]]] = None # type: ignore[valid-type] - sort: Optional[List[StrictStr]] = None slow_get_all_rooms: Optional[StrictBool] = False include_heroes: Optional[StrictBool] = False filters: Optional[Filters] = None