mirror of
https://github.com/element-hq/synapse
synced 2024-09-19 22:15:11 +00:00
Passing tests and context manager doesn't seem to be needed
This commit is contained in:
parent
041acdf985
commit
d84815663e
3 changed files with 52 additions and 234 deletions
|
@ -1,171 +0,0 @@
|
|||
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.import logging
|
||||
|
||||
import logging
|
||||
from types import TracebackType
|
||||
from typing import Optional, Type
|
||||
|
||||
from opentracing import Scope, ScopeManager, Span
|
||||
|
||||
import twisted
|
||||
|
||||
from synapse.logging.context import (
|
||||
LoggingContext,
|
||||
current_context,
|
||||
nested_logging_context,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class LogContextScopeManager(ScopeManager):
|
||||
"""
|
||||
The LogContextScopeManager tracks the active scope in opentracing
|
||||
by using the log contexts which are native to synapse. This is so
|
||||
that the basic opentracing api can be used across twisted defereds.
|
||||
|
||||
It would be nice just to use opentracing's ContextVarsScopeManager,
|
||||
but currently that doesn't work due to https://twistedmatrix.com/trac/ticket/10301.
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
pass
|
||||
|
||||
@property
|
||||
def active(self) -> Optional[Scope]:
|
||||
"""
|
||||
Returns the currently active Scope which can be used to access the
|
||||
currently active Scope.span.
|
||||
If there is a non-null Scope, its wrapped Span
|
||||
becomes an implicit parent of any newly-created Span at
|
||||
Tracer.start_active_span() time.
|
||||
|
||||
Return:
|
||||
The Scope that is active, or None if not available.
|
||||
"""
|
||||
ctx = current_context()
|
||||
return ctx.scope
|
||||
|
||||
def activate(self, span: Span, end_on_exit: bool) -> Scope:
|
||||
"""
|
||||
Makes a Span active.
|
||||
Args
|
||||
span: the span that should become active.
|
||||
end_on_exit: whether Span should be automatically finished when
|
||||
Scope.close() is called.
|
||||
|
||||
Returns:
|
||||
Scope to control the end of the active period for
|
||||
*span*. It is a programming error to neglect to call
|
||||
Scope.close() on the returned instance.
|
||||
"""
|
||||
|
||||
ctx = current_context()
|
||||
|
||||
if not ctx:
|
||||
logger.error("Tried to activate scope outside of loggingcontext")
|
||||
return Scope(None, span) # type: ignore[arg-type]
|
||||
|
||||
if ctx.scope is not None:
|
||||
# start a new logging context as a child of the existing one.
|
||||
# Doing so -- rather than updating the existing logcontext -- means that
|
||||
# creating several concurrent spans under the same logcontext works
|
||||
# correctly.
|
||||
ctx = nested_logging_context("")
|
||||
enter_logcontext = True
|
||||
else:
|
||||
# if there is no span currently associated with the current logcontext, we
|
||||
# just store the scope in it.
|
||||
#
|
||||
# This feels a bit dubious, but it does hack around a problem where a
|
||||
# span outlasts its parent logcontext (which would otherwise lead to
|
||||
# "Re-starting finished log context" errors).
|
||||
enter_logcontext = False
|
||||
|
||||
scope = _LogContextScope(self, span, ctx, enter_logcontext, end_on_exit)
|
||||
ctx.scope = scope
|
||||
if enter_logcontext:
|
||||
ctx.__enter__()
|
||||
|
||||
return scope
|
||||
|
||||
|
||||
class _LogContextScope(Scope):
|
||||
"""
|
||||
A custom opentracing scope, associated with a LogContext
|
||||
|
||||
* filters out _DefGen_Return exceptions which arise from calling
|
||||
`defer.returnValue` in Twisted code
|
||||
|
||||
* When the scope is closed, the logcontext's active scope is reset to None.
|
||||
and - if enter_logcontext was set - the logcontext is finished too.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
manager: LogContextScopeManager,
|
||||
span: Span,
|
||||
logcontext: LoggingContext,
|
||||
enter_logcontext: bool,
|
||||
end_on_exit: bool,
|
||||
):
|
||||
"""
|
||||
Args:
|
||||
manager:
|
||||
the manager that is responsible for this scope.
|
||||
span:
|
||||
the opentracing span which this scope represents the local
|
||||
lifetime for.
|
||||
logcontext:
|
||||
the log context to which this scope is attached.
|
||||
enter_logcontext:
|
||||
if True the log context will be exited when the scope is finished
|
||||
end_on_exit:
|
||||
if True finish the span when the scope is closed
|
||||
"""
|
||||
super().__init__(manager, span)
|
||||
self.logcontext = logcontext
|
||||
self._end_on_exit = end_on_exit
|
||||
self._enter_logcontext = enter_logcontext
|
||||
|
||||
def __exit__(
|
||||
self,
|
||||
exc_type: Optional[Type[BaseException]],
|
||||
value: Optional[BaseException],
|
||||
traceback: Optional[TracebackType],
|
||||
) -> None:
|
||||
if exc_type == twisted.internet.defer._DefGen_Return:
|
||||
# filter out defer.returnValue() calls
|
||||
exc_type = value = traceback = None
|
||||
super().__exit__(exc_type, value, traceback)
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"Scope<{self.span}>"
|
||||
|
||||
def close(self) -> None:
|
||||
active_scope = self.manager.active
|
||||
if active_scope is not self:
|
||||
logger.error(
|
||||
"Closing scope %s which is not the currently-active one %s",
|
||||
self,
|
||||
active_scope,
|
||||
)
|
||||
|
||||
if self._end_on_exit:
|
||||
self.span.finish()
|
||||
|
||||
self.logcontext.scope = None
|
||||
|
||||
if self._enter_logcontext:
|
||||
self.logcontext.__exit__(None, None, None)
|
|
@ -173,7 +173,6 @@ from typing import (
|
|||
TYPE_CHECKING,
|
||||
Any,
|
||||
Callable,
|
||||
Collection,
|
||||
Dict,
|
||||
Generator,
|
||||
Iterable,
|
||||
|
@ -182,14 +181,12 @@ from typing import (
|
|||
Optional,
|
||||
Pattern,
|
||||
Sequence,
|
||||
Type,
|
||||
TypeVar,
|
||||
Union,
|
||||
cast,
|
||||
overload,
|
||||
)
|
||||
|
||||
import attr
|
||||
from typing_extensions import ParamSpec
|
||||
|
||||
from twisted.internet import defer
|
||||
|
@ -197,7 +194,6 @@ from twisted.web.http import Request
|
|||
from twisted.web.http_headers import Headers
|
||||
|
||||
from synapse.config import ConfigError
|
||||
from synapse.util import json_decoder, json_encoder
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.http.site import SynapseRequest
|
||||
|
@ -205,8 +201,10 @@ if TYPE_CHECKING:
|
|||
|
||||
# Helper class
|
||||
|
||||
# This will always returns the fixed value given for any accessed property
|
||||
|
||||
class _DummyLookup(object):
|
||||
"""This will always returns the fixed value given for any accessed property"""
|
||||
|
||||
def __init__(self, value):
|
||||
self.value = value
|
||||
|
||||
|
@ -215,6 +213,8 @@ class _DummyLookup(object):
|
|||
|
||||
|
||||
class DummyLink(ABC):
|
||||
"""Dummy placeholder for `opentelemetry.trace.Link`"""
|
||||
|
||||
def __init__(self):
|
||||
self.not_implemented_message = (
|
||||
"opentelemetry wasn't imported so this is just a dummy link placeholder"
|
||||
|
@ -225,7 +225,7 @@ class DummyLink(ABC):
|
|||
raise NotImplementedError(self.not_implemented_message)
|
||||
|
||||
@property
|
||||
def context(self):
|
||||
def attributes(self):
|
||||
raise NotImplementedError(self.not_implemented_message)
|
||||
|
||||
|
||||
|
@ -256,8 +256,10 @@ except ImportError:
|
|||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# FIXME: Rename to `SynapseAttributes` so it matches OpenTelemetry `SpanAttributes`
|
||||
|
||||
class SynapseTags:
|
||||
"""FIXME: Rename to `SynapseAttributes` so it matches OpenTelemetry `SpanAttributes`"""
|
||||
|
||||
# The message ID of any to_device message processed
|
||||
TO_DEVICE_MESSAGE_ID = "to_device.message_id"
|
||||
|
||||
|
|
|
@ -12,21 +12,14 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from typing import cast
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.test.proto_helpers import MemoryReactorClock
|
||||
|
||||
from synapse.logging.context import (
|
||||
LoggingContext,
|
||||
make_deferred_yieldable,
|
||||
run_in_background,
|
||||
)
|
||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||
from synapse.logging.tracing import start_active_span
|
||||
from synapse.util import Clock
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
from tests.unittest import TestCase
|
||||
|
||||
try:
|
||||
import opentelemetry
|
||||
|
@ -38,8 +31,6 @@ try:
|
|||
except ImportError:
|
||||
opentelemetry = None # type: ignore[assignment]
|
||||
|
||||
from tests.unittest import TestCase
|
||||
|
||||
|
||||
class LogContextScopeManagerTestCase(TestCase):
|
||||
"""
|
||||
|
@ -120,7 +111,7 @@ class LogContextScopeManagerTestCase(TestCase):
|
|||
self.assertIsNotNone(child_span1.end_time)
|
||||
self.assertIsNotNone(child_span2.end_time)
|
||||
|
||||
# Active span is unset outside of the with scopes
|
||||
# Active span is unset now that we're outside of the `with` scopes
|
||||
self.assertEqual(
|
||||
opentelemetry.trace.get_current_span(), opentelemetry.trace.INVALID_SPAN
|
||||
)
|
||||
|
@ -131,57 +122,53 @@ class LogContextScopeManagerTestCase(TestCase):
|
|||
["child_span2", "child_span1", "root_span"],
|
||||
)
|
||||
|
||||
# def test_overlapping_spans(self) -> None:
|
||||
# """Overlapping spans which are not neatly nested should work"""
|
||||
# reactor = MemoryReactorClock()
|
||||
# clock = Clock(reactor)
|
||||
def test_overlapping_spans(self) -> None:
|
||||
"""Overlapping spans which are not neatly nested should work"""
|
||||
reactor = MemoryReactorClock()
|
||||
clock = Clock(reactor)
|
||||
|
||||
# scopes = []
|
||||
async def task(i: int):
|
||||
with start_active_span(
|
||||
f"task{i}",
|
||||
tracer=self._tracer,
|
||||
) as span1:
|
||||
self.assertEqual(opentelemetry.trace.get_current_span(), span1)
|
||||
await clock.sleep(4)
|
||||
self.assertEqual(opentelemetry.trace.get_current_span(), span1)
|
||||
|
||||
# async def task(i: int):
|
||||
# scope = start_active_span(
|
||||
# f"task{i}",
|
||||
# tracer=self._tracer,
|
||||
# )
|
||||
# scopes.append(scope)
|
||||
async def root():
|
||||
with start_active_span("root_span", tracer=self._tracer) as root_span:
|
||||
self.assertEqual(opentelemetry.trace.get_current_span(), root_span)
|
||||
|
||||
# self.assertEqual(self._tracer.active_span, scope.span)
|
||||
# await clock.sleep(4)
|
||||
# self.assertEqual(self._tracer.active_span, scope.span)
|
||||
# scope.close()
|
||||
d1 = run_in_background(task, 1)
|
||||
await clock.sleep(2)
|
||||
d2 = run_in_background(task, 2)
|
||||
|
||||
# async def root():
|
||||
# with start_active_span("root span", tracer=self._tracer) as root_scope:
|
||||
# self.assertEqual(self._tracer.active_span, root_scope.span)
|
||||
# scopes.append(root_scope)
|
||||
# because we did run_in_background, the active span should still be the
|
||||
# root.
|
||||
self.assertEqual(opentelemetry.trace.get_current_span(), root_span)
|
||||
|
||||
# d1 = run_in_background(task, 1)
|
||||
# await clock.sleep(2)
|
||||
# d2 = run_in_background(task, 2)
|
||||
await make_deferred_yieldable(
|
||||
defer.gatherResults([d1, d2], consumeErrors=True)
|
||||
)
|
||||
|
||||
# # because we did run_in_background, the active span should still be the
|
||||
# # root.
|
||||
# self.assertEqual(self._tracer.active_span, root_scope.span)
|
||||
self.assertEqual(opentelemetry.trace.get_current_span(), root_span)
|
||||
|
||||
# await make_deferred_yieldable(
|
||||
# defer.gatherResults([d1, d2], consumeErrors=True)
|
||||
# )
|
||||
# start the test off
|
||||
d1 = defer.ensureDeferred(root())
|
||||
|
||||
# self.assertEqual(self._tracer.active_span, root_scope.span)
|
||||
# let the tasks complete
|
||||
reactor.pump((2,) * 8)
|
||||
|
||||
# with LoggingContext("root context"):
|
||||
# # start the test off
|
||||
# d1 = defer.ensureDeferred(root())
|
||||
self.successResultOf(d1)
|
||||
# Active span is unset now that we're outside of the `with` scopes
|
||||
self.assertEqual(
|
||||
opentelemetry.trace.get_current_span(), opentelemetry.trace.INVALID_SPAN
|
||||
)
|
||||
|
||||
# # let the tasks complete
|
||||
# reactor.pump((2,) * 8)
|
||||
|
||||
# self.successResultOf(d1)
|
||||
# self.assertIsNone(self._tracer.active_span)
|
||||
|
||||
# # the spans should be reported in order of their finishing: task 1, task 2,
|
||||
# # root.
|
||||
# self.assertEqual(
|
||||
# self._reporter.get_spans(),
|
||||
# [scopes[1].span, scopes[2].span, scopes[0].span],
|
||||
# )
|
||||
# the spans should be reported in order of their finishing: task 1, task 2,
|
||||
# root.
|
||||
self.assertListEqual(
|
||||
[span.name for span in self._exporter.get_finished_spans()],
|
||||
["task1", "task2", "root_span"],
|
||||
)
|
||||
|
|
Loading…
Reference in a new issue