diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 3fac256881..195057e841 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -75,6 +75,7 @@ from synapse.state import StateResolutionStore from synapse.storage.databases.main.events import PartialStateConflictError from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.storage.state import StateFilter +from synapse.storage.util.id_generators import AbstractStreamIdGenerator from synapse.types import ( PersistedEventPosition, RoomStreamToken, @@ -644,9 +645,71 @@ class FederationEventHandler: f"room {ev.room_id}, when we were backfilling in {room_id}" ) + # We expect the events from the `/backfill` response to start from + # `?v` and include events that preceded it (so the list will be + # newest -> oldest, reverse-chronological). It's described in the + # spec this way so we can rely on people doing it the right way for + # the historical messages to show up correctly. + reverse_chronological_events = events + # `[::-1]` is just syntax to reverse the list and give us a copy + chronological_events = reverse_chronological_events[::-1] + + # We want to calculate the `stream_ordering` from newest -> oldest + # (reverse-chronological) (so MSC2716 historical events end up + # sorting in the correct order) and persist oldest -> newest + # (chronological) to get the least missing `prev_event` fetch + # thrashing. + # ------------------------------------------------------------------ + + # Since we have been configured to write, we ought to have id generators, + # rather than id trackers. + assert ( + self._instance_name in self._config.worker.writers.events + ), "Can only write stream IDs on master" + assert isinstance(self._store._backfill_id_gen, AbstractStreamIdGenerator) + stream_ordering_manager = self._store._backfill_id_gen.get_next_mult( + len(reverse_chronological_events) + ) + async with stream_ordering_manager as stream_orderings: + # Calculate the `stream_ordering` from newest -> oldest + # (reverse-chronological) (so historical events end up sorting + # in the correct order). + # + # Backfilled events start with `stream_ordering=-1` and + # decrement. For events, that we backfill at the same `depth` + # (like chains of historical messages) in order for them to have + # the best chance of ending up in the correct order, assign + # `stream_ordering` to the assumed reverse-chronological list of + # events to backfill (where the newest events get + # stream_ordering assigned first) + # + # depth : stream_ordering : event + # ----- : --------------- : ----------------------- + # 1 : 1 : Event before 1 + # 2 : 2 : Event before 2 + # 3 : -4 : Historical message 1 + # 3 : -4 : Historical message 2 + # 3 : -3 : Historical message 3 + # 3 : -2 : Historical message 4 + # 3 : -1 : Historical message 5 + # 3 : 3 : Event after 1 + # 4 : 4 : Event after 2 + # + for event, stream in zip( + reverse_chronological_events, stream_orderings + ): + event.internal_metadata.stream_ordering = stream + await self._process_pulled_events( dest, - events, + # Persist events from oldest -> newest (chronological) to get + # the least missing `prev_event` fetch thrashing. + # `_process_pulled_events` does some sorting of its own by + # `depth` but if we let it sort the reverse-chronological list + # of events, it naively orders events with the same depth in the + # opposite order we want. If we pass it an already sorted by + # depth list, then everything lines up. + chronological_events, backfilled=True, ) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index bb489b8189..27d75dba72 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -209,7 +209,11 @@ class PersistEventsStore: async with stream_ordering_manager as stream_orderings: for (event, _), stream in zip(events_and_contexts, stream_orderings): - event.internal_metadata.stream_ordering = stream + # If someone has already decided the stream_ordering for the + # event before, then just use that. This is done during backfill + # to help ordering of MSC2716 historical messages. + if event.internal_metadata.stream_ordering is None: + event.internal_metadata.stream_ordering = stream await self.db_pool.runInteraction( "persist_events",