Remove need for topological_ordering

This commit is contained in:
Eric Eastwood 2024-06-27 01:20:42 -05:00
parent 935b98c474
commit f163fcf08a

View file

@ -412,6 +412,43 @@ def _filter_results(
return True
def _filter_results_by_stream(
lower_token: Optional[RoomStreamToken],
upper_token: Optional[RoomStreamToken],
instance_name: str,
stream_ordering: int,
) -> bool:
"""
This function only works with "live" tokens with `stream_ordering` only. See
`_filter_results(...)` if you want to work with all tokens.
Returns True if the event persisted by the given instance at the given
stream_ordering falls between the two tokens (taking a None
token to mean unbounded).
Used to filter results from fetching events in the DB against the given
tokens. This is necessary to handle the case where the tokens include
position maps, which we handle by fetching more than necessary from the DB
and then filtering (rather than attempting to construct a complicated SQL
query).
"""
if lower_token:
assert lower_token.topological is None
# If these are live tokens we compare the stream ordering against the
# writers stream position.
if stream_ordering <= lower_token.get_stream_pos_for_instance(instance_name):
return False
if upper_token:
assert upper_token.topological is None
if upper_token.get_stream_pos_for_instance(instance_name) < stream_ordering:
return False
return True
def filter_to_clause(event_filter: Optional[Filter]) -> Tuple[str, List[str]]:
# NB: This may create SQL clauses that don't optimise well (and we don't
# have indices on all possible clauses). E.g. it may create
@ -764,6 +801,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
Fetch membership events (and the previous event that was replaced by that one)
for a given user.
Note: This function only works with "live" tokens with `stream_ordering` only.
We're looking for membership changes in the token range (> `from_key` and <=
`to_key`).
@ -837,7 +876,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
s.room_id,
COALESCE(e.instance_name, s.instance_name),
COALESCE(e.stream_ordering, s.stream_id),
e.topological_ordering,
m.membership,
e.sender
FROM current_state_delta_stream AS s
@ -859,7 +897,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
room_id,
instance_name,
stream_ordering,
topological_ordering,
membership,
sender,
) in txn:
@ -867,12 +904,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
assert instance_name is not None
assert stream_ordering is not None
if _filter_results(
if _filter_results_by_stream(
from_key,
to_key,
instance_name,
# TODO: This isn't always filled now
topological_ordering,
stream_ordering,
):
# When the server leaves a room, it will insert new rows with