Move to start_active_span

This commit is contained in:
Eric Eastwood 2022-07-29 21:59:30 -05:00
parent 7c135b93bd
commit d29a4af916
8 changed files with 227 additions and 296 deletions

View file

@ -22,10 +22,11 @@ from synapse.events import EventBase
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
from synapse.logging.tracing import (
Link,
StatusCode,
extract_text_map,
set_status,
start_active_span_follows_from,
start_active_span,
whitelisted_homeserver,
)
from synapse.types import JsonDict
@ -94,7 +95,10 @@ class TransactionManager:
if keep_destination:
edu.strip_context()
with start_active_span_follows_from("send_transaction", span_contexts):
with start_active_span(
"send_transaction",
links=[Link(context) for context in span_contexts],
):
logger.debug("TX [%s] _attempt_new_transaction", destination)
txn_id = str(self._next_txn_id)

View file

@ -15,7 +15,6 @@
import functools
import logging
import re
import time
from http import HTTPStatus
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, Optional, Tuple, cast
@ -26,13 +25,12 @@ from synapse.http.servlet import parse_json_object_from_request
from synapse.http.site import SynapseRequest
from synapse.logging.context import run_in_background
from synapse.logging.tracing import (
get_active_span,
Link,
create_non_recording_span,
get_active_span,
set_attribute,
span_context_from_request,
start_active_span,
start_active_span_follows_from,
whitelisted_homeserver,
)
from synapse.types import JsonDict

View file

