diff --git a/changelog.d/8607.feature b/changelog.d/8607.feature new file mode 100644 index 0000000000..fef1eccb92 --- /dev/null +++ b/changelog.d/8607.feature @@ -0,0 +1 @@ +Support generating structured logs via the standard logging configuration. diff --git a/changelog.d/8607.misc b/changelog.d/8607.misc deleted file mode 100644 index 9e56551a34..0000000000 --- a/changelog.d/8607.misc +++ /dev/null @@ -1 +0,0 @@ -Re-organize the structured logging code to separate the TCP transport handling from the JSON formatting. diff --git a/changelog.d/8685.feature b/changelog.d/8685.feature new file mode 100644 index 0000000000..fef1eccb92 --- /dev/null +++ b/changelog.d/8685.feature @@ -0,0 +1 @@ +Support generating structured logs via the standard logging configuration. diff --git a/synapse/logging/_remote.py b/synapse/logging/_remote.py index ba45424f02..fb937b3f28 100644 --- a/synapse/logging/_remote.py +++ b/synapse/logging/_remote.py @@ -26,7 +26,7 @@ from typing_extensions import Deque from zope.interface import implementer from twisted.application.internet import ClientService -from twisted.internet.defer import Deferred +from twisted.internet.defer import CancelledError, Deferred from twisted.internet.endpoints import ( HostnameEndpoint, TCP4ClientEndpoint, @@ -34,6 +34,7 @@ from twisted.internet.endpoints import ( ) from twisted.internet.interfaces import IPushProducer, ITransport from twisted.internet.protocol import Factory, Protocol +from twisted.python.failure import Failure logger = logging.getLogger(__name__) @@ -131,9 +132,11 @@ class RemoteHandler(logging.Handler): factory = Factory.forProtocol(Protocol) self._service = ClientService(endpoint, factory, clock=_reactor) self._service.startService() + self._stopping = False self._connect() def close(self): + self._stopping = True self._service.stopService() def _connect(self) -> None: @@ -146,17 +149,21 @@ class RemoteHandler(logging.Handler): self._connection_waiter = self._service.whenConnected(failAfterFailures=1) - @self._connection_waiter.addErrback - def fail(r): - r.printTraceback(file=sys.__stderr__) + def fail(failure: Failure) -> None: + # If the Deferred was cancelled (e.g. during shutdown) do not try to + # reconnect (this will cause an infinite loop of errors). + if failure.check(CancelledError) and self._stopping: + return + + # For a different error, print the traceback and re-connect. + failure.printTraceback(file=sys.__stderr__) self._connection_waiter = None self._connect() - @self._connection_waiter.addCallback - def writer(r): + def writer(result: Protocol) -> None: # We have a connection. If we already have a producer, and its # transport is the same, just trigger a resumeProducing. - if self._producer and r.transport is self._producer.transport: + if self._producer and result.transport is self._producer.transport: self._producer.resumeProducing() self._connection_waiter = None return @@ -167,12 +174,14 @@ class RemoteHandler(logging.Handler): # Make a new producer and start it. self._producer = LogProducer( - buffer=self._buffer, transport=r.transport, format=self.format, + buffer=self._buffer, transport=result.transport, format=self.format, ) - r.transport.registerProducer(self._producer, True) + result.transport.registerProducer(self._producer, True) self._producer.resumeProducing() self._connection_waiter = None + self._connection_waiter.addCallbacks(writer, fail) + def _handle_pressure(self) -> None: """ Handle backpressure by shedding records. diff --git a/tests/logging/test_remote_handler.py b/tests/logging/test_remote_handler.py index 58ee1f2f3c..4bc27a1d7d 100644 --- a/tests/logging/test_remote_handler.py +++ b/tests/logging/test_remote_handler.py @@ -151,3 +151,19 @@ class RemoteHandlerTestCase(LoggerCleanupMixin, TestCase): + ["warn %s" % (i,) for i in range(15, 20)], logs, ) + + def test_cancel_connection(self): + """ + Gracefully handle the connection being cancelled. + """ + handler = RemoteHandler( + "127.0.0.1", 9000, maximum_buffer=10, _reactor=self.reactor + ) + logger = self.get_logger(handler) + + # Send a message. + logger.info("Hello there, %s!", "wally") + + # Do not accept the connection and shutdown. This causes the pending + # connection to be cancelled (and should not raise any exceptions). + handler.close()