Fix historical messages backfilling in random order on remote homeservers (MSC2716) (#11114)

Fix https://github.com/matrix-org/synapse/issues/11091
Fix https://github.com/matrix-org/synapse/issues/10764 (side-stepping the issue because we no longer have to deal with `fake_prev_event_id`)

 1. Made the `/backfill` response return messages in `(depth, stream_ordering)` order (previously only sorted by `depth`)
    - Technically, it shouldn't really matter how `/backfill` returns things but I'm just trying to make the `stream_ordering` a little more consistent from the origin to the remote homeservers in order to get the order of messages from `/messages` consistent ([sorted by `(topological_ordering, stream_ordering)`](https://github.com/matrix-org/synapse/blob/develop/docs/development/room-dag-concepts.md#depth-and-stream-ordering)).
    - Even now that we return backfilled messages in order, it still doesn't guarantee the same `stream_ordering` (and more importantly the [`/messages` order](https://github.com/matrix-org/synapse/blob/develop/docs/development/room-dag-concepts.md#depth-and-stream-ordering)) on the other server. For example, if a room has a bunch of history imported and someone visits a permalink to a historical message back in time, their homeserver will skip over the historical messages in between and insert the permalink as the next message in the `stream_order` and totally throw off the sort.
       - This will be even more the case when we add the [MSC3030 jump to date API endpoint](https://github.com/matrix-org/matrix-doc/pull/3030) so the static archives can navigate and jump to a certain date.
       - We're solving this in the future by switching to [online topological ordering](https://github.com/matrix-org/gomatrixserverlib/issues/187) and [chunking](https://github.com/matrix-org/synapse/issues/3785) which by its nature will apply retroactively to fix any inconsistencies introduced by people permalinking
 2. As we're navigating `prev_events` to return in `/backfill`, we order by `depth` first (newest -> oldest) and now also tie-break based on the `stream_ordering` (newest -> oldest). This is technically important because MSC2716 inserts a bunch of historical messages at the same `depth` so it's best to be prescriptive about which ones we should process first. In reality, I think the code already looped over the historical messages as expected because the database is already in order.
 3. Making the historical state chain and historical event chain float on their own by having no `prev_events` instead of a fake `prev_event` which caused backfill to get clogged with an unresolvable event. Fixes https://github.com/matrix-org/synapse/issues/11091 and https://github.com/matrix-org/synapse/issues/10764
 4. We no longer find connected insertion events by finding a potential `prev_event` connection to the current event we're iterating over. We now solely rely on marker events which when processed, add the insertion event as an extremity and the federating homeserver can ask about it when time calls.
    - Related discussion, https://github.com/matrix-org/synapse/pull/11114#discussion_r741514793


Before | After
--- | ---
![](https://user-images.githubusercontent.com/558581/139218681-b465c862-5c49-4702-a59e-466733b0cf45.png) | ![](https://user-images.githubusercontent.com/558581/146453159-a1609e0a-8324-439d-ae44-e4bce43ac6d1.png)



#### Why aren't we sorting topologically when receiving backfill events?

> The main reason we're going to opt to not sort topologically when receiving backfill events is because it's probably best to do whatever is easiest to make it just work. People will probably have opinions once they look at [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) which could change whatever implementation anyway.
> 
> As mentioned, ideally we would do this but code necessary to make the fake edges but it gets confusing and gives an impression of “just whyyyy” (feels icky). This problem also dissolves with online topological ordering.
>
> -- https://github.com/matrix-org/synapse/pull/11114#discussion_r741517138

See https://github.com/matrix-org/synapse/pull/11114#discussion_r739610091 for the technical difficulties
This commit is contained in:
Eric Eastwood 2022-02-07 15:54:13 -06:00 committed by GitHub
parent cf06783d54
commit fef2e792be
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 348 additions and 155 deletions

1
changelog.d/11114.bugfix Normal file
View file

@ -0,0 +1 @@
Fix [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) historical messages backfilling in random order on remote homeservers.

View file

@ -166,9 +166,14 @@ class FederationHandler:
oldest_events_with_depth = (
await self.store.get_oldest_event_ids_with_depth_in_room(room_id)
)
insertion_events_to_be_backfilled = (
await self.store.get_insertion_event_backwards_extremities_in_room(room_id)
)
insertion_events_to_be_backfilled: Dict[str, int] = {}
if self.hs.config.experimental.msc2716_enabled:
insertion_events_to_be_backfilled = (
await self.store.get_insertion_event_backward_extremities_in_room(
room_id
)
)
logger.debug(
"_maybe_backfill_inner: extremities oldest_events_with_depth=%s insertion_events_to_be_backfilled=%s",
oldest_events_with_depth,
@ -271,11 +276,12 @@ class FederationHandler:
]
logger.debug(
"room_id: %s, backfill: current_depth: %s, limit: %s, max_depth: %s, extrems: %s filtered_sorted_extremeties_tuple: %s",
"room_id: %s, backfill: current_depth: %s, limit: %s, max_depth: %s, extrems (%d): %s filtered_sorted_extremeties_tuple: %s",
room_id,
current_depth,
limit,
max_depth,
len(sorted_extremeties_tuple),
sorted_extremeties_tuple,
filtered_sorted_extremeties_tuple,
)
@ -1047,6 +1053,19 @@ class FederationHandler:
limit = min(limit, 100)
events = await self.store.get_backfill_events(room_id, pdu_list, limit)
logger.debug(
"on_backfill_request: backfill events=%s",
[
"event_id=%s,depth=%d,body=%s,prevs=%s\n"
% (
event.event_id,
event.depth,
event.content.get("body", event.type),
event.prev_event_ids(),
)
for event in events
],
)
events = await filter_events_for_server(self.storage, origin, events)

View file

@ -508,7 +508,11 @@ class FederationEventHandler:
f"room {ev.room_id}, when we were backfilling in {room_id}"
)
await self._process_pulled_events(dest, events, backfilled=True)
await self._process_pulled_events(
dest,
events,
backfilled=True,
)
async def _get_missing_events_for_pdu(
self, origin: str, pdu: EventBase, prevs: Set[str], min_depth: int
@ -626,11 +630,24 @@ class FederationEventHandler:
backfilled: True if this is part of a historical batch of events (inhibits
notification to clients, and validation of device keys.)
"""
logger.debug(
"processing pulled backfilled=%s events=%s",
backfilled,
[
"event_id=%s,depth=%d,body=%s,prevs=%s\n"
% (
event.event_id,
event.depth,
event.content.get("body", event.type),
event.prev_event_ids(),
)
for event in events
],
)
# We want to sort these by depth so we process them and
# tell clients about them in order.
sorted_events = sorted(events, key=lambda x: x.depth)
for ev in sorted_events:
with nested_logging_context(ev.event_id):
await self._process_pulled_event(origin, ev, backfilled=backfilled)
@ -992,6 +1009,8 @@ class FederationEventHandler:
await self._run_push_actions_and_persist_event(event, context, backfilled)
await self._handle_marker_event(origin, event)
if backfilled or context.rejected:
return
@ -1071,8 +1090,6 @@ class FederationEventHandler:
event.sender,
)
await self._handle_marker_event(origin, event)
async def _resync_device(self, sender: str) -> None:
"""We have detected that the device list for the given user may be out
of sync, so we try and resync them.
@ -1323,7 +1340,14 @@ class FederationEventHandler:
return event, context
events_to_persist = (x for x in (prep(event) for event in fetched_events) if x)
await self.persist_events_and_notify(room_id, tuple(events_to_persist))
await self.persist_events_and_notify(
room_id,
tuple(events_to_persist),
# Mark these events backfilled as they're historic events that will
# eventually be backfilled. For example, missing events we fetch
# during backfill should be marked as backfilled as well.
backfilled=True,
)
async def _check_event_auth(
self,

View file

@ -490,12 +490,12 @@ class EventCreationHandler:
requester: Requester,
event_dict: dict,
txn_id: Optional[str] = None,
allow_no_prev_events: bool = False,
prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None,
require_consent: bool = True,
outlier: bool = False,
historical: bool = False,
allow_no_prev_events: bool = False,
depth: Optional[int] = None,
) -> Tuple[EventBase, EventContext]:
"""
@ -510,6 +510,10 @@ class EventCreationHandler:
requester
event_dict: An entire event
txn_id
allow_no_prev_events: Whether to allow this event to be created an empty
list of prev_events. Normally this is prohibited just because most
events should have a prev_event and we should only use this in special
cases like MSC2716.
prev_event_ids:
the forward extremities to use as the prev_events for the
new event.
@ -604,10 +608,10 @@ class EventCreationHandler:
event, context = await self.create_new_client_event(
builder=builder,
requester=requester,
allow_no_prev_events=allow_no_prev_events,
prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids,
depth=depth,
allow_no_prev_events=allow_no_prev_events,
)
# In an ideal world we wouldn't need the second part of this condition. However,
@ -764,6 +768,7 @@ class EventCreationHandler:
self,
requester: Requester,
event_dict: dict,
allow_no_prev_events: bool = False,
prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None,
ratelimit: bool = True,
@ -781,6 +786,10 @@ class EventCreationHandler:
Args:
requester: The requester sending the event.
event_dict: An entire event.
allow_no_prev_events: Whether to allow this event to be created an empty
list of prev_events. Normally this is prohibited just because most
events should have a prev_event and we should only use this in special
cases like MSC2716.
prev_event_ids:
The event IDs to use as the prev events.
Should normally be left as None to automatically request them
@ -880,16 +889,20 @@ class EventCreationHandler:
self,
builder: EventBuilder,
requester: Optional[Requester] = None,
allow_no_prev_events: bool = False,
prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None,
depth: Optional[int] = None,
allow_no_prev_events: bool = False,
) -> Tuple[EventBase, EventContext]:
"""Create a new event for a local client
Args:
builder:
requester:
allow_no_prev_events: Whether to allow this event to be created an empty
list of prev_events. Normally this is prohibited just because most
events should have a prev_event and we should only use this in special
cases like MSC2716.
prev_event_ids:
the forward extremities to use as the prev_events for the
new event.
@ -908,7 +921,6 @@ class EventCreationHandler:
Returns:
Tuple of created event, context
"""
# Strip down the auth_event_ids to only what we need to auth the event.
# For example, we don't need extra m.room.member that don't match event.sender
full_state_ids_at_event = None

View file

@ -13,10 +13,6 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
def generate_fake_event_id() -> str:
return "$fake_" + random_string(43)
class RoomBatchHandler:
def __init__(self, hs: "HomeServer"):
self.hs = hs
@ -182,11 +178,12 @@ class RoomBatchHandler:
state_event_ids_at_start = []
auth_event_ids = initial_auth_event_ids.copy()
# Make the state events float off on their own so we don't have a
# bunch of `@mxid joined the room` noise between each batch
prev_event_id_for_state_chain = generate_fake_event_id()
# Make the state events float off on their own by specifying no
# prev_events for the first one in the chain so we don't have a bunch of
# `@mxid joined the room` noise between each batch.
prev_event_ids_for_state_chain: List[str] = []
for state_event in state_events_at_start:
for index, state_event in enumerate(state_events_at_start):
assert_params_in_dict(
state_event, ["type", "origin_server_ts", "content", "sender"]
)
@ -222,7 +219,10 @@ class RoomBatchHandler:
content=event_dict["content"],
outlier=True,
historical=True,
prev_event_ids=[prev_event_id_for_state_chain],
# Only the first event in the chain should be floating.
# The rest should hang off each other in a chain.
allow_no_prev_events=index == 0,
prev_event_ids=prev_event_ids_for_state_chain,
# Make sure to use a copy of this list because we modify it
# later in the loop here. Otherwise it will be the same
# reference and also update in the event when we append later.
@ -242,7 +242,10 @@ class RoomBatchHandler:
event_dict,
outlier=True,
historical=True,
prev_event_ids=[prev_event_id_for_state_chain],
# Only the first event in the chain should be floating.
# The rest should hang off each other in a chain.
allow_no_prev_events=index == 0,
prev_event_ids=prev_event_ids_for_state_chain,
# Make sure to use a copy of this list because we modify it
# later in the loop here. Otherwise it will be the same
# reference and also update in the event when we append later.
@ -253,7 +256,7 @@ class RoomBatchHandler:
state_event_ids_at_start.append(event_id)
auth_event_ids.append(event_id)
# Connect all the state in a floating chain
prev_event_id_for_state_chain = event_id
prev_event_ids_for_state_chain = [event_id]
return state_event_ids_at_start
@ -261,7 +264,6 @@ class RoomBatchHandler:
self,
events_to_create: List[JsonDict],
room_id: str,
initial_prev_event_ids: List[str],
inherited_depth: int,
auth_event_ids: List[str],
app_service_requester: Requester,
@ -277,9 +279,6 @@ class RoomBatchHandler:
events_to_create: List of historical events to create in JSON
dictionary format.
room_id: Room where you want the events persisted in.
initial_prev_event_ids: These will be the prev_events for the first
event created. Each event created afterwards will point to the
previous event created.
inherited_depth: The depth to create the events at (you will
probably by calling inherit_depth_from_prev_ids(...)).
auth_event_ids: Define which events allow you to create the given
@ -291,11 +290,14 @@ class RoomBatchHandler:
"""
assert app_service_requester.app_service
prev_event_ids = initial_prev_event_ids.copy()
# Make the historical event chain float off on its own by specifying no
# prev_events for the first event in the chain which causes the HS to
# ask for the state at the start of the batch later.
prev_event_ids: List[str] = []
event_ids = []
events_to_persist = []
for ev in events_to_create:
for index, ev in enumerate(events_to_create):
assert_params_in_dict(ev, ["type", "origin_server_ts", "content", "sender"])
assert self.hs.is_mine_id(ev["sender"]), "User must be our own: %s" % (
@ -319,6 +321,9 @@ class RoomBatchHandler:
ev["sender"], app_service_requester.app_service
),
event_dict,
# Only the first event in the chain should be floating.
# The rest should hang off each other in a chain.
allow_no_prev_events=index == 0,
prev_event_ids=event_dict.get("prev_events"),
auth_event_ids=auth_event_ids,
historical=True,
@ -370,7 +375,6 @@ class RoomBatchHandler:
events_to_create: List[JsonDict],
room_id: str,
batch_id_to_connect_to: str,
initial_prev_event_ids: List[str],
inherited_depth: int,
auth_event_ids: List[str],
app_service_requester: Requester,
@ -385,9 +389,6 @@ class RoomBatchHandler:
room_id: Room where you want the events created in.
batch_id_to_connect_to: The batch_id from the insertion event you
want this batch to connect to.
initial_prev_event_ids: These will be the prev_events for the first
event created. Each event created afterwards will point to the
previous event created.
inherited_depth: The depth to create the events at (you will
probably by calling inherit_depth_from_prev_ids(...)).
auth_event_ids: Define which events allow you to create the given
@ -436,7 +437,6 @@ class RoomBatchHandler:
event_ids = await self.persist_historical_events(
events_to_create=events_to_create,
room_id=room_id,
initial_prev_event_ids=initial_prev_event_ids,
inherited_depth=inherited_depth,
auth_event_ids=auth_event_ids,
app_service_requester=app_service_requester,

View file

@ -268,7 +268,8 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
target: UserID,
room_id: str,
membership: str,
prev_event_ids: List[str],
allow_no_prev_events: bool = False,
prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None,
txn_id: Optional[str] = None,
ratelimit: bool = True,
@ -286,8 +287,12 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
target:
room_id:
membership:
prev_event_ids: The event IDs to use as the prev events
allow_no_prev_events: Whether to allow this event to be created an empty
list of prev_events. Normally this is prohibited just because most
events should have a prev_event and we should only use this in special
cases like MSC2716.
prev_event_ids: The event IDs to use as the prev events
auth_event_ids:
The event ids to use as the auth_events for the new event.
Should normally be left as None, which will cause them to be calculated
@ -344,6 +349,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
"membership": membership,
},
txn_id=txn_id,
allow_no_prev_events=allow_no_prev_events,
prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids,
require_consent=require_consent,
@ -446,6 +452,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
require_consent: bool = True,
outlier: bool = False,
historical: bool = False,
allow_no_prev_events: bool = False,
prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None,
) -> Tuple[str, int]:
@ -470,6 +477,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
historical: Indicates whether the message is being inserted
back in time around some existing events. This is used to skip
a few checks and mark the event as backfilled.
allow_no_prev_events: Whether to allow this event to be created an empty
list of prev_events. Normally this is prohibited just because most
events should have a prev_event and we should only use this in special
cases like MSC2716.
prev_event_ids: The event IDs to use as the prev events
auth_event_ids:
The event ids to use as the auth_events for the new event.
@ -504,6 +515,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
require_consent=require_consent,
outlier=outlier,
historical=historical,
allow_no_prev_events=allow_no_prev_events,
prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids,
)
@ -525,6 +537,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
require_consent: bool = True,
outlier: bool = False,
historical: bool = False,
allow_no_prev_events: bool = False,
prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None,
) -> Tuple[str, int]:
@ -551,6 +564,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
historical: Indicates whether the message is being inserted
back in time around some existing events. This is used to skip
a few checks and mark the event as backfilled.
allow_no_prev_events: Whether to allow this event to be created an empty
list of prev_events. Normally this is prohibited just because most
events should have a prev_event and we should only use this in special
cases like MSC2716.
prev_event_ids: The event IDs to use as the prev events
auth_event_ids:
The event ids to use as the auth_events for the new event.
@ -680,6 +697,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
membership=effective_membership_state,
txn_id=txn_id,
ratelimit=ratelimit,
allow_no_prev_events=allow_no_prev_events,
prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids,
content=content,

View file

@ -131,6 +131,14 @@ class RoomBatchSendEventRestServlet(RestServlet):
prev_event_ids_from_query
)
if not auth_event_ids:
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"No auth events found for given prev_event query parameter. The prev_event=%s probably does not exist."
% prev_event_ids_from_query,
errcode=Codes.INVALID_PARAM,
)
state_event_ids_at_start = []
# Create and persist all of the state events that float off on their own
# before the batch. These will most likely be all of the invite/member
@ -197,21 +205,12 @@ class RoomBatchSendEventRestServlet(RestServlet):
EventContentFields.MSC2716_NEXT_BATCH_ID
]
# Also connect the historical event chain to the end of the floating
# state chain, which causes the HS to ask for the state at the start of
# the batch later. If there is no state chain to connect to, just make
# the insertion event float itself.
prev_event_ids = []
if len(state_event_ids_at_start):
prev_event_ids = [state_event_ids_at_start[-1]]
# Create and persist all of the historical events as well as insertion
# and batch meta events to make the batch navigable in the DAG.
event_ids, next_batch_id = await self.room_batch_handler.handle_batch_of_events(
events_to_create=events_to_create,
room_id=room_id,
batch_id_to_connect_to=batch_id_to_connect_to,
initial_prev_event_ids=prev_event_ids,
inherited_depth=inherited_depth,
auth_event_ids=auth_event_ids,
app_service_requester=requester,

View file

@ -16,9 +16,10 @@ import logging
from queue import Empty, PriorityQueue
from typing import TYPE_CHECKING, Collection, Dict, Iterable, List, Optional, Set, Tuple
import attr
from prometheus_client import Counter, Gauge
from synapse.api.constants import MAX_DEPTH
from synapse.api.constants import MAX_DEPTH, EventTypes
from synapse.api.errors import StoreError
from synapse.api.room_versions import EventFormatVersions, RoomVersion
from synapse.events import EventBase, make_event_from_dict
@ -60,6 +61,15 @@ pdus_pruned_from_federation_queue = Counter(
logger = logging.getLogger(__name__)
# All the info we need while iterating the DAG while backfilling
@attr.s(frozen=True, slots=True, auto_attribs=True)
class BackfillQueueNavigationItem:
depth: int
stream_ordering: int
event_id: str
type: str
class _NoChainCoverIndex(Exception):
def __init__(self, room_id: str):
super().__init__("Unexpectedly no chain cover for events in %s" % (room_id,))
@ -74,6 +84,8 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
):
super().__init__(database, db_conn, hs)
self.hs = hs
if hs.config.worker.run_background_tasks:
hs.get_clock().looping_call(
self._delete_old_forward_extrem_cache, 60 * 60 * 1000
@ -737,7 +749,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
room_id,
)
async def get_insertion_event_backwards_extremities_in_room(
async def get_insertion_event_backward_extremities_in_room(
self, room_id
) -> Dict[str, int]:
"""Get the insertion events we know about that we haven't backfilled yet.
@ -754,7 +766,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
Map from event_id to depth
"""
def get_insertion_event_backwards_extremities_in_room_txn(txn, room_id):
def get_insertion_event_backward_extremities_in_room_txn(txn, room_id):
sql = """
SELECT b.event_id, MAX(e.depth) FROM insertion_events as i
/* We only want insertion events that are also marked as backwards extremities */
@ -770,8 +782,8 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
return dict(txn)
return await self.db_pool.runInteraction(
"get_insertion_event_backwards_extremities_in_room",
get_insertion_event_backwards_extremities_in_room_txn,
"get_insertion_event_backward_extremities_in_room",
get_insertion_event_backward_extremities_in_room_txn,
room_id,
)
@ -997,58 +1009,25 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
"get_forward_extremeties_for_room", get_forward_extremeties_for_room_txn
)
async def get_backfill_events(self, room_id: str, event_list: list, limit: int):
"""Get a list of Events for a given topic that occurred before (and
including) the events in event_list. Return a list of max size `limit`
def _get_connected_batch_event_backfill_results_txn(
self, txn: LoggingTransaction, insertion_event_id: str, limit: int
) -> List[BackfillQueueNavigationItem]:
"""
Find any batch connections of a given insertion event.
A batch event points at a insertion event via:
batch_event.content[MSC2716_BATCH_ID] -> insertion_event.content[MSC2716_NEXT_BATCH_ID]
Args:
room_id
event_list
limit
txn: The database transaction to use
insertion_event_id: The event ID to navigate from. We will find
batch events that point back at this insertion event.
limit: Max number of event ID's to query for and return
Returns:
List of batch events that the backfill queue can process
"""
event_ids = await self.db_pool.runInteraction(
"get_backfill_events",
self._get_backfill_events,
room_id,
event_list,
limit,
)
events = await self.get_events_as_list(event_ids)
return sorted(events, key=lambda e: -e.depth)
def _get_backfill_events(self, txn, room_id, event_list, limit):
logger.debug("_get_backfill_events: %s, %r, %s", room_id, event_list, limit)
event_results = set()
# We want to make sure that we do a breadth-first, "depth" ordered
# search.
# Look for the prev_event_id connected to the given event_id
query = """
SELECT depth, prev_event_id FROM event_edges
/* Get the depth of the prev_event_id from the events table */
INNER JOIN events
ON prev_event_id = events.event_id
/* Find an event which matches the given event_id */
WHERE event_edges.event_id = ?
AND event_edges.is_state = ?
LIMIT ?
"""
# Look for the "insertion" events connected to the given event_id
connected_insertion_event_query = """
SELECT e.depth, i.event_id FROM insertion_event_edges AS i
/* Get the depth of the insertion event from the events table */
INNER JOIN events AS e USING (event_id)
/* Find an insertion event which points via prev_events to the given event_id */
WHERE i.insertion_prev_event_id = ?
LIMIT ?
"""
# Find any batch connections of a given insertion event
batch_connection_query = """
SELECT e.depth, c.event_id FROM insertion_events AS i
SELECT e.depth, e.stream_ordering, c.event_id, e.type FROM insertion_events AS i
/* Find the batch that connects to the given insertion event */
INNER JOIN batch_events AS c
ON i.next_batch_id = c.batch_id
@ -1059,81 +1038,213 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
LIMIT ?
"""
# Find any batch connections for the given insertion event
txn.execute(
batch_connection_query,
(insertion_event_id, limit),
)
return [
BackfillQueueNavigationItem(
depth=row[0],
stream_ordering=row[1],
event_id=row[2],
type=row[3],
)
for row in txn
]
def _get_connected_prev_event_backfill_results_txn(
self, txn: LoggingTransaction, event_id: str, limit: int
) -> List[BackfillQueueNavigationItem]:
"""
Find any events connected by prev_event the specified event_id.
Args:
txn: The database transaction to use
event_id: The event ID to navigate from
limit: Max number of event ID's to query for and return
Returns:
List of prev events that the backfill queue can process
"""
# Look for the prev_event_id connected to the given event_id
connected_prev_event_query = """
SELECT depth, stream_ordering, prev_event_id, events.type FROM event_edges
/* Get the depth and stream_ordering of the prev_event_id from the events table */
INNER JOIN events
ON prev_event_id = events.event_id
/* Look for an edge which matches the given event_id */
WHERE event_edges.event_id = ?
AND event_edges.is_state = ?
/* Because we can have many events at the same depth,
* we want to also tie-break and sort on stream_ordering */
ORDER BY depth DESC, stream_ordering DESC
LIMIT ?
"""
txn.execute(
connected_prev_event_query,
(event_id, False, limit),
)
return [
BackfillQueueNavigationItem(
depth=row[0],
stream_ordering=row[1],
event_id=row[2],
type=row[3],
)
for row in txn
]
async def get_backfill_events(
self, room_id: str, seed_event_id_list: list, limit: int
):
"""Get a list of Events for a given topic that occurred before (and
including) the events in seed_event_id_list. Return a list of max size `limit`
Args:
room_id
seed_event_id_list
limit
"""
event_ids = await self.db_pool.runInteraction(
"get_backfill_events",
self._get_backfill_events,
room_id,
seed_event_id_list,
limit,
)
events = await self.get_events_as_list(event_ids)
return sorted(
events, key=lambda e: (-e.depth, -e.internal_metadata.stream_ordering)
)
def _get_backfill_events(self, txn, room_id, seed_event_id_list, limit):
"""
We want to make sure that we do a breadth-first, "depth" ordered search.
We also handle navigating historical branches of history connected by
insertion and batch events.
"""
logger.debug(
"_get_backfill_events(room_id=%s): seeding backfill with seed_event_id_list=%s limit=%s",
room_id,
seed_event_id_list,
limit,
)
event_id_results = set()
# In a PriorityQueue, the lowest valued entries are retrieved first.
# We're using depth as the priority in the queue.
# Depth is lowest at the oldest-in-time message and highest and
# newest-in-time message. We add events to the queue with a negative depth so that
# we process the newest-in-time messages first going backwards in time.
# We're using depth as the priority in the queue and tie-break based on
# stream_ordering. Depth is lowest at the oldest-in-time message and
# highest and newest-in-time message. We add events to the queue with a
# negative depth so that we process the newest-in-time messages first
# going backwards in time. stream_ordering follows the same pattern.
queue = PriorityQueue()
for event_id in event_list:
depth = self.db_pool.simple_select_one_onecol_txn(
for seed_event_id in seed_event_id_list:
event_lookup_result = self.db_pool.simple_select_one_txn(
txn,
table="events",
keyvalues={"event_id": event_id, "room_id": room_id},
retcol="depth",
keyvalues={"event_id": seed_event_id, "room_id": room_id},
retcols=(
"type",
"depth",
"stream_ordering",
),
allow_none=True,
)
if depth:
queue.put((-depth, event_id))
if event_lookup_result is not None:
logger.debug(
"_get_backfill_events(room_id=%s): seed_event_id=%s depth=%s stream_ordering=%s type=%s",
room_id,
seed_event_id,
event_lookup_result["depth"],
event_lookup_result["stream_ordering"],
event_lookup_result["type"],
)
while not queue.empty() and len(event_results) < limit:
if event_lookup_result["depth"]:
queue.put(
(
-event_lookup_result["depth"],
-event_lookup_result["stream_ordering"],
seed_event_id,
event_lookup_result["type"],
)
)
while not queue.empty() and len(event_id_results) < limit:
try:
_, event_id = queue.get_nowait()
_, _, event_id, event_type = queue.get_nowait()
except Empty:
break
if event_id in event_results:
if event_id in event_id_results:
continue
event_results.add(event_id)
event_id_results.add(event_id)
# Try and find any potential historical batches of message history.
#
# First we look for an insertion event connected to the current
# event (by prev_event). If we find any, we need to go and try to
# find any batch events connected to the insertion event (by
# batch_id). If we find any, we'll add them to the queue and
# navigate up the DAG like normal in the next iteration of the loop.
txn.execute(
connected_insertion_event_query, (event_id, limit - len(event_results))
)
connected_insertion_event_id_results = txn.fetchall()
logger.debug(
"_get_backfill_events: connected_insertion_event_query %s",
connected_insertion_event_id_results,
)
for row in connected_insertion_event_id_results:
connected_insertion_event_depth = row[0]
connected_insertion_event = row[1]
queue.put((-connected_insertion_event_depth, connected_insertion_event))
if self.hs.config.experimental.msc2716_enabled:
# We need to go and try to find any batch events connected
# to a given insertion event (by batch_id). If we find any, we'll
# add them to the queue and navigate up the DAG like normal in the
# next iteration of the loop.
if event_type == EventTypes.MSC2716_INSERTION:
# Find any batch connections for the given insertion event
connected_batch_event_backfill_results = (
self._get_connected_batch_event_backfill_results_txn(
txn, event_id, limit - len(event_id_results)
)
)
logger.debug(
"_get_backfill_events(room_id=%s): connected_batch_event_backfill_results=%s",
room_id,
connected_batch_event_backfill_results,
)
for (
connected_batch_event_backfill_item
) in connected_batch_event_backfill_results:
if (
connected_batch_event_backfill_item.event_id
not in event_id_results
):
queue.put(
(
-connected_batch_event_backfill_item.depth,
-connected_batch_event_backfill_item.stream_ordering,
connected_batch_event_backfill_item.event_id,
connected_batch_event_backfill_item.type,
)
)
# Find any batch connections for the given insertion event
txn.execute(
batch_connection_query,
(connected_insertion_event, limit - len(event_results)),
# Now we just look up the DAG by prev_events as normal
connected_prev_event_backfill_results = (
self._get_connected_prev_event_backfill_results_txn(
txn, event_id, limit - len(event_id_results)
)
batch_start_event_id_results = txn.fetchall()
logger.debug(
"_get_backfill_events: batch_start_event_id_results %s",
batch_start_event_id_results,
)
for row in batch_start_event_id_results:
if row[1] not in event_results:
queue.put((-row[0], row[1]))
txn.execute(query, (event_id, False, limit - len(event_results)))
prev_event_id_results = txn.fetchall()
logger.debug(
"_get_backfill_events: prev_event_ids %s", prev_event_id_results
)
logger.debug(
"_get_backfill_events(room_id=%s): connected_prev_event_backfill_results=%s",
room_id,
connected_prev_event_backfill_results,
)
for (
connected_prev_event_backfill_item
) in connected_prev_event_backfill_results:
if connected_prev_event_backfill_item.event_id not in event_id_results:
queue.put(
(
-connected_prev_event_backfill_item.depth,
-connected_prev_event_backfill_item.stream_ordering,
connected_prev_event_backfill_item.event_id,
connected_prev_event_backfill_item.type,
)
)
for row in prev_event_id_results:
if row[1] not in event_results:
queue.put((-row[0], row[1]))
return event_results
return event_id_results
async def get_missing_events(self, room_id, earliest_events, latest_events, limit):
ids = await self.db_pool.runInteraction(

View file

@ -2215,9 +2215,14 @@ class PersistEventsStore:
" SELECT 1 FROM event_backward_extremities"
" WHERE event_id = ? AND room_id = ?"
" )"
# 1. Don't add an event as a extremity again if we already persisted it
# as a non-outlier.
# 2. Don't add an outlier as an extremity if it has no prev_events
" AND NOT EXISTS ("
" SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
" AND outlier = ?"
" SELECT 1 FROM events"
" LEFT JOIN event_edges edge"
" ON edge.event_id = events.event_id"
" WHERE events.event_id = ? AND events.room_id = ? AND (events.outlier = ? OR edge.event_id IS NULL)"
" )"
)
@ -2243,6 +2248,10 @@ class PersistEventsStore:
(ev.event_id, ev.room_id)
for ev in events
if not ev.internal_metadata.is_outlier()
# If we encountered an event with no prev_events, then we might
# as well remove it now because it won't ever have anything else
# to backfill from.
or len(ev.prev_event_ids()) == 0
],
)