@ -334,24 +334,6 @@ class LoggingContext:
def __str__(self) -> str:
return self.name
@classmethod
def current_context(cls) -> LoggingContextOrSentinel:
"""Get the current logging context from thread local storage
This exists for backwards compatibility. ``current_context()`` should be
called directly.
Returns:
LoggingContext: the current logging context
"""
warnings.warn(
"synapse.logging.context.LoggingContext.current_context() is deprecated "
"in favor of synapse.logging.context.current_context().",
DeprecationWarning,
stacklevel=2,
)
return current_context()
@classmethod
def set_current_context(
cls, context: LoggingContextOrSentinel

View file

@ -162,12 +162,12 @@ Gotchas
than one caller? Will all of those calling functions have be in a context
with an active span?
"""
from abc import ABC
import contextlib
import enum
import inspect
import logging
import re
from abc import ABC
from functools import wraps
from typing import (
TYPE_CHECKING,
@ -235,12 +235,12 @@ try:
import opentelemetry
import opentelemetry.exporter.jaeger.thrift
import opentelemetry.propagate
import opentelemetry.trace.propagation
import opentelemetry.sdk.resources
import opentelemetry.sdk.trace
import opentelemetry.sdk.trace.export
import opentelemetry.semconv.trace
import opentelemetry.trace
import opentelemetry.trace.propagation
SpanKind = opentelemetry.trace.SpanKind
SpanAttributes = opentelemetry.semconv.trace.SpanAttributes
@ -494,52 +494,6 @@ def start_active_span(
)
# TODO: I don't think we even need this function over the normal `start_active_span`.
# The only difference is the `inherit_force_tracing` stuff.
def start_active_span_follows_from(
operation_name: str,
contexts: Collection,
child_of: Optional[
Union[
"opentelemetry.shim.opentracing_shim.SpanShim",
"opentelemetry.shim.opentracing_shim.SpanContextShim",
]
] = None,
start_time: Optional[float] = None,
*,
inherit_force_tracing: bool = False,
tracer: Optional["opentelemetry.shim.opentracing_shim.TracerShim"] = None,
) -> Iterator["opentelemetry.trace.span.Span"]:
"""Starts an active opentracing span, with additional references to previous spans
Args:
operation_name: name of the operation represented by the new span
contexts: the previous spans to inherit from
child_of: optionally override the parent span. If unset, the currently active
span will be the parent. (If there is no currently active span, the first
span in `contexts` will be the parent.)
start_time: optional override for the start time of the created span. Seconds
since the epoch.
inherit_force_tracing: if set, and any of the previous contexts have had tracing
forced, the new span will also have tracing forced.
tracer: override the opentracing tracer. By default the global tracer is used.
"""
if opentelemetry is None:
return contextlib.nullcontext() # type: ignore[unreachable]
links = [opentelemetry.trace.Link(context) for context in contexts]
span = start_active_span(
name=operation_name,
links=links,
)
if inherit_force_tracing and any(
is_context_forced_tracing(ctx) for ctx in contexts
):
force_tracing(span)
return span
def start_active_span_from_edu(
edu_content: Dict[str, Any],
operation_name: str,

View file

@ -45,8 +45,8 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.logging import tracing
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.logging.tracing import Link, get_active_span, start_active_span, trace
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.controllers.state import StateStorageController
from synapse.storage.databases import Databases
@ -118,7 +118,7 @@ times_pruned_extremities = Counter(
class _PersistEventsTask:
"""A batch of events to persist."""
name: ClassVar[str] = "persist_event_batch" # used for opentracing
name: ClassVar[str] = "persist_event_batch" # used for tracing
events_and_contexts: List[Tuple[EventBase, EventContext]]
backfilled: bool
@ -139,7 +139,7 @@ class _PersistEventsTask:
class _UpdateCurrentStateTask:
"""A room whose current state needs recalculating."""
name: ClassVar[str] = "update_current_state" # used for opentracing
name: ClassVar[str] = "update_current_state" # used for tracing
def try_merge(self, task: "_EventPersistQueueTask") -> bool:
"""Deduplicates consecutive recalculations of current state."""
@ -154,11 +154,11 @@ class _EventPersistQueueItem:
task: _EventPersistQueueTask
deferred: ObservableDeferred
parent_opentracing_span_contexts: List = attr.ib(factory=list)
"""A list of opentracing spans waiting for this batch"""
parent_tracing_span_contexts: List = attr.ib(factory=list)
"""A list of tracing spans waiting for this batch"""
opentracing_span_context: Any = None
"""The opentracing span under which the persistence actually happened"""
tracing_span_context: Any = None
"""The tracing span under which the persistence actually happened"""
_PersistResult = TypeVar("_PersistResult")
@ -223,9 +223,9 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
queue.append(end_item)
# also add our active opentracing span to the item so that we get a link back
span = tracing.get_active_span()
span = get_active_span()
if span:
end_item.parent_opentracing_span_contexts.append(span.context)
end_item.parent_tracing_span_contexts.append(span.get_span_context())
# start a processor for the queue, if there isn't one already
self._handle_queue(room_id)
@ -234,8 +234,9 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
res = await make_deferred_yieldable(end_item.deferred.observe())
# add another opentracing span which links to the persist trace.
with tracing.start_active_span_follows_from(
f"{task.name}_complete", (end_item.opentracing_span_context,)
with start_active_span(
f"{task.name}_complete",
links=[Link(end_item.tracing_span_context)],
):
pass
@ -266,13 +267,17 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
queue = self._get_drainining_queue(room_id)
for item in queue:
try:
with tracing.start_active_span_follows_from(
with start_active_span(
item.task.name,
item.parent_opentracing_span_contexts,
links=[
Link(span_context)
for span_context in item.parent_tracing_span_contexts
],
# TODO: inherit_force_tracing
inherit_force_tracing=True,
) as scope:
if scope:
item.opentracing_span_context = scope.span.context
) as span:
if span:
item.tracing_span_context = span.get_span_context()
ret = await self._per_item_callback(room_id, item.task)
except Exception:
@ -355,7 +360,7 @@ class EventsPersistenceStorageController:
f"Found an unexpected task type in event persistence queue: {task}"
)
@tracing.trace
@trace
async def persist_events(
self,
events_and_contexts: Iterable[Tuple[EventBase, EventContext]],
@ -418,7 +423,7 @@ class EventsPersistenceStorageController:
self.main_store.get_room_max_token(),
)
@tracing.trace
@trace
async def persist_event(
self, event: EventBase, context: EventContext, backfilled: bool = False
) -> Tuple[EventBase, PersistedEventPosition, RoomStreamToken]:

View file

@ -29,11 +29,7 @@ import attr
from twisted.internet import defer
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.tracing import (
get_active_span,
start_active_span,
start_active_span_follows_from,
)
from synapse.logging.tracing import Link, get_active_span, start_active_span
from synapse.util import Clock
from synapse.util.async_helpers import AbstractObservableDeferred, ObservableDeferred
from synapse.util.caches import register_cache
@ -82,8 +78,8 @@ class ResponseCacheEntry:
easier to cache Failure results.
"""
opentracing_span_context: Optional["opentelemetry.trace.span.SpanContext"]
"""The opentracing span which generated/is generating the result"""
tracing_span_context: Optional["opentelemetry.trace.span.SpanContext"]
"""The tracing span which generated/is generating the result"""
class ResponseCache(Generic[KV]):
@ -141,7 +137,7 @@ class ResponseCache(Generic[KV]):
self,
context: ResponseCacheContext[KV],
deferred: "defer.Deferred[RV]",
opentracing_span_context: Optional["opentelemetry.trace.span.SpanContext"],
tracing_span_context: Optional["opentelemetry.trace.span.SpanContext"],
) -> ResponseCacheEntry:
"""Set the entry for the given key to the given deferred.
@ -152,14 +148,14 @@ class ResponseCache(Generic[KV]):
Args:
context: Information about the cache miss
deferred: The deferred which resolves to the result.
opentracing_span_context: An opentracing span wrapping the calculation
tracing_span_context: An tracing span wrapping the calculation
Returns:
The cache entry object.
"""
result = ObservableDeferred(deferred, consumeErrors=True)
key = context.cache_key
entry = ResponseCacheEntry(result, opentracing_span_context)
entry = ResponseCacheEntry(result, tracing_span_context)
self._result_cache[key] = entry
def on_complete(r: RV) -> RV:
@ -242,7 +238,7 @@ class ResponseCache(Generic[KV]):
with start_active_span(f"ResponseCache[{self._name}].calculate"):
span = get_active_span()
if span:
span_context = span.context
span_context = span.get_span_context()
return await callback(*args, **kwargs)
d = run_in_background(cb)
@ -257,9 +253,9 @@ class ResponseCache(Generic[KV]):
"[%s]: using incomplete cached result for [%s]", self._name, key
)
span_context = entry.opentracing_span_context
with start_active_span_follows_from(
span_context = entry.tracing_span_context
with start_active_span(
f"ResponseCache[{self._name}].wait",
contexts=(span_context,) if span_context else (),
links=[Link(span_context)] if span_context else None,
):
return await make_deferred_yieldable(result)

View file

@ -1,193 +0,0 @@
# Copyright 2022 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import cast
from twisted.internet import defer
from twisted.test.proto_helpers import MemoryReactorClock
from synapse.logging.context import (
LoggingContext,
make_deferred_yieldable,
run_in_background,
)
from synapse.logging.tracing import start_active_span, start_active_span_follows_from
from synapse.util import Clock
try:
from synapse.logging.scopecontextmanager import LogContextScopeManager
except ImportError:
LogContextScopeManager = None # type: ignore
try:
import jaeger_client
except ImportError:
jaeger_client = None # type: ignore
from tests.unittest import TestCase
class LogContextScopeManagerTestCase(TestCase):
"""
Test logging contexts and active opentracing spans.
There's casts throughout this from generic opentracing objects (e.g.
opentracing.Span) to the ones specific to Jaeger since they have additional
properties that these tests depend on. This is safe since the only supported
opentracing backend is Jaeger.
"""
if LogContextScopeManager is None:
skip = "Requires opentracing" # type: ignore[unreachable]
if jaeger_client is None:
skip = "Requires jaeger_client" # type: ignore[unreachable]
def setUp(self) -> None:
# since this is a unit test, we don't really want to mess around with the
# global variables that power opentracing. We create our own tracer instance
# and test with it.
scope_manager = LogContextScopeManager()
config = jaeger_client.config.Config(
config={}, service_name="test", scope_manager=scope_manager
)
self._reporter = jaeger_client.reporter.InMemoryReporter()
self._tracer = config.create_tracer(
sampler=jaeger_client.ConstSampler(True),
reporter=self._reporter,
)
def test_start_active_span(self) -> None:
# the scope manager assumes a logging context of some sort.
with LoggingContext("root context"):
self.assertIsNone(self._tracer.active_span)
# start_active_span should start and activate a span.
scope = start_active_span("span", tracer=self._tracer)
span = cast(jaeger_client.Span, scope.span)
self.assertEqual(self._tracer.active_span, span)
self.assertIsNotNone(span.start_time)
# entering the context doesn't actually do a whole lot.
with scope as ctx:
self.assertIs(ctx, scope)
self.assertEqual(self._tracer.active_span, span)
# ... but leaving it unsets the active span, and finishes the span.
self.assertIsNone(self._tracer.active_span)
self.assertIsNotNone(span.end_time)
# the span should have been reported
self.assertEqual(self._reporter.get_spans(), [span])
def test_nested_spans(self) -> None:
"""Starting two spans off inside each other should work"""
with LoggingContext("root context"):
with start_active_span("root span", tracer=self._tracer) as root_scope:
self.assertEqual(self._tracer.active_span, root_scope.span)
root_context = cast(jaeger_client.SpanContext, root_scope.span.context)
scope1 = start_active_span(
"child1",
tracer=self._tracer,
)
self.assertEqual(
self._tracer.active_span, scope1.span, "child1 was not activated"
)
context1 = cast(jaeger_client.SpanContext, scope1.span.context)
self.assertEqual(context1.parent_id, root_context.span_id)
scope2 = start_active_span_follows_from(
"child2",
contexts=(scope1,),
tracer=self._tracer,
)
self.assertEqual(self._tracer.active_span, scope2.span)
context2 = cast(jaeger_client.SpanContext, scope2.span.context)
self.assertEqual(context2.parent_id, context1.span_id)
with scope1, scope2:
pass
# the root scope should be restored
self.assertEqual(self._tracer.active_span, root_scope.span)
span2 = cast(jaeger_client.Span, scope2.span)
span1 = cast(jaeger_client.Span, scope1.span)
self.assertIsNotNone(span2.end_time)
self.assertIsNotNone(span1.end_time)
self.assertIsNone(self._tracer.active_span)
# the spans should be reported in order of their finishing.
self.assertEqual(
self._reporter.get_spans(), [scope2.span, scope1.span, root_scope.span]
)
def test_overlapping_spans(self) -> None:
"""Overlapping spans which are not neatly nested should work"""
reactor = MemoryReactorClock()
clock = Clock(reactor)
scopes = []
async def task(i: int):
scope = start_active_span(
f"task{i}",
tracer=self._tracer,
)
scopes.append(scope)
self.assertEqual(self._tracer.active_span, scope.span)
await clock.sleep(4)
self.assertEqual(self._tracer.active_span, scope.span)
scope.close()
async def root():
with start_active_span("root span", tracer=self._tracer) as root_scope:
self.assertEqual(self._tracer.active_span, root_scope.span)
scopes.append(root_scope)
d1 = run_in_background(task, 1)
await clock.sleep(2)
d2 = run_in_background(task, 2)
# because we did run_in_background, the active span should still be the
# root.
self.assertEqual(self._tracer.active_span, root_scope.span)
await make_deferred_yieldable(
defer.gatherResults([d1, d2], consumeErrors=True)
)
self.assertEqual(self._tracer.active_span, root_scope.span)
with LoggingContext("root context"):
# start the test off
d1 = defer.ensureDeferred(root())
# let the tasks complete
reactor.pump((2,) * 8)
self.successResultOf(d1)
self.assertIsNone(self._tracer.active_span)
# the spans should be reported in order of their finishing: task 1, task 2,
# root.
self.assertEqual(
self._reporter.get_spans(),
[scopes[1].span, scopes[2].span, scopes[0].span],
)

View file

@ -0,0 +1,185 @@
# Copyright 2022 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import cast
from twisted.internet import defer
from twisted.test.proto_helpers import MemoryReactorClock
from synapse.logging.context import (
LoggingContext,
make_deferred_yieldable,
run_in_background,
)
from synapse.logging.tracing import start_active_span
from synapse.util import Clock
logger = logging.getLogger(__name__)
try:
import opentelemetry
import opentelemetry.sdk.trace
import opentelemetry.sdk.trace.export
import opentelemetry.sdk.trace.export.in_memory_span_exporter
import opentelemetry.trace
import opentelemetry.trace.propagation
except ImportError:
opentelemetry = None # type: ignore[assignment]
from tests.unittest import TestCase
class LogContextScopeManagerTestCase(TestCase):
"""
Test logging contexts and active opentelemetry spans.
"""
if opentelemetry is None:
skip = "Requires opentelemetry" # type: ignore[unreachable]
def setUp(self) -> None:
# since this is a unit test, we don't really want to mess around with the
# global variables that power opentracing. We create our own tracer instance
# and test with it.
self._tracer_provider = opentelemetry.sdk.trace.TracerProvider()
self._exporter = (
opentelemetry.sdk.trace.export.in_memory_span_exporter.InMemorySpanExporter()
)
processor = opentelemetry.sdk.trace.export.SimpleSpanProcessor(self._exporter)
self._tracer_provider.add_span_processor(processor)
self._tracer = self._tracer_provider.get_tracer(__name__)
def test_start_active_span(self) -> None:
# This means no current span
self.assertEqual(
opentelemetry.trace.get_current_span(), opentelemetry.trace.INVALID_SPAN
)
# start_active_span should start and activate a span.
with start_active_span("new-span", tracer=self._tracer) as span:
self.assertEqual(opentelemetry.trace.get_current_span(), span)
self.assertIsNotNone(span.start_time)
# ... but leaving it unsets the active span, and finishes the span.
self.assertEqual(
opentelemetry.trace.get_current_span(), opentelemetry.trace.INVALID_SPAN
)
self.assertIsNotNone(span.end_time)
# the span should have been reported
self.assertListEqual(
[span.name for span in self._exporter.get_finished_spans()], ["new-span"]
)
def test_nested_spans(self) -> None:
"""Starting two spans off inside each other should work"""
with start_active_span("root_span", tracer=self._tracer) as root_span:
self.assertEqual(opentelemetry.trace.get_current_span(), root_span)
root_context = root_span.get_span_context()
with start_active_span(
"child_span1",
tracer=self._tracer,
) as child_span1:
self.assertEqual(
opentelemetry.trace.get_current_span(),
child_span1,
"child_span1 was not activated",
)
self.assertEqual(child_span1.parent.span_id, root_context.span_id)
ctx1 = opentelemetry.trace.propagation.set_span_in_context(child_span1)
with start_active_span(
"child_span2",
context=ctx1,
tracer=self._tracer,
) as child_span2:
self.assertEqual(opentelemetry.trace.get_current_span(), child_span2)
self.assertEqual(
child_span2.parent.span_id, child_span1.get_span_context().span_id
)
# the root scope should be restored
self.assertEqual(opentelemetry.trace.get_current_span(), root_span)
self.assertIsNotNone(child_span1.end_time)
self.assertIsNotNone(child_span2.end_time)
# Active span is unset outside of the with scopes
self.assertEqual(
opentelemetry.trace.get_current_span(), opentelemetry.trace.INVALID_SPAN
)
# the spans should be reported in order of their finishing.
self.assertEqual(
self._reporter.get_spans(), [scope2.span, scope1.span, root_scope.span]
)
# def test_overlapping_spans(self) -> None:
# """Overlapping spans which are not neatly nested should work"""
# reactor = MemoryReactorClock()
# clock = Clock(reactor)
# scopes = []
# async def task(i: int):
# scope = start_active_span(
# f"task{i}",
# tracer=self._tracer,
# )
# scopes.append(scope)
# self.assertEqual(self._tracer.active_span, scope.span)
# await clock.sleep(4)
# self.assertEqual(self._tracer.active_span, scope.span)
# scope.close()
# async def root():
# with start_active_span("root span", tracer=self._tracer) as root_scope:
# self.assertEqual(self._tracer.active_span, root_scope.span)
# scopes.append(root_scope)
# d1 = run_in_background(task, 1)
# await clock.sleep(2)
# d2 = run_in_background(task, 2)
# # because we did run_in_background, the active span should still be the
# # root.
# self.assertEqual(self._tracer.active_span, root_scope.span)
# await make_deferred_yieldable(
# defer.gatherResults([d1, d2], consumeErrors=True)
# )
# self.assertEqual(self._tracer.active_span, root_scope.span)
# with LoggingContext("root context"):
# # start the test off
# d1 = defer.ensureDeferred(root())
# # let the tasks complete
# reactor.pump((2,) * 8)
# self.successResultOf(d1)
# self.assertIsNone(self._tracer.active_span)
# # the spans should be reported in order of their finishing: task 1, task 2,
# # root.
# self.assertEqual(
# self._reporter.get_spans(),
# [scopes[1].span, scopes[2].span, scopes[0].span],
# )