Progress towards OTEL

This commit is contained in:
Eric Eastwood 2022-07-27 00:55:43 -05:00
parent 2fe6911957
commit 6984cefa79
17 changed files with 198 additions and 362 deletions

16
poetry.lock generated
View file

@ -636,17 +636,6 @@ category = "main"
optional = true
python-versions = ">=3.6"
[[package]]
name = "opentracing"
version = "2.4.0"
description = "OpenTracing API for Python. See documentation at http://opentracing.io"
category = "main"
optional = true
python-versions = "*"
[package.extras]
tests = ["doubles", "flake8", "flake8-quotes", "mock", "pytest", "pytest-cov", "pytest-mock", "sphinx", "sphinx-rtd-theme", "six (>=1.10.0,<2.0)", "gevent", "tornado"]
[[package]]
name = "packaging"
version = "21.3"
@ -1535,7 +1524,7 @@ url_preview = ["lxml"]
[metadata]
lock-version = "1.1"
python-versions = "^3.7.1"
content-hash = "14602f17c83b68a9dde71aee3b37d9c902153fed752a9a62d1d84b5ca8a6cd14"
content-hash = "c41657e7ab748ab5cf7cc149e78cde43e35588d2d1d47a94f161085c0c5d4ba5"
[metadata.files]
attrs = [
@ -2116,9 +2105,6 @@ opentelemetry-semantic-conventions = [
{file = "opentelemetry-semantic-conventions-0.30b1.tar.gz", hash = "sha256:2fac7c7202602566b87b2ee3c90fbc272be6094725479f8102f083bf425cc253"},
{file = "opentelemetry_semantic_conventions-0.30b1-py3-none-any.whl", hash = "sha256:5213268cd0a7a8fb94c054e4c1bac8c17586f732eca91769463320f3dcd910bb"},
]
opentracing = [
{file = "opentracing-2.4.0.tar.gz", hash = "sha256:a173117e6ef580d55874734d1fa7ecb6f3655160b8b8974a2a1e98e5ec9c840d"},
]
packaging = [
{file = "packaging-21.3-py3-none-any.whl", hash = "sha256:ef103e05f519cdc783ae24ea4e2e0f508a9c99b2d4969652eed6a2e1ea5bd522"},
{file = "packaging-21.3.tar.gz", hash = "sha256:dd47c42927d89ab911e606518907cc2d3a1f38bbd026385970643f9c5b8ecfeb"},

View file

@ -182,7 +182,6 @@ parameterized = { version = ">=0.7.4", optional = true }
idna = { version = ">=2.5", optional = true }
opentelemetry-api = {version = "^1.11.1", optional = true}
opentelemetry-sdk = {version = "^1.11.1", optional = true}
opentracing = {version = "^2.4.0", optional = true}
[tool.poetry.extras]
# NB: Packages that should be part of `pip install matrix-synapse[all]` need to be specified

View file

@ -23,9 +23,9 @@ from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
from synapse.logging.tracing import (
extract_text_map,
set_attribute,
set_status,
start_active_span_follows_from,
tags,
StatusCode,
whitelisted_homeserver,
)
from synapse.types import JsonDict
@ -166,7 +166,7 @@ class TransactionManager:
except HttpResponseException as e:
code = e.code
set_attribute(tags.ERROR, True)
set_status(StatusCode.ERROR)
logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
raise

View file

@ -1568,7 +1568,7 @@ class SyncHandler:
sync_result_builder,
room_entry,
ephemeral=ephemeral_by_room.get(room_entry.room_id, []),
tags=tags_by_room.get(room_entry.room_id),
attributes=tags_by_room.get(room_entry.room_id),
account_data=account_data_by_room.get(room_entry.room_id, {}),
always_include=sync_result_builder.full_state,
)

View file

@ -75,7 +75,13 @@ from synapse.http import QuieterFileBodyProducer, RequestTimedOutError, redact_u
from synapse.http.proxyagent import ProxyAgent
from synapse.http.types import QueryParams
from synapse.logging.context import make_deferred_yieldable
from synapse.logging.tracing import set_attribute, start_active_span, tags
from synapse.logging.tracing import (
set_status,
start_active_span,
SpanKind,
SpanAttributes,
StatusCode,
)
from synapse.types import ISynapseReactor
from synapse.util import json_decoder
from synapse.util.async_helpers import timeout_deferred
@ -402,12 +408,12 @@ class SimpleHttpClient:
with start_active_span(
"outgoing-client-request",
tags={
tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
tags.HTTP_METHOD: method,
tags.HTTP_URL: uri,
kind=SpanKind.CLIENT,
attributes={
SpanAttributes.HTTP_METHOD: method,
SpanAttributes.HTTP_URL: uri,
},
finish_on_close=True,
end_on_exit=True,
):
try:
body_producer = None
@ -459,8 +465,7 @@ class SimpleHttpClient:
type(e).__name__,
e.args[0],
)
set_attribute(tags.ERROR, True)
set_attribute("error_reason", e.args[0])
set_status(StatusCode.ERROR, e.args[0])
raise
async def post_urlencoded_get_json(

View file

@ -72,9 +72,14 @@ from synapse.http.client import (
)
from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
from synapse.http.types import QueryParams
from synapse.logging import opentelemetry
from synapse.logging import tracing
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.tracing import set_attribute, start_active_span, tags
from synapse.logging.tracing import (
set_attribute,
start_active_span,
SpanKind,
SpanAttributes,
)
from synapse.types import JsonDict
from synapse.util import json_decoder
from synapse.util.async_helpers import AwakenableSleeper, timeout_deferred
@ -517,18 +522,18 @@ class MatrixFederationHttpClient:
scope = start_active_span(
"outgoing-federation-request",
tags={
tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
tags.PEER_ADDRESS: request.destination,
tags.HTTP_METHOD: request.method,
tags.HTTP_URL: request.path,
kind=SpanKind.CLIENT,
attributes={
SpanAttributes.PEER_ADDRESS: request.destination,
SpanAttributes.HTTP_METHOD: request.method,
SpanAttributes.HTTP_URL: request.path,
},
finish_on_close=True,
end_on_exit=True,
)
# Inject the span into the headers
headers_dict: Dict[bytes, List[bytes]] = {}
opentelemetry.inject_header_dict(headers_dict, request.destination)
tracing.inject_header_dict(headers_dict, request.destination)
headers_dict[b"User-Agent"] = [self.version_string_bytes]
@ -614,7 +619,7 @@ class MatrixFederationHttpClient:
request.method, response.code
).inc()
set_attribute(tags.HTTP_STATUS_CODE, response.code)
set_attribute(SpanAttributes.HTTP_STATUS_CODE, response.code)
response_phrase = response.phrase.decode("ascii", errors="replace")
if 200 <= response.code < 300:

View file

@ -66,7 +66,7 @@ from synapse.util.caches import intern_dict
from synapse.util.iterutils import chunk_seq
if TYPE_CHECKING:
import opentracing
import opentelemetry
from synapse.server import HomeServer
@ -868,15 +868,15 @@ async def _async_write_json_to_request_in_thread(
expensive.
"""
def encode(opentracing_span: "Optional[opentracing.Span]") -> bytes:
def encode(tracing_span: Optional["opentelemetry.trace.span.Span"]) -> bytes:
# it might take a while for the threadpool to schedule us, so we write
# opentracing logs once we actually get scheduled, so that we can see how
# much that contributed.
if opentracing_span:
opentracing_span.log_kv({"event": "scheduled"})
if tracing_span:
tracing_span.add_event("scheduled", attributes={"event": "scheduled"})
res = json_encoder(json_object)
if opentracing_span:
opentracing_span.log_kv({"event": "encoded"})
if tracing_span:
tracing_span.add_event("scheduled", attributes={"event": "encoded"})
return res
with start_active_span("encode_json_response"):

View file

@ -37,7 +37,7 @@ from synapse.logging.context import (
from synapse.types import Requester
if TYPE_CHECKING:
import opentracing
import opentelemetry
logger = logging.getLogger(__name__)
@ -87,7 +87,7 @@ class SynapseRequest(Request):
# An opentracing span for this request. Will be closed when the request is
# completely processed.
self._opentracing_span: "Optional[opentracing.Span]" = None
self._opentracing_span: Optional["opentelemetry.trace.span.Span"] = None
# we can't yet create the logcontext, as we don't know the method.
self.logcontext: Optional[LoggingContext] = None
@ -164,9 +164,7 @@ class SynapseRequest(Request):
# If there's no authenticated entity, it was the requester.
self.logcontext.request.authenticated_entity = authenticated_entity or requester
def set_opentracing_span(
self, span: opentelemetry.shim.opentracing_shim.SpanShim
) -> None:
def set_opentracing_span(self, span: "opentelemetry.trace.span.Span") -> None:
"""attach an opentracing span to this request
Doing so will cause the span to be closed when we finish processing the request

View file

@ -57,12 +57,12 @@ class LogContextScopeManager(ScopeManager):
ctx = current_context()
return ctx.scope
def activate(self, span: Span, finish_on_close: bool) -> Scope:
def activate(self, span: Span, end_on_exit: bool) -> Scope:
"""
Makes a Span active.
Args
span: the span that should become active.
finish_on_close: whether Span should be automatically finished when
end_on_exit: whether Span should be automatically finished when
Scope.close() is called.
Returns:
@ -93,7 +93,7 @@ class LogContextScopeManager(ScopeManager):
# "Re-starting finished log context" errors).
enter_logcontext = False
scope = _LogContextScope(self, span, ctx, enter_logcontext, finish_on_close)
scope = _LogContextScope(self, span, ctx, enter_logcontext, end_on_exit)
ctx.scope = scope
if enter_logcontext:
ctx.__enter__()
@ -118,7 +118,7 @@ class _LogContextScope(Scope):
span: Span,
logcontext: LoggingContext,
enter_logcontext: bool,
finish_on_close: bool,
end_on_exit: bool,
):
"""
Args:
@ -131,12 +131,12 @@ class _LogContextScope(Scope):
the log context to which this scope is attached.
enter_logcontext:
if True the log context will be exited when the scope is finished
finish_on_close:
end_on_exit:
if True finish the span when the scope is closed
"""
super().__init__(manager, span)
self.logcontext = logcontext
self._finish_on_close = finish_on_close
self._end_on_exit = end_on_exit
self._enter_logcontext = enter_logcontext
def __exit__(
@ -162,7 +162,7 @@ class _LogContextScope(Scope):
active_scope,
)
if self._finish_on_close:
if self._end_on_exit:
self.span.finish()
self.logcontext.scope = None

View file

@ -176,6 +176,7 @@ from typing import (
Dict,
Generator,
Iterable,
Iterator,
List,
Optional,
Pattern,
@ -203,51 +204,30 @@ if TYPE_CHECKING:
# Helper class
# Always returns the value given for any accessed property
class _DummyLookup(object):
def __init__(self, value):
self.value = value
class _DummyTagNames:
"""wrapper of opentracings tags. We need to have them if we
want to reference them without opentracing around. Clearly they
should never actually show up in a trace. `set_attributes` overwrites
these with the correct ones."""
INVALID_TAG = "invalid-tag"
COMPONENT = INVALID_TAG
DATABASE_INSTANCE = INVALID_TAG
DATABASE_STATEMENT = INVALID_TAG
DATABASE_TYPE = INVALID_TAG
DATABASE_USER = INVALID_TAG
ERROR = INVALID_TAG
HTTP_METHOD = INVALID_TAG
HTTP_STATUS_CODE = INVALID_TAG
HTTP_URL = INVALID_TAG
MESSAGE_BUS_DESTINATION = INVALID_TAG
PEER_ADDRESS = INVALID_TAG
PEER_HOSTNAME = INVALID_TAG
PEER_HOST_IPV4 = INVALID_TAG
PEER_HOST_IPV6 = INVALID_TAG
PEER_PORT = INVALID_TAG
PEER_SERVICE = INVALID_TAG
SAMPLING_PRIORITY = INVALID_TAG
SERVICE = INVALID_TAG
SPAN_KIND = INVALID_TAG
SPAN_KIND_CONSUMER = INVALID_TAG
SPAN_KIND_PRODUCER = INVALID_TAG
SPAN_KIND_RPC_CLIENT = INVALID_TAG
SPAN_KIND_RPC_SERVER = INVALID_TAG
def __getattribute__(self, name):
return self.value
# These dependencies are optional so they can fail to import
# and we
try:
import opentelemetry
import opentracing
import opentelemetry.trace
import opentelemetry.semconv.trace
SpanKind = opentelemetry.trace.SpanKind
SpanAttributes = opentelemetry.semconv.trace.SpanAttributes
StatusCode = opentelemetry.trace.StatusCode
# TODO: tags?
except ImportError:
opentelemetry = None # type: ignore[assignment]
opentracing = None # type: ignore[assignment]
tags = _DummyTagNames # type: ignore[assignment]
SpanKind = _DummyLookup(0)
SpanAttributes = _DummyLookup("fake-attribute")
StatusCode = _DummyLookup(0)
logger = logging.getLogger(__name__)
@ -431,49 +411,33 @@ def whitelisted_homeserver(destination: str) -> bool:
# Start spans and scopes
# Could use kwargs but I want these to be explicit
def start_active_span(
operation_name: str,
child_of: Optional[
Union[
opentelemetry.shim.opentracing_shim.SpanShim,
opentelemetry.shim.opentracing_shim.SpanContextShim,
]
] = None,
references: Optional[List["opentracing.Reference"]] = None,
tags: Optional[Dict[str, str]] = None,
start_time: Optional[float] = None,
ignore_active_span: bool = False,
finish_on_close: bool = True,
*,
tracer: Optional[opentelemetry.shim.opentracing_shim.TracerShim] = None,
) -> opentelemetry.shim.opentracing_shim.ScopeShim:
"""Starts an active opentracing span.
Records the start time for the span, and sets it as the "active span" in the
scope manager.
Args:
See opentracing.tracer
Returns:
scope (Scope) or contextlib.nullcontext
"""
def start_active_span(
name: str,
*,
context: Optional["opentelemetry.context.context.Context"] = None,
kind: Optional["opentelemetry.trace.SpanKind"] = None,
attributes: "opentelemetry.util.types.Attributes" = None,
links: Optional[Sequence["opentelemetry.trace.Link"]] = None,
start_time: Optional[int] = None,
record_exception: bool = True,
set_status_on_exception: bool = True,
end_on_exit: bool = True,
) -> Iterator["opentelemetry.trace.span.Span"]:
if opentelemetry is None:
return contextlib.nullcontext() # type: ignore[unreachable]
if tracer is None:
# use the global tracer by default
otel_tracer = opentelemetry.trace.get_tracer(__name__)
tracerShim = opentelemetry.shim.opentracing_shim.create_tracer(otel_tracer)
tracer = tracerShim
return tracer.start_active_span(
operation_name,
child_of=child_of,
references=references,
tags=tags,
tracer = opentelemetry.trace.get_tracer(__name__)
return tracer.start_as_current_span(
name=name,
context=context,
kind=kind,
attributes=attributes,
links=links,
start_time=start_time,
ignore_active_span=ignore_active_span,
finish_on_close=finish_on_close,
record_exception=record_exception,
set_status_on_exception=set_status_on_exception,
end_on_exit=end_on_exit,
)
@ -482,15 +446,15 @@ def start_active_span_follows_from(
contexts: Collection,
child_of: Optional[
Union[
opentelemetry.shim.opentracing_shim.SpanShim,
opentelemetry.shim.opentracing_shim.SpanContextShim,
"opentelemetry.shim.opentracing_shim.SpanShim",
"opentelemetry.shim.opentracing_shim.SpanContextShim",
]
] = None,
start_time: Optional[float] = None,
*,
inherit_force_tracing: bool = False,
tracer: Optional[opentelemetry.shim.opentracing_shim.TracerShim] = None,
) -> opentelemetry.shim.opentracing_shim.ScopeShim:
tracer: Optional["opentelemetry.shim.opentracing_shim.TracerShim"] = None,
) -> Iterator["opentelemetry.trace.span.Span"]:
"""Starts an active opentracing span, with additional references to previous spans
Args:
operation_name: name of the operation represented by the new span
@ -504,35 +468,14 @@ def start_active_span_follows_from(
forced, the new span will also have tracing forced.
tracer: override the opentracing tracer. By default the global tracer is used.
"""
if opentelemetry is None:
return contextlib.nullcontext() # type: ignore[unreachable]
references = [opentracing.follows_from(context) for context in contexts]
scope = start_active_span(
operation_name,
child_of=child_of,
references=references,
start_time=start_time,
tracer=tracer,
)
if inherit_force_tracing and any(
is_context_forced_tracing(ctx) for ctx in contexts
):
force_tracing(scope.span)
return scope
# TODO
pass
def start_active_span_from_edu(
edu_content: Dict[str, Any],
operation_name: str,
references: Optional[List["opentracing.Reference"]] = None,
tags: Optional[Dict[str, str]] = None,
start_time: Optional[float] = None,
ignore_active_span: bool = False,
finish_on_close: bool = True,
) -> opentelemetry.shim.opentracing_shim.ScopeShim:
) -> Iterator["opentelemetry.trace.span.Span"]:
"""
Extracts a span context from an edu and uses it to start a new active span
@ -542,50 +485,13 @@ def start_active_span_from_edu(
For the other args see opentracing.tracer
"""
references = references or []
if opentelemetry is None:
return contextlib.nullcontext() # type: ignore[unreachable]
carrier = json_decoder.decode(edu_content.get("context", "{}")).get(
"opentracing", {}
)
otel_tracer = opentelemetry.trace.get_tracer(__name__)
tracerShim = opentelemetry.shim.opentracing_shim.create_tracer(otel_tracer)
context = tracerShim.extract(opentracing.Format.TEXT_MAP, carrier)
_references = [
opentracing.Reference(
type=opentracing.ReferenceType.CHILD_OF,
referenced_context=span_context_from_string(x),
)
for x in carrier.get("references", [])
]
# For some reason jaeger decided not to support the visualization of multiple parent
# spans or explicitly show references. I include the span context as a tag here as
# an aid to people debugging but it's really not an ideal solution.
references += _references
scope = tracerShim.start_active_span(
operation_name,
child_of=context,
references=references,
tags=tags,
start_time=start_time,
ignore_active_span=ignore_active_span,
finish_on_close=finish_on_close,
)
scope.span.set_attribute("references", carrier.get("references", []))
return scope
# TODO
pass
# OpenTelemetry setters for attributes, logs, etc
@only_if_tracing
def active_span() -> Optional[opentelemetry.trace.span.Span]:
def active_span() -> Optional["opentelemetry.trace.span.Span"]:
"""Get the currently active span, if any"""
return opentelemetry.trace.get_current_span()
@ -593,24 +499,32 @@ def active_span() -> Optional[opentelemetry.trace.span.Span]:
@ensure_active_span("set a tag")
def set_attribute(key: str, value: Union[str, bool, int, float]) -> None:
"""Sets a tag on the active span"""
active_span = active_span()
assert active_span is not None
active_span.set_attribute(key, value)
span = active_span()
assert span is not None
span.set_attribute(key, value)
@ensure_active_span("set the status")
def set_status(key: str, status: "opentelemetry.trace.StatusCode") -> None:
"""Sets a tag on the active span"""
span = active_span()
assert span is not None
span.set_status(status)
@ensure_active_span("log")
def log_kv(key_values: Dict[str, Any], timestamp: Optional[float] = None) -> None:
"""Log to the active span"""
active_span = active_span()
assert active_span is not None
event_name = opentelemetry.ext.opentracing_shim.util.event_name_from_kv(key_values)
active_span.add_event(event_name, timestamp, key_values)
span = active_span()
assert span is not None
event_name = key_values.get("event", "log")
span.add_event(event_name, attributes=key_values, timestamp=timestamp)
@only_if_tracing
def force_tracing(
span: Union[
opentelemetry.shim.opentracing_shim.SpanShim, _Sentinel
"opentelemetry.shim.opentracing_shim.SpanShim", _Sentinel
] = _Sentinel.sentinel
) -> None:
"""Force sampling for the active/given span and its children.
@ -618,28 +532,16 @@ def force_tracing(
Args:
span: span to force tracing for. By default, the active span.
"""
if isinstance(span, _Sentinel):
span_to_trace = opentelemetry.trace.get_current_span()
else:
span_to_trace = span
if span_to_trace is None:
logger.error("No active span in force_tracing")
return
span_to_trace.set_attribute(opentracing.tags.SAMPLING_PRIORITY, 1)
# also set a bit of baggage, so that we have a way of figuring out if
# it is enabled later
span_to_trace.set_baggage_item(SynapseBaggage.FORCE_TRACING, "1")
# TODO
pass
def is_context_forced_tracing(
span_context: Optional[opentelemetry.shim.opentracing_shim.SpanContextShim],
span_context: Optional["opentelemetry.shim.opentracing_shim.SpanContextShim"],
) -> bool:
"""Check if sampling has been force for the given span context."""
if span_context is None:
return False
return span_context.baggage.get(SynapseBaggage.FORCE_TRACING) is not None
# TODO
return False
# Injection and extraction
@ -669,42 +571,19 @@ def inject_header_dict(
here:
https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/constants.py
"""
if check_destination:
if destination is None:
raise ValueError(
"destination must be given unless check_destination is False"
)
if not whitelisted_homeserver(destination):
return
otel_tracer = opentelemetry.trace.get_tracer(__name__)
tracerShim = opentelemetry.shim.opentracing_shim.create_tracer(otel_tracer)
span = tracerShim.active_span
carrier: Dict[str, str] = {}
assert span is not None
tracerShim.inject(span.context, opentracing.Format.HTTP_HEADERS, carrier)
for key, value in carrier.items():
headers[key.encode()] = [value.encode()]
# TODO
pass
def inject_response_headers(response_headers: Headers) -> None:
"""Inject the current trace id into the HTTP response headers"""
if not opentelemetry:
return
span = opentelemetry.trace.get_current_span()
if not span:
current_span = opentelemetry.trace.get_current_span()
if not current_span:
return
# This is a bit implementation-specific.
#
# Jaeger's Spans have a trace_id property; other implementations (including the
# dummy opentracing.span.Span which we use if init_tracer is not called) do not
# expose it
trace_id = getattr(span, "trace_id", None)
trace_id = current_span.get_span_context().trace_id
if trace_id is not None:
response_headers.addRawHeader("Synapse-Trace-Id", f"{trace_id:x}")
@ -724,66 +603,27 @@ def get_active_span_text_map(destination: Optional[str] = None) -> Dict[str, str
Returns:
dict: the active span's context if opentracing is enabled, otherwise empty.
"""
if destination and not whitelisted_homeserver(destination):
return {}
# TODO
carrier: Dict[str, str] = {}
otel_tracer = opentelemetry.trace.get_tracer(__name__)
tracerShim = opentelemetry.shim.opentracing_shim.create_tracer(otel_tracer)
assert tracerShim.active_span is not None
tracerShim.inject(tracerShim.context, opentracing.Format.TEXT_MAP, carrier)
return carrier
@ensure_active_span("get the span context as a string.", ret={})
def active_span_context_as_string() -> str:
"""
Returns:
The active span context encoded as a string.
"""
carrier: Dict[str, str] = {}
if opentelemetry:
otel_tracer = opentelemetry.trace.get_tracer(__name__)
tracerShim = opentelemetry.shim.opentracing_shim.create_tracer(otel_tracer)
assert tracerShim.active_span is not None
tracerShim.inject(
tracerShim.active_span.context, opentracing.Format.TEXT_MAP, carrier
)
return json_encoder.encode(carrier)
def span_context_from_request(request: Request) -> "Optional[opentracing.SpanContext]":
def span_context_from_request(
request: Request,
) -> Optional["opentelemetry.trace.span.SpanContext"]:
"""Extract an opentracing context from the headers on an HTTP request
This is useful when we have received an HTTP request from another part of our
system, and want to link our spans to those of the remote system.
"""
if not opentracing:
return None
header_dict = {
k.decode(): v[0].decode() for k, v in request.requestHeaders.getAllRawHeaders()
}
return opentracing.tracer.extract(opentracing.Format.HTTP_HEADERS, header_dict)
@only_if_tracing
def span_context_from_string(
carrier: str,
) -> Optional[opentelemetry.shim.opentracing_shim.SpanContextShim]:
"""
Returns:
The active span context decoded from a string.
"""
payload: Dict[str, str] = json_decoder.decode(carrier)
return opentracing.tracer.extract(opentracing.Format.TEXT_MAP, payload)
# TODO
return None
@only_if_tracing
def extract_text_map(
carrier: Dict[str, str]
) -> Optional[opentelemetry.shim.opentracing_shim.SpanContextShim]:
) -> Optional["opentelemetry.shim.opentracing_shim.SpanContextShim"]:
"""
Wrapper method for opentracing's tracer.extract for TEXT_MAP.
Args:
@ -792,7 +632,8 @@ def extract_text_map(
Returns:
The active span context extracted from carrier.
"""
return opentracing.tracer.extract(opentracing.Format.TEXT_MAP, carrier)
# TODO
return None
# Tracing decorators
@ -807,7 +648,7 @@ def trace_with_opname(opname: str) -> Callable[[Callable[P, R]], Callable[P, R]]
"""
def decorator(func: Callable[P, R]) -> Callable[P, R]:
if opentracing is None:
if opentelemetry is None:
return func # type: ignore[unreachable]
if inspect.iscoroutinefunction(func):
@ -878,7 +719,7 @@ def tag_args(func: Callable[P, R]) -> Callable[P, R]:
Tags all of the args to the active span.
"""
if not opentracing:
if not opentelemetry:
return func
@wraps(func)
@ -907,16 +748,15 @@ def trace_servlet(
context from the request the servlet is handling.
"""
if opentracing is None:
if opentelemetry is None:
yield # type: ignore[unreachable]
return
request_tags = {
request_attrs = {
SynapseTags.REQUEST_ID: request.get_request_id(),
tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
tags.HTTP_METHOD: request.get_method(),
tags.HTTP_URL: request.get_redacted_uri(),
tags.PEER_HOST_IPV6: request.getClientAddress().host,
SpanAttributes.HTTP_METHOD: request.get_method(),
SpanAttributes.HTTP_URL: request.get_redacted_uri(),
SpanAttributes.PEER_HOST_IPV6: request.getClientAddress().host,
}
request_name = request.request_metrics.name
@ -925,23 +765,27 @@ def trace_servlet(
# we configure the scope not to finish the span immediately on exit, and instead
# pass the span into the SynapseRequest, which will finish it once we've finished
# sending the response to the client.
scope = start_active_span(request_name, child_of=context, finish_on_close=False)
request.set_opentracing_span(scope.span)
span = start_active_span(
request_name,
kind=opentelemetry.trace.SpanKind.SERVER,
child_of=context,
end_on_exit=False,
)
request.set_opentracing_span(span)
with scope:
inject_response_headers(request.responseHeaders)
try:
yield
finally:
# We set the operation name again in case its changed (which happens
# with JsonResource).
scope.span.update_name(request.request_metrics.name)
inject_response_headers(request.responseHeaders)
try:
yield
finally:
# We set the operation name again in case its changed (which happens
# with JsonResource).
span.update_name(request.request_metrics.name)
# set the tags *after* the servlet completes, in case it decided to
# prioritise the span (tags will get dropped on unprioritised spans)
request_tags[
SynapseTags.REQUEST_TAG
] = request.request_metrics.start_context.tag
# set the tags *after* the servlet completes, in case it decided to
# prioritise the span (tags will get dropped on unprioritised spans)
request_attrs[
SynapseTags.REQUEST_TAG
] = request.request_metrics.start_context.tag
for k, v in request_tags.items():
scope.span.set_attribute(k, v)
for k, v in request_attrs.items():
span.set_attribute(k, v)

View file

@ -232,7 +232,8 @@ def run_as_background_process(
try:
if bg_start_span:
ctx = start_active_span(
f"bgproc.{desc}", tags={SynapseTags.REQUEST_ID: str(context)}
f"bgproc.{desc}",
attributes={SynapseTags.REQUEST_ID: str(context)},
)
else:
ctx = nullcontext() # type: ignore[assignment]

View file

@ -23,7 +23,7 @@ from twisted.internet.interfaces import IDelayedCall
from synapse.api.constants import EventTypes
from synapse.events import EventBase
from synapse.logging import opentelemetry
from synapse.logging import tracing
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import Pusher, PusherConfig, PusherConfigException
from synapse.storage.databases.main.event_push_actions import HttpPushAction
@ -198,9 +198,9 @@ class HttpPusher(Pusher):
)
for push_action in unprocessed:
with opentelemetry.start_active_span(
with tracing.start_active_span(
"http-push",
tags={
attributes={
"authenticated_entity": self.user_id,
"event_id": push_action.event_id,
"app_id": self.app_id,

View file

@ -28,7 +28,7 @@ from synapse.api.errors import HttpResponseException, SynapseError
from synapse.http import RequestTimedOutError
from synapse.http.server import HttpServer, is_method_cancellable
from synapse.http.site import SynapseRequest
from synapse.logging import opentelemetry
from synapse.logging import tracing
from synapse.logging.tracing import trace_with_opname
from synapse.types import JsonDict
from synapse.util.caches.response_cache import ResponseCache
@ -248,7 +248,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
# Add an authorization header, if configured.
if replication_secret:
headers[b"Authorization"] = [b"Bearer " + replication_secret]
opentelemetry.inject_header_dict(headers, check_destination=False)
tracing.inject_header_dict(headers, check_destination=False)
try:
# Keep track of attempts made so we can bail if we don't manage to

View file

@ -17,7 +17,7 @@ from typing import TYPE_CHECKING, Any, Optional
from prometheus_client import Counter, Histogram
from synapse.logging import opentelemetry
from synapse.logging import tracing
from synapse.logging.context import make_deferred_yieldable
from synapse.util import json_decoder, json_encoder
@ -94,9 +94,9 @@ class ExternalCache:
logger.debug("Caching %s %s: %r", cache_name, key, encoded_value)
with opentelemetry.start_active_span(
with tracing.start_active_span(
"ExternalCache.set",
tags={opentelemetry.SynapseTags.CACHE_NAME: cache_name},
attributes={tracing.SynapseTags.CACHE_NAME: cache_name},
):
with response_timer.labels("set").time():
return await make_deferred_yieldable(
@ -113,9 +113,9 @@ class ExternalCache:
if self._redis_connection is None:
return None
with opentelemetry.start_active_span(
with tracing.start_active_span(
"ExternalCache.get",
tags={opentelemetry.SynapseTags.CACHE_NAME: cache_name},
attributes={tracing.SynapseTags.CACHE_NAME: cache_name},
):
with response_timer.labels("get").time():
result = await make_deferred_yieldable(

View file

@ -45,7 +45,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.logging import opentelemetry
from synapse.logging import tracing
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.controllers.state import StateStorageController
@ -223,7 +223,7 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
queue.append(end_item)
# also add our active opentracing span to the item so that we get a link back
span = opentelemetry.active_span()
span = tracing.active_span()
if span:
end_item.parent_opentracing_span_contexts.append(span.context)
@ -234,7 +234,7 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
res = await make_deferred_yieldable(end_item.deferred.observe())
# add another opentracing span which links to the persist trace.
with opentelemetry.start_active_span_follows_from(
with tracing.start_active_span_follows_from(
f"{task.name}_complete", (end_item.opentracing_span_context,)
):
pass
@ -266,7 +266,7 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
queue = self._get_drainining_queue(room_id)
for item in queue:
try:
with opentelemetry.start_active_span_follows_from(
with tracing.start_active_span_follows_from(
item.task.name,
item.parent_opentracing_span_contexts,
inherit_force_tracing=True,
@ -355,7 +355,7 @@ class EventsPersistenceStorageController:
f"Found an unexpected task type in event persistence queue: {task}"
)
@opentelemetry.trace
@tracing.trace
async def persist_events(
self,
events_and_contexts: Iterable[Tuple[EventBase, EventContext]],
@ -418,7 +418,7 @@ class EventsPersistenceStorageController:
self.main_store.get_room_max_token(),
)
@opentelemetry.trace
@tracing.trace
async def persist_event(
self, event: EventBase, context: EventContext, backfilled: bool = False
) -> Tuple[EventBase, PersistedEventPosition, RoomStreamToken]:

View file

@ -47,7 +47,7 @@ from twisted.internet.interfaces import IReactorCore
from synapse.api.errors import StoreError
from synapse.config.database import DatabaseConnectionConfig
from synapse.logging import opentelemetry
from synapse.logging import tracing
from synapse.logging.context import (
LoggingContext,
current_context,
@ -422,11 +422,11 @@ class LoggingTransaction:
start = time.time()
try:
with opentelemetry.start_active_span(
with tracing.start_active_span(
"db.query",
tags={
opentelemetry.tags.DATABASE_TYPE: "sql",
opentelemetry.tags.DATABASE_STATEMENT: one_line_sql,
attributes={
tracing.SpanAttributes.DB_SYSTEM: "sql",
tracing.SpanAttributes.DB_STATEMENT: one_line_sql,
},
):
return func(sql, *args, **kwargs)
@ -701,15 +701,15 @@ class DatabasePool:
exception_callbacks=exception_callbacks,
)
try:
with opentelemetry.start_active_span(
with tracing.start_active_span(
"db.txn",
tags={
opentelemetry.SynapseTags.DB_TXN_DESC: desc,
opentelemetry.SynapseTags.DB_TXN_ID: name,
attributes={
tracing.SynapseTags.DB_TXN_DESC: desc,
tracing.SynapseTags.DB_TXN_ID: name,
},
):
r = func(cursor, *args, **kwargs)
opentelemetry.log_kv({"message": "commit"})
tracing.log_kv({"message": "commit"})
conn.commit()
return r
except self.engine.module.OperationalError as e:
@ -725,7 +725,7 @@ class DatabasePool:
if i < N:
i += 1
try:
with opentelemetry.start_active_span("db.rollback"):
with tracing.start_active_span("db.rollback"):
conn.rollback()
except self.engine.module.Error as e1:
transaction_logger.warning("[TXN EROLL] {%s} %s", name, e1)
@ -739,7 +739,7 @@ class DatabasePool:
if i < N:
i += 1
try:
with opentelemetry.start_active_span("db.rollback"):
with tracing.start_active_span("db.rollback"):
conn.rollback()
except self.engine.module.Error as e1:
transaction_logger.warning(
@ -845,7 +845,7 @@ class DatabasePool:
logger.warning("Starting db txn '%s' from sentinel context", desc)
try:
with opentelemetry.start_active_span(f"db.{desc}"):
with tracing.start_active_span(f"db.{desc}"):
result = await self.runWithConnection(
self.new_transaction,
desc,
@ -928,7 +928,7 @@ class DatabasePool:
with LoggingContext(
str(curr_context), parent_context=parent_context
) as context:
with opentelemetry.start_active_span(
with tracing.start_active_span(
operation_name="db.connection",
):
sched_duration_sec = monotonic_time() - start_time
@ -944,15 +944,13 @@ class DatabasePool:
"Reconnecting database connection over transaction limit"
)
conn.reconnect()
opentelemetry.log_kv(
{"message": "reconnected due to txn limit"}
)
tracing.log_kv({"message": "reconnected due to txn limit"})
self._txn_counters[tid] = 1
if self.engine.is_connection_closed(conn):
logger.debug("Reconnecting closed database connection")
conn.reconnect()
opentelemetry.log_kv({"message": "reconnected"})
tracing.log_kv({"message": "reconnected"})
if self._txn_limit > 0:
self._txn_counters[tid] = 1

View file

@ -41,7 +41,7 @@ from synapse.util.caches import register_cache
logger = logging.getLogger(__name__)
if TYPE_CHECKING:
import opentracing
import opentelemetry
# the type of the key in the cache
KV = TypeVar("KV")
@ -82,7 +82,7 @@ class ResponseCacheEntry:
easier to cache Failure results.
"""
opentracing_span_context: "Optional[opentracing.SpanContext]"
opentracing_span_context: Optional["opentelemetry.trace.span.SpanContext"]
"""The opentracing span which generated/is generating the result"""
@ -141,7 +141,7 @@ class ResponseCache(Generic[KV]):
self,
context: ResponseCacheContext[KV],
deferred: "defer.Deferred[RV]",
opentracing_span_context: "Optional[opentracing.SpanContext]",
opentracing_span_context: Optional["opentelemetry.trace.span.SpanContext"],
) -> ResponseCacheEntry:
"""Set the entry for the given key to the given deferred.
@ -234,7 +234,7 @@ class ResponseCache(Generic[KV]):
if cache_context:
kwargs["cache_context"] = context
span_context: Optional[opentracing.SpanContext] = None
span_context: Optional["opentelemetry.trace.span.SpanContext"] = None
async def cb() -> RV:
# NB it is important that we do not `await` before setting span_context!