mirror of
https://github.com/element-hq/synapse
synced 2024-09-19 05:55:14 +00:00
WIP
This commit is contained in:
parent
3c3c0dd419
commit
e75de1d7a6
2 changed files with 39 additions and 7 deletions
|
@ -72,7 +72,12 @@ from synapse.api.errors import (
|
||||||
UnrecognizedRequestError,
|
UnrecognizedRequestError,
|
||||||
)
|
)
|
||||||
from synapse.config.homeserver import HomeServerConfig
|
from synapse.config.homeserver import HomeServerConfig
|
||||||
from synapse.logging.context import defer_to_thread, preserve_fn, run_in_background
|
from synapse.logging.context import (
|
||||||
|
defer_to_thread,
|
||||||
|
measure_coroutine,
|
||||||
|
preserve_fn,
|
||||||
|
run_in_background,
|
||||||
|
)
|
||||||
from synapse.logging.opentracing import active_span, start_active_span, trace_servlet
|
from synapse.logging.opentracing import active_span, start_active_span, trace_servlet
|
||||||
from synapse.util import json_encoder
|
from synapse.util import json_encoder
|
||||||
from synapse.util.caches import intern_dict
|
from synapse.util.caches import intern_dict
|
||||||
|
@ -329,7 +334,9 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta):
|
||||||
request.request_metrics.name = self.__class__.__name__
|
request.request_metrics.name = self.__class__.__name__
|
||||||
|
|
||||||
with trace_servlet(request, self._extract_context):
|
with trace_servlet(request, self._extract_context):
|
||||||
callback_return = await self._async_render(request)
|
callback_return = await measure_coroutine(
|
||||||
|
request.request_metrics.name, self._async_render(request)
|
||||||
|
)
|
||||||
|
|
||||||
if callback_return is not None:
|
if callback_return is not None:
|
||||||
code, response = callback_return
|
code, response = callback_return
|
||||||
|
|
|
@ -50,6 +50,7 @@ from typing import (
|
||||||
)
|
)
|
||||||
|
|
||||||
import attr
|
import attr
|
||||||
|
from prometheus_client import Counter
|
||||||
from typing_extensions import Literal, ParamSpec
|
from typing_extensions import Literal, ParamSpec
|
||||||
|
|
||||||
from twisted.internet import defer, threads
|
from twisted.internet import defer, threads
|
||||||
|
@ -61,6 +62,10 @@ if TYPE_CHECKING:
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
context_ru_utime = Counter(
|
||||||
|
"synapse_logging_context_ru_utime", "utime in log context", ("name",)
|
||||||
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
import resource
|
import resource
|
||||||
|
|
||||||
|
@ -982,13 +987,20 @@ _T = TypeVar("_T")
|
||||||
|
|
||||||
@attr.s(frozen=True, slots=True, auto_attribs=True)
|
@attr.s(frozen=True, slots=True, auto_attribs=True)
|
||||||
class _ResourceTracker(Generator[defer.Deferred[Any], Any, _T]):
|
class _ResourceTracker(Generator[defer.Deferred[Any], Any, _T]):
|
||||||
|
name: str
|
||||||
gen: Generator[defer.Deferred[Any], Any, _T]
|
gen: Generator[defer.Deferred[Any], Any, _T]
|
||||||
|
|
||||||
def send(self, val: Any) -> defer.Deferred[_T]:
|
def send(self, val: Any) -> defer.Deferred[_T]:
|
||||||
|
rusage_start = get_thread_resource_usage()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
return self.gen.send(val)
|
return self.gen.send(val)
|
||||||
finally:
|
finally:
|
||||||
pass
|
rusage_end = get_thread_resource_usage()
|
||||||
|
if rusage_start and rusage_end:
|
||||||
|
context_ru_utime.labels(self.name).inc(
|
||||||
|
max(0, rusage_end.ru_utime - rusage_start.ru_utime)
|
||||||
|
)
|
||||||
|
|
||||||
@overload
|
@overload
|
||||||
def throw(
|
def throw(
|
||||||
|
@ -1006,20 +1018,27 @@ class _ResourceTracker(Generator[defer.Deferred[Any], Any, _T]):
|
||||||
|
|
||||||
def throw(self, a: Any, b: Any = None, c: Any = None) -> defer.Deferred[Any]:
|
def throw(self, a: Any, b: Any = None, c: Any = None) -> defer.Deferred[Any]:
|
||||||
try:
|
try:
|
||||||
return self.throw(a, b, c)
|
return self.gen.throw(a, b, c)
|
||||||
finally:
|
finally:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
@attr.s(frozen=True, slots=True, auto_attribs=True)
|
@attr.s(frozen=True, slots=True, auto_attribs=True)
|
||||||
class _ResourceTracker2(Coroutine[defer.Deferred[Any], Any, _T]):
|
class _ResourceTracker2(Coroutine[defer.Deferred[Any], Any, _T]):
|
||||||
|
name: str
|
||||||
gen: Coroutine[defer.Deferred[Any], Any, _T]
|
gen: Coroutine[defer.Deferred[Any], Any, _T]
|
||||||
|
|
||||||
def send(self, val: Any) -> defer.Deferred[_T]:
|
def send(self, val: Any) -> defer.Deferred[_T]:
|
||||||
|
rusage_start = get_thread_resource_usage()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
return self.gen.send(val)
|
return self.gen.send(val)
|
||||||
finally:
|
finally:
|
||||||
pass
|
rusage_end = get_thread_resource_usage()
|
||||||
|
if rusage_start and rusage_end:
|
||||||
|
context_ru_utime.labels(self.name).inc(
|
||||||
|
max(0, rusage_end.ru_utime - rusage_start.ru_utime)
|
||||||
|
)
|
||||||
|
|
||||||
@overload
|
@overload
|
||||||
def throw(
|
def throw(
|
||||||
|
@ -1037,12 +1056,18 @@ class _ResourceTracker2(Coroutine[defer.Deferred[Any], Any, _T]):
|
||||||
|
|
||||||
def throw(self, a: Any, b: Any = None, c: Any = None) -> defer.Deferred[Any]:
|
def throw(self, a: Any, b: Any = None, c: Any = None) -> defer.Deferred[Any]:
|
||||||
try:
|
try:
|
||||||
return self.throw(a, b, c)
|
return self.gen.throw(a, b, c)
|
||||||
finally:
|
finally:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def __await__(self) -> Generator[defer.Deferred[Any], Any, _T]:
|
def __await__(self) -> Generator[defer.Deferred[Any], Any, _T]:
|
||||||
return _ResourceTracker(self.gen.__await__())
|
return _ResourceTracker(self.name, self.gen.__await__())
|
||||||
|
|
||||||
def close(self) -> None:
|
def close(self) -> None:
|
||||||
return self.gen.close()
|
return self.gen.close()
|
||||||
|
|
||||||
|
|
||||||
|
async def measure_coroutine(
|
||||||
|
name: str, co: Coroutine[defer.Deferred[Any], Any, _T]
|
||||||
|
) -> _T:
|
||||||
|
return await _ResourceTracker2(name, co)
|
||||||
|
|
Loading…
Reference in a new issue