Fix up leak. Add warnings.

This commit is contained in:
Erik Johnston 2015-05-08 19:53:34 +01:00
parent da1aa07db5
commit 2236ef6c92
8 changed files with 69 additions and 48 deletions

View file

@ -20,6 +20,8 @@ from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.api.constants import Membership, EventTypes from synapse.api.constants import Membership, EventTypes
from synapse.types import UserID from synapse.types import UserID
from synapse.util.logcontext import PreserveLoggingContext
import logging import logging
@ -137,10 +139,11 @@ class BaseHandler(object):
"Failed to get destination from event %s", s.event_id "Failed to get destination from event %s", s.event_id
) )
# Don't block waiting on waking up all the listeners. with PreserveLoggingContext():
notify_d = self.notifier.on_new_room_event( # Don't block waiting on waking up all the listeners.
event, extra_users=extra_users notify_d = self.notifier.on_new_room_event(
) event, extra_users=extra_users
)
def log_failure(f): def log_failure(f):
logger.warn( logger.warn(

View file

@ -21,6 +21,7 @@ from synapse.api.errors import (
AuthError, FederationError, StoreError, AuthError, FederationError, StoreError,
) )
from synapse.api.constants import EventTypes, Membership, RejectedReason from synapse.api.constants import EventTypes, Membership, RejectedReason
from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
from synapse.util.async import run_on_reactor from synapse.util.async import run_on_reactor
from synapse.util.frozenutils import unfreeze from synapse.util.frozenutils import unfreeze
@ -197,9 +198,10 @@ class FederationHandler(BaseHandler):
target_user = UserID.from_string(target_user_id) target_user = UserID.from_string(target_user_id)
extra_users.append(target_user) extra_users.append(target_user)
d = self.notifier.on_new_room_event( with PreserveLoggingContext():
event, extra_users=extra_users d = self.notifier.on_new_room_event(
) event, extra_users=extra_users
)
def log_failure(f): def log_failure(f):
logger.warn( logger.warn(
@ -431,9 +433,10 @@ class FederationHandler(BaseHandler):
auth_events=auth_events, auth_events=auth_events,
) )
d = self.notifier.on_new_room_event( with PreserveLoggingContext():
new_event, extra_users=[joinee] d = self.notifier.on_new_room_event(
) new_event, extra_users=[joinee]
)
def log_failure(f): def log_failure(f):
logger.warn( logger.warn(
@ -512,9 +515,10 @@ class FederationHandler(BaseHandler):
target_user = UserID.from_string(target_user_id) target_user = UserID.from_string(target_user_id)
extra_users.append(target_user) extra_users.append(target_user)
d = self.notifier.on_new_room_event( with PreserveLoggingContext():
event, extra_users=extra_users d = self.notifier.on_new_room_event(
) event, extra_users=extra_users
)
def log_failure(f): def log_failure(f):
logger.warn( logger.warn(
@ -594,9 +598,10 @@ class FederationHandler(BaseHandler):
) )
target_user = UserID.from_string(event.state_key) target_user = UserID.from_string(event.state_key)
d = self.notifier.on_new_room_event( with PreserveLoggingContext():
event, extra_users=[target_user], d = self.notifier.on_new_room_event(
) event, extra_users=[target_user],
)
def log_failure(f): def log_failure(f):
logger.warn( logger.warn(

View file

@ -18,6 +18,7 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError, AuthError from synapse.api.errors import SynapseError, AuthError
from synapse.api.constants import PresenceState from synapse.api.constants import PresenceState
from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
from synapse.types import UserID from synapse.types import UserID
import synapse.metrics import synapse.metrics
@ -808,10 +809,11 @@ class PresenceHandler(BaseHandler):
def push_update_to_clients(self, observed_user, users_to_push=[], def push_update_to_clients(self, observed_user, users_to_push=[],
room_ids=[], statuscache=None): room_ids=[], statuscache=None):
self.notifier.on_new_user_event( with PreserveLoggingContext():
users_to_push, self.notifier.on_new_user_event(
room_ids, users_to_push,
) room_ids,
)
class PresenceEventSource(object): class PresenceEventSource(object):

View file

@ -18,6 +18,7 @@ from twisted.internet import defer
from ._base import BaseHandler from ._base import BaseHandler
from synapse.api.errors import SynapseError, AuthError from synapse.api.errors import SynapseError, AuthError
from synapse.util.logcontext import PreserveLoggingContext
from synapse.types import UserID from synapse.types import UserID
import logging import logging
@ -216,7 +217,8 @@ class TypingNotificationHandler(BaseHandler):
self._latest_room_serial += 1 self._latest_room_serial += 1
self._room_serials[room_id] = self._latest_room_serial self._room_serials[room_id] = self._latest_room_serial
self.notifier.on_new_user_event(rooms=[room_id]) with PreserveLoggingContext():
self.notifier.on_new_user_event(rooms=[room_id])
class TypingNotificationEventSource(object): class TypingNotificationEventSource(object):

View file

@ -17,7 +17,7 @@
from synapse.api.errors import ( from synapse.api.errors import (
cs_exception, SynapseError, CodeMessageException, UnrecognizedRequestError cs_exception, SynapseError, CodeMessageException, UnrecognizedRequestError
) )
from synapse.util.logcontext import LoggingContext from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
import synapse.metrics import synapse.metrics
from syutil.jsonutil import ( from syutil.jsonutil import (
@ -85,7 +85,9 @@ def request_handler(request_handler):
"Received request: %s %s", "Received request: %s %s",
request.method, request.path request.method, request.path
) )
yield request_handler(self, request) d = request_handler(self, request)
with PreserveLoggingContext():
yield d
code = request.code code = request.code
except CodeMessageException as e: except CodeMessageException as e:
code = e.code code = e.code

View file

@ -54,7 +54,8 @@ class Clock(object):
LoggingContext.thread_local.current_context = current_context LoggingContext.thread_local.current_context = current_context
callback() callback()
return reactor.callLater(delay, wrapped_callback) with PreserveLoggingContext():
return reactor.callLater(delay, wrapped_callback)
def cancel_call_later(self, timer): def cancel_call_later(self, timer):
timer.cancel() timer.cancel()

View file

@ -13,8 +13,6 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from synapse.util.logcontext import PreserveLoggingContext
from twisted.internet import defer from twisted.internet import defer
import logging import logging
@ -93,7 +91,6 @@ class Signal(object):
Each observer callable may return a Deferred.""" Each observer callable may return a Deferred."""
self.observers.append(observer) self.observers.append(observer)
@defer.inlineCallbacks
def fire(self, *args, **kwargs): def fire(self, *args, **kwargs):
"""Invokes every callable in the observer list, passing in the args and """Invokes every callable in the observer list, passing in the args and
kwargs. Exceptions thrown by observers are logged but ignored. It is kwargs. Exceptions thrown by observers are logged but ignored. It is
@ -101,24 +98,24 @@ class Signal(object):
Returns a Deferred that will complete when all the observers have Returns a Deferred that will complete when all the observers have
completed.""" completed."""
with PreserveLoggingContext():
deferreds = []
for observer in self.observers:
d = defer.maybeDeferred(observer, *args, **kwargs)
def eb(failure): def eb(failure):
logger.warning( logger.warning(
"%s signal observer %s failed: %r", "%s signal observer %s failed: %r",
self.name, observer, failure, self.name, observer, failure,
exc_info=( exc_info=(
failure.type, failure.type,
failure.value, failure.value,
failure.getTracebackObject())) failure.getTracebackObject()))
if not self.suppress_failures: if not self.suppress_failures:
failure.raiseException() failure.raiseException()
deferreds.append(d.addErrback(eb))
results = [] deferreds = [
for deferred in deferreds: defer.maybeDeferred(observer, *args, **kwargs)
result = yield deferred for observer in self.observers
results.append(result) ]
defer.returnValue(results)
d = defer.gatherResults(deferreds, consumeErrors=True)
d.addErrback(eb)
return d

View file

@ -132,6 +132,13 @@ class PreserveLoggingContext(object):
"""Restores the current logging context""" """Restores the current logging context"""
LoggingContext.thread_local.current_context = self.current_context LoggingContext.thread_local.current_context = self.current_context
if self.current_context is not LoggingContext.sentinel:
if self.current_context.parent_context is None:
logger.warn(
"Restoring dead context: %s",
self.current_context,
)
def preserve_context_over_fn(fn, *args, **kwargs): def preserve_context_over_fn(fn, *args, **kwargs):
"""Takes a function and invokes it with the given arguments, but removes """Takes a function and invokes it with the given arguments, but removes
@ -169,6 +176,8 @@ def preserve_context_over_deferred(deferred):
res = d.errback(failure) res = d.errback(failure)
return res return res
deferred.addCallbacks(cb, eb) if deferred.called:
return deferred
deferred.addCallbacks(cb, eb)
return d return d