diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 9ae1fe6c15..9e94cb08f6 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -412,6 +412,43 @@ def _filter_results( return True +def _filter_results_by_stream( + lower_token: Optional[RoomStreamToken], + upper_token: Optional[RoomStreamToken], + instance_name: str, + stream_ordering: int, +) -> bool: + """ + This function only works with "live" tokens with `stream_ordering` only. See + `_filter_results(...)` if you want to work with all tokens. + + Returns True if the event persisted by the given instance at the given + stream_ordering falls between the two tokens (taking a None + token to mean unbounded). + + Used to filter results from fetching events in the DB against the given + tokens. This is necessary to handle the case where the tokens include + position maps, which we handle by fetching more than necessary from the DB + and then filtering (rather than attempting to construct a complicated SQL + query). + """ + if lower_token: + assert lower_token.topological is None + + # If these are live tokens we compare the stream ordering against the + # writers stream position. + if stream_ordering <= lower_token.get_stream_pos_for_instance(instance_name): + return False + + if upper_token: + assert upper_token.topological is None + + if upper_token.get_stream_pos_for_instance(instance_name) < stream_ordering: + return False + + return True + + def filter_to_clause(event_filter: Optional[Filter]) -> Tuple[str, List[str]]: # NB: This may create SQL clauses that don't optimise well (and we don't # have indices on all possible clauses). E.g. it may create @@ -764,6 +801,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): Fetch membership events (and the previous event that was replaced by that one) for a given user. + Note: This function only works with "live" tokens with `stream_ordering` only. + We're looking for membership changes in the token range (> `from_key` and <= `to_key`). @@ -837,7 +876,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): s.room_id, COALESCE(e.instance_name, s.instance_name), COALESCE(e.stream_ordering, s.stream_id), - e.topological_ordering, m.membership, e.sender FROM current_state_delta_stream AS s @@ -859,7 +897,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): room_id, instance_name, stream_ordering, - topological_ordering, membership, sender, ) in txn: @@ -867,12 +904,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): assert instance_name is not None assert stream_ordering is not None - if _filter_results( + if _filter_results_by_stream( from_key, to_key, instance_name, - # TODO: This isn't always filled now - topological_ordering, stream_ordering, ): # When the server leaves a room, it will insert new rows with