mirror of
https://github.com/element-hq/synapse
synced 2024-09-19 22:15:11 +00:00
Trace some results
This commit is contained in:
parent
2a467fd26b
commit
597c3f276e
6 changed files with 55 additions and 17 deletions
|
@ -61,7 +61,7 @@ from synapse.federation.federation_base import (
|
||||||
)
|
)
|
||||||
from synapse.federation.transport.client import SendJoinResponse
|
from synapse.federation.transport.client import SendJoinResponse
|
||||||
from synapse.http.types import QueryParams
|
from synapse.http.types import QueryParams
|
||||||
from synapse.logging.tracing import tag_args, trace
|
from synapse.logging.tracing import SynapseTags, set_attribute, tag_args, trace
|
||||||
from synapse.types import JsonDict, UserID, get_domain_from_id
|
from synapse.types import JsonDict, UserID, get_domain_from_id
|
||||||
from synapse.util.async_helpers import concurrently_execute
|
from synapse.util.async_helpers import concurrently_execute
|
||||||
from synapse.util.caches.expiringcache import ExpiringCache
|
from synapse.util.caches.expiringcache import ExpiringCache
|
||||||
|
@ -451,6 +451,8 @@ class FederationClient(FederationBase):
|
||||||
|
|
||||||
return event_copy
|
return event_copy
|
||||||
|
|
||||||
|
@trace
|
||||||
|
@tag_args
|
||||||
async def get_room_state_ids(
|
async def get_room_state_ids(
|
||||||
self, destination: str, room_id: str, event_id: str
|
self, destination: str, room_id: str, event_id: str
|
||||||
) -> Tuple[List[str], List[str]]:
|
) -> Tuple[List[str], List[str]]:
|
||||||
|
@ -470,6 +472,15 @@ class FederationClient(FederationBase):
|
||||||
state_event_ids = result["pdu_ids"]
|
state_event_ids = result["pdu_ids"]
|
||||||
auth_event_ids = result.get("auth_chain_ids", [])
|
auth_event_ids = result.get("auth_chain_ids", [])
|
||||||
|
|
||||||
|
set_attribute(
|
||||||
|
SynapseTags.RESULT_PREFIX + f"state_event_ids ({len(state_event_ids)})",
|
||||||
|
str(state_event_ids),
|
||||||
|
)
|
||||||
|
set_attribute(
|
||||||
|
SynapseTags.RESULT_PREFIX + f"auth_event_ids ({len(auth_event_ids)})",
|
||||||
|
str(auth_event_ids),
|
||||||
|
)
|
||||||
|
|
||||||
if not isinstance(state_event_ids, list) or not isinstance(
|
if not isinstance(state_event_ids, list) or not isinstance(
|
||||||
auth_event_ids, list
|
auth_event_ids, list
|
||||||
):
|
):
|
||||||
|
|
|
@ -59,7 +59,7 @@ from synapse.events.validator import EventValidator
|
||||||
from synapse.federation.federation_client import InvalidResponseError
|
from synapse.federation.federation_client import InvalidResponseError
|
||||||
from synapse.http.servlet import assert_params_in_dict
|
from synapse.http.servlet import assert_params_in_dict
|
||||||
from synapse.logging.context import nested_logging_context
|
from synapse.logging.context import nested_logging_context
|
||||||
from synapse.logging.tracing import set_attribute, trace
|
from synapse.logging.tracing import SynapseTags, set_attribute, trace
|
||||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
from synapse.module_api import NOT_SPAM
|
from synapse.module_api import NOT_SPAM
|
||||||
from synapse.replication.http.federation import (
|
from synapse.replication.http.federation import (
|
||||||
|
@ -323,7 +323,9 @@ class FederationHandler:
|
||||||
if len(extremities_to_request) >= 5:
|
if len(extremities_to_request) >= 5:
|
||||||
break
|
break
|
||||||
|
|
||||||
set_attribute("backfill_point" + str(i), str(bp))
|
set_attribute(
|
||||||
|
SynapseTags.RESULT_PREFIX + "backfill_point" + str(i), str(bp)
|
||||||
|
)
|
||||||
|
|
||||||
# For regular backwards extremities, we don't have the extremity events
|
# For regular backwards extremities, we don't have the extremity events
|
||||||
# themselves, so we need to actually check the events that reference them -
|
# themselves, so we need to actually check the events that reference them -
|
||||||
|
|
|
@ -1025,10 +1025,10 @@ class FederationEventHandler:
|
||||||
logger.debug("Fetching %i events from cache/store", len(desired_events))
|
logger.debug("Fetching %i events from cache/store", len(desired_events))
|
||||||
have_events = await self._store.have_seen_events(room_id, desired_events)
|
have_events = await self._store.have_seen_events(room_id, desired_events)
|
||||||
|
|
||||||
missing_desired_events = desired_events - have_events
|
missing_desired_event_ids = desired_events - have_events
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"We are missing %i events (got %i)",
|
"We are missing %i events (got %i)",
|
||||||
len(missing_desired_events),
|
len(missing_desired_event_ids),
|
||||||
len(have_events),
|
len(have_events),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -1040,13 +1040,24 @@ class FederationEventHandler:
|
||||||
# already have a bunch of the state events. It would be nice if the
|
# already have a bunch of the state events. It would be nice if the
|
||||||
# federation api gave us a way of finding out which we actually need.
|
# federation api gave us a way of finding out which we actually need.
|
||||||
|
|
||||||
missing_auth_events = set(auth_event_ids) - have_events
|
missing_auth_event_ids = set(auth_event_ids) - have_events
|
||||||
missing_auth_events.difference_update(
|
missing_auth_event_ids.difference_update(
|
||||||
await self._store.have_seen_events(room_id, missing_auth_events)
|
await self._store.have_seen_events(room_id, missing_auth_event_ids)
|
||||||
)
|
)
|
||||||
logger.debug("We are also missing %i auth events", len(missing_auth_events))
|
logger.debug("We are also missing %i auth events", len(missing_auth_event_ids))
|
||||||
|
|
||||||
missing_events = missing_desired_events | missing_auth_events
|
missing_event_ids = missing_desired_event_ids | missing_auth_event_ids
|
||||||
|
|
||||||
|
set_attribute(
|
||||||
|
SynapseTags.RESULT_PREFIX
|
||||||
|
+ f"missing_auth_event_ids ({len(missing_auth_event_ids)})",
|
||||||
|
str(missing_auth_event_ids),
|
||||||
|
)
|
||||||
|
set_attribute(
|
||||||
|
SynapseTags.RESULT_PREFIX
|
||||||
|
+ f"missing_desired_event_ids ({len(missing_desired_event_ids)})",
|
||||||
|
str(missing_desired_event_ids),
|
||||||
|
)
|
||||||
|
|
||||||
# Making an individual request for each of 1000s of events has a lot of
|
# Making an individual request for each of 1000s of events has a lot of
|
||||||
# overhead. On the other hand, we don't really want to fetch all of the events
|
# overhead. On the other hand, we don't really want to fetch all of the events
|
||||||
|
@ -1057,13 +1068,13 @@ class FederationEventHandler:
|
||||||
#
|
#
|
||||||
# TODO: might it be better to have an API which lets us do an aggregate event
|
# TODO: might it be better to have an API which lets us do an aggregate event
|
||||||
# request
|
# request
|
||||||
if (len(missing_events) * 10) >= len(auth_event_ids) + len(state_event_ids):
|
if (len(missing_event_ids) * 10) >= len(auth_event_ids) + len(state_event_ids):
|
||||||
logger.debug("Requesting complete state from remote")
|
logger.debug("Requesting complete state from remote")
|
||||||
await self._get_state_and_persist(destination, room_id, event_id)
|
await self._get_state_and_persist(destination, room_id, event_id)
|
||||||
else:
|
else:
|
||||||
logger.debug("Fetching %i events from remote", len(missing_events))
|
logger.debug("Fetching %i events from remote", len(missing_event_ids))
|
||||||
await self._get_events_and_persist(
|
await self._get_events_and_persist(
|
||||||
destination=destination, room_id=room_id, event_ids=missing_events
|
destination=destination, room_id=room_id, event_ids=missing_event_ids
|
||||||
)
|
)
|
||||||
|
|
||||||
# We now need to fill out the state map, which involves fetching the
|
# We now need to fill out the state map, which involves fetching the
|
||||||
|
@ -1121,6 +1132,11 @@ class FederationEventHandler:
|
||||||
failed_to_fetch,
|
failed_to_fetch,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
set_attribute(
|
||||||
|
SynapseTags.RESULT_PREFIX + f"failed_to_fetch ({len(failed_to_fetch)})",
|
||||||
|
str(failed_to_fetch),
|
||||||
|
)
|
||||||
|
|
||||||
if remote_event.is_state() and remote_event.rejected_reason is None:
|
if remote_event.is_state() and remote_event.rejected_reason is None:
|
||||||
state_map[
|
state_map[
|
||||||
(remote_event.type, remote_event.state_key)
|
(remote_event.type, remote_event.state_key)
|
||||||
|
@ -1662,7 +1678,9 @@ class FederationEventHandler:
|
||||||
origin, event
|
origin, event
|
||||||
)
|
)
|
||||||
set_attribute(
|
set_attribute(
|
||||||
"claimed_auth_events", str([ev.event_id for ev in claimed_auth_events])
|
SynapseTags.RESULT_PREFIX
|
||||||
|
+ f"claimed_auth_events ({len(claimed_auth_events)})",
|
||||||
|
str([ev.event_id for ev in claimed_auth_events]),
|
||||||
)
|
)
|
||||||
|
|
||||||
# ... and check that the event passes auth at those auth events.
|
# ... and check that the event passes auth at those auth events.
|
||||||
|
@ -2110,7 +2128,7 @@ class FederationEventHandler:
|
||||||
if not backfilled: # Never notify for backfilled events
|
if not backfilled: # Never notify for backfilled events
|
||||||
with start_active_span("notify_persisted_events"):
|
with start_active_span("notify_persisted_events"):
|
||||||
set_attribute(
|
set_attribute(
|
||||||
SynapseTags.FUNC_ARG_PREFIX + f"event_ids ({len(events)})",
|
SynapseTags.RESULT_PREFIX + f"event_ids ({len(events)})",
|
||||||
str([ev.event_id for ev in events]),
|
str([ev.event_id for ev in events]),
|
||||||
)
|
)
|
||||||
for event in events:
|
for event in events:
|
||||||
|
|
|
@ -166,7 +166,6 @@ from functools import wraps
|
||||||
from typing import (
|
from typing import (
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
Any,
|
Any,
|
||||||
Awaitable,
|
|
||||||
Callable,
|
Callable,
|
||||||
ContextManager,
|
ContextManager,
|
||||||
Dict,
|
Dict,
|
||||||
|
@ -291,6 +290,9 @@ class SynapseTags:
|
||||||
# Tag keyword args
|
# Tag keyword args
|
||||||
FUNC_KWARGS = "kwargs"
|
FUNC_KWARGS = "kwargs"
|
||||||
|
|
||||||
|
# Some intermediate result that's interesting to the function
|
||||||
|
RESULT_PREFIX = "RESULT."
|
||||||
|
|
||||||
|
|
||||||
class SynapseBaggage:
|
class SynapseBaggage:
|
||||||
FORCE_TRACING = "synapse-force-tracing"
|
FORCE_TRACING = "synapse-force-tracing"
|
||||||
|
|
|
@ -29,7 +29,7 @@ from typing import (
|
||||||
|
|
||||||
from synapse.api.constants import EventTypes
|
from synapse.api.constants import EventTypes
|
||||||
from synapse.events import EventBase
|
from synapse.events import EventBase
|
||||||
from synapse.logging.tracing import trace
|
from synapse.logging.tracing import tag_args, trace
|
||||||
from synapse.storage.state import StateFilter
|
from synapse.storage.state import StateFilter
|
||||||
from synapse.storage.util.partial_state_events_tracker import (
|
from synapse.storage.util.partial_state_events_tracker import (
|
||||||
PartialCurrentStateTracker,
|
PartialCurrentStateTracker,
|
||||||
|
@ -228,6 +228,7 @@ class StateStorageController:
|
||||||
return {event: event_to_state[event] for event in event_ids}
|
return {event: event_to_state[event] for event in event_ids}
|
||||||
|
|
||||||
@trace
|
@trace
|
||||||
|
@tag_args
|
||||||
async def get_state_ids_for_events(
|
async def get_state_ids_for_events(
|
||||||
self,
|
self,
|
||||||
event_ids: Collection[str],
|
event_ids: Collection[str],
|
||||||
|
@ -332,6 +333,7 @@ class StateStorageController:
|
||||||
)
|
)
|
||||||
|
|
||||||
@trace
|
@trace
|
||||||
|
@tag_args
|
||||||
async def get_state_group_for_events(
|
async def get_state_group_for_events(
|
||||||
self,
|
self,
|
||||||
event_ids: Collection[str],
|
event_ids: Collection[str],
|
||||||
|
|
|
@ -20,6 +20,7 @@ from twisted.internet import defer
|
||||||
from twisted.internet.defer import Deferred
|
from twisted.internet.defer import Deferred
|
||||||
|
|
||||||
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
|
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
|
||||||
|
from synapse.logging.tracing import trace
|
||||||
from synapse.storage.databases.main.events_worker import EventsWorkerStore
|
from synapse.storage.databases.main.events_worker import EventsWorkerStore
|
||||||
from synapse.storage.databases.main.room import RoomWorkerStore
|
from synapse.storage.databases.main.room import RoomWorkerStore
|
||||||
from synapse.util import unwrapFirstError
|
from synapse.util import unwrapFirstError
|
||||||
|
@ -58,6 +59,7 @@ class PartialStateEventsTracker:
|
||||||
for o in observers:
|
for o in observers:
|
||||||
o.callback(None)
|
o.callback(None)
|
||||||
|
|
||||||
|
@trace
|
||||||
async def await_full_state(self, event_ids: Collection[str]) -> None:
|
async def await_full_state(self, event_ids: Collection[str]) -> None:
|
||||||
"""Wait for all the given events to have full state.
|
"""Wait for all the given events to have full state.
|
||||||
|
|
||||||
|
@ -151,6 +153,7 @@ class PartialCurrentStateTracker:
|
||||||
for o in observers:
|
for o in observers:
|
||||||
o.callback(None)
|
o.callback(None)
|
||||||
|
|
||||||
|
@trace
|
||||||
async def await_full_state(self, room_id: str) -> None:
|
async def await_full_state(self, room_id: str) -> None:
|
||||||
# We add the deferred immediately so that the DB call to check for
|
# We add the deferred immediately so that the DB call to check for
|
||||||
# partial state doesn't race when we unpartial the room.
|
# partial state doesn't race when we unpartial the room.
|
||||||
|
|
Loading…
Reference in a new issue