mirror of
https://github.com/element-hq/synapse
synced 2024-09-20 00:35:14 +00:00
Revert crazy custom sampler and span process to try force tracing for users
This commit is contained in:
parent
6bb7cb7166
commit
dbd9005cd1
2 changed files with 16 additions and 149 deletions
|
@ -138,15 +138,7 @@ class Auth:
|
||||||
AuthError if access is denied for the user in the access token
|
AuthError if access is denied for the user in the access token
|
||||||
"""
|
"""
|
||||||
parent_span = get_active_span()
|
parent_span = get_active_span()
|
||||||
with start_active_span(
|
with start_active_span("get_user_by_req"):
|
||||||
"get_user_by_req",
|
|
||||||
attributes={
|
|
||||||
# We still haven't determined whether to force tracing yet so we
|
|
||||||
# need to make sure the sampler set our span as recording so we
|
|
||||||
# don't lose anything.
|
|
||||||
SynapseTags.FORCE_RECORD_MAYBE_SAMPLE: True,
|
|
||||||
},
|
|
||||||
):
|
|
||||||
requester = await self._wrapped_get_user_by_req(
|
requester = await self._wrapped_get_user_by_req(
|
||||||
request, allow_guest, allow_expired
|
request, allow_guest, allow_expired
|
||||||
)
|
)
|
||||||
|
|
|
@ -265,11 +265,6 @@ logger = logging.getLogger(__name__)
|
||||||
class SynapseTags:
|
class SynapseTags:
|
||||||
"""FIXME: Rename to `SynapseAttributes` so it matches OpenTelemetry `SpanAttributes`"""
|
"""FIXME: Rename to `SynapseAttributes` so it matches OpenTelemetry `SpanAttributes`"""
|
||||||
|
|
||||||
# Force the sampler to always record
|
|
||||||
FORCE_RECORD_MAYBE_SAMPLE = "synapse.force_record_maybe_sample"
|
|
||||||
# Force the span to be exported
|
|
||||||
FORCE_TRACING = "synapse.force_tracing"
|
|
||||||
|
|
||||||
# The message ID of any to_device message processed
|
# The message ID of any to_device message processed
|
||||||
TO_DEVICE_MESSAGE_ID = "to_device.message_id"
|
TO_DEVICE_MESSAGE_ID = "to_device.message_id"
|
||||||
|
|
||||||
|
@ -387,111 +382,6 @@ def ensure_active_span(
|
||||||
# Setup
|
# Setup
|
||||||
|
|
||||||
|
|
||||||
if opentelemetry:
|
|
||||||
|
|
||||||
class AlwaysSampleSpan(opentelemetry.sdk.trace.ReadableSpan):
|
|
||||||
def __init__(self, span: "opentelemetry.sdk.trace.ReadableSpan"):
|
|
||||||
self._span = span
|
|
||||||
|
|
||||||
span_context = span.get_span_context()
|
|
||||||
self._always_sample_span_context = opentelemetry.trace.span.SpanContext(
|
|
||||||
trace_id=span_context.trace_id,
|
|
||||||
span_id=span_context.span_id,
|
|
||||||
is_remote=span_context.is_remote,
|
|
||||||
# Override and always trace
|
|
||||||
trace_flags=opentelemetry.trace.TraceFlags(
|
|
||||||
opentelemetry.trace.TraceFlags.SAMPLED
|
|
||||||
),
|
|
||||||
trace_state=span_context.trace_state,
|
|
||||||
)
|
|
||||||
|
|
||||||
@property
|
|
||||||
def context(self):
|
|
||||||
return self._always_sample_span_context
|
|
||||||
|
|
||||||
def get_span_context(self):
|
|
||||||
return self._always_sample_span_context
|
|
||||||
|
|
||||||
def __getattr__(self, attr):
|
|
||||||
if attr in ("context", "get_span_context"):
|
|
||||||
return self._always_sample_span_context
|
|
||||||
return getattr(self._span, attr)
|
|
||||||
|
|
||||||
else:
|
|
||||||
AlwaysSampleSpan = None
|
|
||||||
|
|
||||||
|
|
||||||
class ForcibleTracingBatchSpanProcessor(
|
|
||||||
opentelemetry.sdk.trace.export.BatchSpanProcessor
|
|
||||||
):
|
|
||||||
def on_end(self, span: "opentelemetry.sdk.trace.ReadableSpan") -> None:
|
|
||||||
should_force_export = span.attributes and span.attributes.get(
|
|
||||||
SynapseTags.FORCE_TRACING, False
|
|
||||||
)
|
|
||||||
if should_force_export and AlwaysSampleSpan is not None:
|
|
||||||
# Returns a proxied span that has recording and sampling enabled so
|
|
||||||
# that it can be exported.
|
|
||||||
proxied_span = AlwaysSampleSpan(span)
|
|
||||||
proxied_span_context = proxied_span.context
|
|
||||||
super().on_end(proxied_span)
|
|
||||||
else:
|
|
||||||
# Otherwise handle as normal
|
|
||||||
super().on_end(span)
|
|
||||||
|
|
||||||
|
|
||||||
class ForcibleRecordSampler(opentelemetry.sdk.trace.sampling.ParentBasedTraceIdRatio):
|
|
||||||
def should_sample(
|
|
||||||
self,
|
|
||||||
parent_context: Optional["opentelemetry.context.context.Context"],
|
|
||||||
trace_id: int,
|
|
||||||
name: str,
|
|
||||||
kind: "opentelemetry.trace.SpanKind" = None,
|
|
||||||
attributes: opentelemetry.util.types.Attributes = None,
|
|
||||||
links: Sequence["opentelemetry.trace.Link"] = None,
|
|
||||||
trace_state: "opentelemetry.trace.span.TraceState" = None,
|
|
||||||
) -> "opentelemetry.sdk.trace.sampling.SamplingResult":
|
|
||||||
default_sampling_result = super().should_sample(
|
|
||||||
parent_context=parent_context,
|
|
||||||
trace_id=trace_id,
|
|
||||||
name=name,
|
|
||||||
kind=kind,
|
|
||||||
attributes=attributes,
|
|
||||||
links=links,
|
|
||||||
trace_state=trace_state,
|
|
||||||
)
|
|
||||||
# If the default behavior already says we should sample, let's use that
|
|
||||||
# because that's the our ideal scenario!
|
|
||||||
if default_sampling_result.decision.is_sampled():
|
|
||||||
return default_sampling_result
|
|
||||||
|
|
||||||
# Otherwise check if we should atleast record
|
|
||||||
should_record = attributes and attributes.get(
|
|
||||||
SynapseTags.FORCE_RECORD_MAYBE_SAMPLE, False
|
|
||||||
)
|
|
||||||
if should_record:
|
|
||||||
return opentelemetry.sdk.trace.sampling.SamplingResult(
|
|
||||||
# Just record so the span doesn't lose any data in case we
|
|
||||||
# decide to force tracing later
|
|
||||||
decision=opentelemetry.sdk.trace.sampling.Decision.RECORD_ONLY,
|
|
||||||
attributes=attributes,
|
|
||||||
trace_state=self._get_parent_trace_state(parent_context),
|
|
||||||
)
|
|
||||||
|
|
||||||
# And fallback to the default response
|
|
||||||
return default_sampling_result
|
|
||||||
|
|
||||||
def _get_parent_trace_state(
|
|
||||||
self,
|
|
||||||
parent_context,
|
|
||||||
) -> Optional["opentelemetry.trace.span.TraceState"]:
|
|
||||||
parent_span_context = opentelemetry.trace.get_current_span(
|
|
||||||
parent_context
|
|
||||||
).get_span_context()
|
|
||||||
if parent_span_context is None or not parent_span_context.is_valid:
|
|
||||||
return None
|
|
||||||
return parent_span_context.trace_state
|
|
||||||
|
|
||||||
|
|
||||||
def init_tracer(hs: "HomeServer") -> None:
|
def init_tracer(hs: "HomeServer") -> None:
|
||||||
"""Set the whitelists and initialise the OpenTelemetry tracer"""
|
"""Set the whitelists and initialise the OpenTelemetry tracer"""
|
||||||
global opentelemetry
|
global opentelemetry
|
||||||
|
@ -515,10 +405,9 @@ def init_tracer(hs: "HomeServer") -> None:
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
# sampler = opentelemetry.sdk.trace.sampling.ParentBasedTraceIdRatio(
|
sampler = opentelemetry.sdk.trace.sampling.ParentBasedTraceIdRatio(
|
||||||
# hs.config.tracing.sample_rate
|
hs.config.tracing.sample_rate
|
||||||
# )
|
)
|
||||||
sampler = ForcibleRecordSampler(hs.config.tracing.sample_rate)
|
|
||||||
|
|
||||||
tracer_provider = opentelemetry.sdk.trace.TracerProvider(
|
tracer_provider = opentelemetry.sdk.trace.TracerProvider(
|
||||||
resource=resource, sampler=sampler
|
resource=resource, sampler=sampler
|
||||||
|
@ -532,10 +421,9 @@ def init_tracer(hs: "HomeServer") -> None:
|
||||||
jaeger_exporter = opentelemetry.exporter.jaeger.thrift.JaegerExporter(
|
jaeger_exporter = opentelemetry.exporter.jaeger.thrift.JaegerExporter(
|
||||||
**hs.config.tracing.jaeger_exporter_config
|
**hs.config.tracing.jaeger_exporter_config
|
||||||
)
|
)
|
||||||
# jaeger_processor = opentelemetry.sdk.trace.export.BatchSpanProcessor(
|
jaeger_processor = opentelemetry.sdk.trace.export.BatchSpanProcessor(
|
||||||
# jaeger_exporter
|
jaeger_exporter
|
||||||
# )
|
)
|
||||||
jaeger_processor = ForcibleTracingBatchSpanProcessor(jaeger_exporter)
|
|
||||||
tracer_provider.add_span_processor(jaeger_processor)
|
tracer_provider.add_span_processor(jaeger_processor)
|
||||||
|
|
||||||
# Sets the global default tracer provider
|
# Sets the global default tracer provider
|
||||||
|
@ -768,19 +656,8 @@ def force_tracing(span: Optional["opentelemetry.trace.span.Span"] = None) -> Non
|
||||||
Args:
|
Args:
|
||||||
span: span to force tracing for. By default, the active span.
|
span: span to force tracing for. By default, the active span.
|
||||||
"""
|
"""
|
||||||
|
# TODO
|
||||||
if span is None:
|
pass
|
||||||
span = get_active_span()
|
|
||||||
|
|
||||||
if span:
|
|
||||||
# Used by the span exporter to determine whether to force export
|
|
||||||
# regardless of what IsRecording/Sampled on the SpanContext says
|
|
||||||
span.set_attribute(SynapseTags.FORCE_TRACING, True)
|
|
||||||
|
|
||||||
# ctx = get_context_from_span(span)
|
|
||||||
# opentelemetry.baggage.set_baggage(
|
|
||||||
# SynapseBaggage.FORCE_TRACING, "1", context=ctx
|
|
||||||
# )
|
|
||||||
|
|
||||||
|
|
||||||
def is_context_forced_tracing(
|
def is_context_forced_tracing(
|
||||||
|
@ -1032,11 +909,6 @@ def trace_servlet(
|
||||||
return
|
return
|
||||||
|
|
||||||
attrs = {
|
attrs = {
|
||||||
# This is the root span and aren't able to determine whether to force
|
|
||||||
# tracing yet so we need to make sure the sampler set our span as
|
|
||||||
# recording so we don't lose anything.
|
|
||||||
SynapseTags.FORCE_RECORD_MAYBE_SAMPLE: True,
|
|
||||||
# Other attributes
|
|
||||||
SynapseTags.REQUEST_ID: request.get_request_id(),
|
SynapseTags.REQUEST_ID: request.get_request_id(),
|
||||||
SpanAttributes.HTTP_METHOD: request.get_method(),
|
SpanAttributes.HTTP_METHOD: request.get_method(),
|
||||||
SpanAttributes.HTTP_URL: request.get_redacted_uri(),
|
SpanAttributes.HTTP_URL: request.get_redacted_uri(),
|
||||||
|
@ -1046,15 +918,18 @@ def trace_servlet(
|
||||||
request_name = request.request_metrics.name
|
request_name = request.request_metrics.name
|
||||||
tracing_context = context_from_request(request) if extract_context else None
|
tracing_context = context_from_request(request) if extract_context else None
|
||||||
|
|
||||||
# we configure the scope not to finish the span immediately on exit, and instead
|
# This is will end up being the root span for all of servlet traces and we
|
||||||
# pass the span into the SynapseRequest, which will finish it once we've finished
|
# aren't able to determine whether to force tracing yet. We can determine
|
||||||
# sending the response to the client.
|
# whether to force trace later in `synapse/api/auth.py`.
|
||||||
|
|
||||||
with start_active_span(
|
with start_active_span(
|
||||||
request_name,
|
request_name,
|
||||||
kind=SpanKind.SERVER,
|
kind=SpanKind.SERVER,
|
||||||
context=tracing_context,
|
context=tracing_context,
|
||||||
attributes=attrs,
|
attributes=attrs,
|
||||||
|
# we configure the span not to finish immediately on exiting the scope,
|
||||||
|
# and instead pass the span into the SynapseRequest (via
|
||||||
|
# `request.set_tracing_span(span)`), which will finish it once we've
|
||||||
|
# finished sending the response to the client.
|
||||||
end_on_exit=False,
|
end_on_exit=False,
|
||||||
) as span:
|
) as span:
|
||||||
request.set_tracing_span(span)
|
request.set_tracing_span(span)
|
||||||
|
|
Loading…
Reference in a new issue