Optimize backfill receiving to have less missing prev_event thrashing

Pulled from scratch changes in
https://github.com/matrix-org/synapse/pull/13864
Author: Eric Eastwood
Date: 2022-09-29 23:45:35 -05:00
parent 6f0c3e669d
commit 68ae0fd5c5
2 changed files with 69 additions and 2 deletions
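
For context, here is a minimal standalone sketch (illustrative Python, not
Synapse code; the event names and prev_event links are made up) of why the
persist order matters: each event references its prev_events, so persisting
oldest -> newest means every prev_event is already in the store by the time
its child arrives, while persisting the raw newest -> oldest `/backfill`
response would hit a missing prev_event on nearly every event.

    # Illustrative sketch, not Synapse code.
    store: set[str] = set()

    prev_events = {"event_2": "event_1", "event_3": "event_2"}
    backfill_response = ["event_3", "event_2", "event_1"]  # newest -> oldest

    for event in backfill_response[::-1]:  # oldest -> newest (chronological)
        prev = prev_events.get(event)
        if prev is not None and prev not in store:
            print(f"would have to fetch missing prev_event {prev}")
        store.add(event)
        print(f"persisted {event}")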

@@ -75,6 +75,7 @@ from synapse.state import StateResolutionStore
 from synapse.storage.databases.main.events import PartialStateConflictError
 from synapse.storage.databases.main.events_worker import EventRedactBehaviour
 from synapse.storage.state import StateFilter
+from synapse.storage.util.id_generators import AbstractStreamIdGenerator
 from synapse.types import (
     PersistedEventPosition,
     RoomStreamToken,
@@ -644,9 +645,71 @@ class FederationEventHandler:
                         f"room {ev.room_id}, when we were backfilling in {room_id}"
                     )
 
+            # We expect the events from the `/backfill` response to start from
+            # `?v` and include events that preceded it (so the list will be
+            # newest -> oldest, reverse-chronological). It's described in the
+            # spec this way so we can rely on people doing it the right way
+            # for the historical messages to show up correctly.
+            reverse_chronological_events = events
+            # `[::-1]` is just syntax to reverse the list and give us a copy
+            chronological_events = reverse_chronological_events[::-1]
+
+            # We want to calculate the `stream_ordering` from newest -> oldest
+            # (reverse-chronological) (so MSC2716 historical events end up
+            # sorting in the correct order) and persist oldest -> newest
+            # (chronological) to get the least missing `prev_event` fetch
+            # thrashing.
+            # ------------------------------------------------------------------
+            # Since we have been configured to write, we ought to have id
+            # generators, rather than id trackers.
+            assert (
+                self._instance_name in self._config.worker.writers.events
+            ), "Can only write stream IDs on master"
+            assert isinstance(self._store._backfill_id_gen, AbstractStreamIdGenerator)
+            stream_ordering_manager = self._store._backfill_id_gen.get_next_mult(
+                len(reverse_chronological_events)
+            )
+            async with stream_ordering_manager as stream_orderings:
+                # Calculate the `stream_ordering` from newest -> oldest
+                # (reverse-chronological) (so historical events end up sorting
+                # in the correct order).
+                #
+                # Backfilled events start with `stream_ordering=-1` and
+                # decrement. For events that we backfill at the same `depth`
+                # (like chains of historical messages), in order for them to
+                # have the best chance of ending up in the correct order, we
+                # assign `stream_ordering` against the assumed
+                # reverse-chronological list of events to backfill (so the
+                # newest events get a `stream_ordering` assigned first).
+                #
+                # depth : stream_ordering : event
+                # ----- : --------------- : -----------------------
+                # 1     : 1               : Event before 1
+                # 2     : 2               : Event before 2
+                # 3     : -5              : Historical message 1
+                # 3     : -4              : Historical message 2
+                # 3     : -3              : Historical message 3
+                # 3     : -2              : Historical message 4
+                # 3     : -1              : Historical message 5
+                # 3     : 3               : Event after 1
+                # 4     : 4               : Event after 2
+                #
+                for event, stream in zip(
+                    reverse_chronological_events, stream_orderings
+                ):
+                    event.internal_metadata.stream_ordering = stream
+
             await self._process_pulled_events(
                 dest,
-                events,
+                # Persist events from oldest -> newest (chronological) to get
+                # the least missing `prev_event` fetch thrashing.
+                # `_process_pulled_events` does some sorting of its own by
+                # `depth`, but if we let it sort the reverse-chronological
+                # list of events, it naively orders events with the same
+                # depth in the opposite order we want. If we pass it a list
+                # already sorted by depth, everything lines up.
+                chronological_events,
                 backfilled=True,
             )
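
As a sanity check on the assignment above, a standalone sketch (illustrative,
not Synapse code) that assumes the backfill id generator hands out
decrementing negative stream orderings (-1, -2, -3, ...) the way
`get_next_mult` is used here; it reproduces the historical-message rows of
the table in the comment:

    from itertools import count

    # Assumption: ids are handed out newest-first as -1, -2, -3, ...
    historical = [f"Historical message {i}" for i in range(1, 6)]
    reverse_chronological = list(reversed(historical))  # newest -> oldest

    id_gen = count(start=-1, step=-1)
    stream_orderings = {ev: next(id_gen) for ev in reverse_chronological}

    for ev in historical:  # oldest -> newest, matching the table
        print(f"{stream_orderings[ev]:>3} : {ev}")
    #  -5 : Historical message 1
    #  -4 : Historical message 2
    #  -3 : Historical message 3
    #  -2 : Historical message 4
    #  -1 : Historical message 5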

@@ -209,7 +209,11 @@ class PersistEventsStore:
         async with stream_ordering_manager as stream_orderings:
             for (event, _), stream in zip(events_and_contexts, stream_orderings):
-                event.internal_metadata.stream_ordering = stream
+                # If a `stream_ordering` has already been assigned to the
+                # event, just use that. This is done during backfill to help
+                # the ordering of MSC2716 historical messages.
+                if event.internal_metadata.stream_ordering is None:
+                    event.internal_metadata.stream_ordering = stream
 
             await self.db_pool.runInteraction(
                 "persist_events",