mirror of
https://github.com/element-hq/synapse
synced 2024-08-19 15:00:25 +00:00
Fix when the state_filter prevented us from returning any rows before
This commit is contained in:
parent
3d80449d6b
commit
ab576b6b6b
1 changed files with 53 additions and 11 deletions
|
@ -131,6 +131,21 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
|
|||
"""
|
||||
|
||||
overall_select_query_args: List[Union[int, str]] = []
|
||||
# Make sure we always have a row that tells us if we linked up to another
|
||||
# state group that we already processed (`state_group_reached`) regardless
|
||||
# of whether we find any state according to the state_filter.
|
||||
#
|
||||
# We use a `UNION ALL` to make sure it is always the first row returned.
|
||||
# `UNION` will merge and sort in with the rows from the next query
|
||||
# otherwise.
|
||||
overall_select_clause = """
|
||||
(
|
||||
SELECT NULL, NULL, NULL, state_group_reached
|
||||
FROM sgs
|
||||
ORDER BY state_group ASC
|
||||
LIMIT 1
|
||||
) UNION ALL (%s)
|
||||
"""
|
||||
|
||||
# This is an optimization to create a select clause per-condition. This
|
||||
# makes the query planner a lot smarter on what rows should pull out in the
|
||||
|
@ -178,7 +193,7 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
|
|||
"""
|
||||
)
|
||||
|
||||
overall_select_clause = " UNION ".join(select_clause_list)
|
||||
main_select_clause = " UNION ".join(select_clause_list)
|
||||
else:
|
||||
where_clause, where_args = state_filter.make_sql_filter_clause()
|
||||
# Unless the filter clause is empty, we're going to append it after an
|
||||
|
@ -188,7 +203,7 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
|
|||
|
||||
overall_select_query_args.extend(where_args)
|
||||
|
||||
overall_select_clause = f"""
|
||||
main_select_clause = f"""
|
||||
SELECT DISTINCT ON (type, state_key)
|
||||
type, state_key, event_id, state_group
|
||||
FROM state_groups_state
|
||||
|
@ -209,6 +224,7 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
|
|||
|
||||
state_groups_we_have_already_fetched_string = ", ".join(
|
||||
[
|
||||
# TODO: Is this string manipulation safe?
|
||||
f"{state_group}::bigint"
|
||||
# We default to `[-1]` just to fill in the query with something
|
||||
# that will have no effct
|
||||
|
@ -217,6 +233,16 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
|
|||
)
|
||||
|
||||
txn.execute(
|
||||
sql
|
||||
% (
|
||||
state_groups_we_have_already_fetched_string,
|
||||
overall_select_clause % (main_select_clause,),
|
||||
),
|
||||
args,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"sql=%s, args=%s",
|
||||
sql
|
||||
% (
|
||||
state_groups_we_have_already_fetched_string,
|
||||
|
@ -225,24 +251,39 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
|
|||
args,
|
||||
)
|
||||
|
||||
min_state_group: Optional[int] = None
|
||||
# The first row is always our special `state_group_reached` row which
|
||||
# tells us if we linked up to any other existing state_group that we
|
||||
# already fetched and if so, which one we linked up to (see the `UNION
|
||||
# ALL` above)
|
||||
first_row = txn.fetchone()
|
||||
if first_row:
|
||||
_, _, _, state_group_reached = first_row
|
||||
|
||||
partial_state_map_for_state_group: MutableStateMap[str] = {}
|
||||
for row in txn:
|
||||
typ, state_key, event_id, state_group = row
|
||||
typ, state_key, event_id, _state_group = row
|
||||
logger.info(
|
||||
"row from db -> group=%s type=%s state_key=%s event_id=%s",
|
||||
group,
|
||||
typ,
|
||||
state_key,
|
||||
event_id,
|
||||
)
|
||||
key = (intern_string(typ), intern_string(state_key))
|
||||
partial_state_map_for_state_group[key] = event_id
|
||||
|
||||
if min_state_group is None or state_group < min_state_group:
|
||||
min_state_group = state_group
|
||||
logger.info(
|
||||
"group=%s state_group_reached=%s, partial_state_map_for_state_group=%s",
|
||||
group,
|
||||
state_group_reached,
|
||||
partial_state_map_for_state_group,
|
||||
)
|
||||
|
||||
# If we see a state group edge link to a previous state_group that we
|
||||
# already fetched from the database, link up the base state to the
|
||||
# partial state we retrieved from the database to build on top of.
|
||||
if (
|
||||
min_state_group is not None
|
||||
and results.get(min_state_group) is not None
|
||||
):
|
||||
resultant_state_map = dict(results[min_state_group])
|
||||
if state_group_reached in results:
|
||||
resultant_state_map = dict(results[state_group_reached])
|
||||
resultant_state_map.update(partial_state_map_for_state_group)
|
||||
|
||||
results[group] = resultant_state_map
|
||||
|
@ -310,6 +351,7 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
|
|||
allow_none=True,
|
||||
)
|
||||
|
||||
logger.info("_get_state_groups_from_groups_txn results=%s", results)
|
||||
# The results shouldn't be considered mutable.
|
||||
return results
|
||||
|
||||
|
|
Loading…
Reference in a new issue