Remove run_on_reactor (#3395)

This commit is contained in:
Amber Brown 2018-06-14 18:27:37 +10:00 committed by GitHub
parent 3681437c35
commit a61738b316
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 6 additions and 63 deletions

View file

@ -21,7 +21,6 @@ from .units import Transaction, Edu
from synapse.api.errors import HttpResponseException, FederationDeniedError from synapse.api.errors import HttpResponseException, FederationDeniedError
from synapse.util import logcontext, PreserveLoggingContext from synapse.util import logcontext, PreserveLoggingContext
from synapse.util.async import run_on_reactor
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
from synapse.util.metrics import measure_func from synapse.util.metrics import measure_func
from synapse.handlers.presence import format_user_presence_state, get_interested_remotes from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
@ -451,9 +450,6 @@ class TransactionQueue(object):
# hence why we throw the result away. # hence why we throw the result away.
yield get_retry_limiter(destination, self.clock, self.store) yield get_retry_limiter(destination, self.clock, self.store)
# XXX: what's this for?
yield run_on_reactor()
pending_pdus = [] pending_pdus = []
while True: while True:
device_message_edus, device_stream_id, dev_list_id = ( device_message_edus, device_stream_id, dev_list_id = (

View file

@ -13,6 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from twisted.internet import defer, threads from twisted.internet import defer, threads
from ._base import BaseHandler from ._base import BaseHandler
@ -23,7 +24,6 @@ from synapse.api.errors import (
) )
from synapse.module_api import ModuleApi from synapse.module_api import ModuleApi
from synapse.types import UserID from synapse.types import UserID
from synapse.util.async import run_on_reactor
from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.logcontext import make_deferred_yieldable from synapse.util.logcontext import make_deferred_yieldable
@ -423,15 +423,11 @@ class AuthHandler(BaseHandler):
def _check_msisdn(self, authdict, _): def _check_msisdn(self, authdict, _):
return self._check_threepid('msisdn', authdict) return self._check_threepid('msisdn', authdict)
@defer.inlineCallbacks
def _check_dummy_auth(self, authdict, _): def _check_dummy_auth(self, authdict, _):
yield run_on_reactor() return defer.succeed(True)
defer.returnValue(True)
@defer.inlineCallbacks @defer.inlineCallbacks
def _check_threepid(self, medium, authdict): def _check_threepid(self, medium, authdict):
yield run_on_reactor()
if 'threepid_creds' not in authdict: if 'threepid_creds' not in authdict:
raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM) raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM)

View file

@ -39,7 +39,7 @@ from synapse.events.validator import EventValidator
from synapse.util import unwrapFirstError, logcontext from synapse.util import unwrapFirstError, logcontext
from synapse.util.metrics import measure_func from synapse.util.metrics import measure_func
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
from synapse.util.async import run_on_reactor, Linearizer from synapse.util.async import Linearizer
from synapse.util.frozenutils import unfreeze from synapse.util.frozenutils import unfreeze
from synapse.crypto.event_signing import ( from synapse.crypto.event_signing import (
compute_event_signature, add_hashes_and_signatures, compute_event_signature, add_hashes_and_signatures,
@ -1381,8 +1381,6 @@ class FederationHandler(BaseHandler):
def get_state_for_pdu(self, room_id, event_id): def get_state_for_pdu(self, room_id, event_id):
"""Returns the state at the event. i.e. not including said event. """Returns the state at the event. i.e. not including said event.
""" """
yield run_on_reactor()
state_groups = yield self.store.get_state_groups( state_groups = yield self.store.get_state_groups(
room_id, [event_id] room_id, [event_id]
) )
@ -1425,8 +1423,6 @@ class FederationHandler(BaseHandler):
def get_state_ids_for_pdu(self, room_id, event_id): def get_state_ids_for_pdu(self, room_id, event_id):
"""Returns the state at the event. i.e. not including said event. """Returns the state at the event. i.e. not including said event.
""" """
yield run_on_reactor()
state_groups = yield self.store.get_state_groups_ids( state_groups = yield self.store.get_state_groups_ids(
room_id, [event_id] room_id, [event_id]
) )

View file

@ -27,7 +27,6 @@ from synapse.api.errors import (
MatrixCodeMessageException, CodeMessageException MatrixCodeMessageException, CodeMessageException
) )
from ._base import BaseHandler from ._base import BaseHandler
from synapse.util.async import run_on_reactor
from synapse.api.errors import SynapseError, Codes from synapse.api.errors import SynapseError, Codes
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -62,8 +61,6 @@ class IdentityHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def threepid_from_creds(self, creds): def threepid_from_creds(self, creds):
yield run_on_reactor()
if 'id_server' in creds: if 'id_server' in creds:
id_server = creds['id_server'] id_server = creds['id_server']
elif 'idServer' in creds: elif 'idServer' in creds:
@ -106,7 +103,6 @@ class IdentityHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def bind_threepid(self, creds, mxid): def bind_threepid(self, creds, mxid):
yield run_on_reactor()
logger.debug("binding threepid %r to %s", creds, mxid) logger.debug("binding threepid %r to %s", creds, mxid)
data = None data = None
@ -188,8 +184,6 @@ class IdentityHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs): def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs):
yield run_on_reactor()
if not self._should_trust_id_server(id_server): if not self._should_trust_id_server(id_server):
raise SynapseError( raise SynapseError(
400, "Untrusted ID server '%s'" % id_server, 400, "Untrusted ID server '%s'" % id_server,
@ -224,8 +218,6 @@ class IdentityHandler(BaseHandler):
self, id_server, country, phone_number, self, id_server, country, phone_number,
client_secret, send_attempt, **kwargs client_secret, send_attempt, **kwargs
): ):
yield run_on_reactor()
if not self._should_trust_id_server(id_server): if not self._should_trust_id_server(id_server):
raise SynapseError( raise SynapseError(
400, "Untrusted ID server '%s'" % id_server, 400, "Untrusted ID server '%s'" % id_server,

View file

@ -36,7 +36,7 @@ from synapse.events.validator import EventValidator
from synapse.types import ( from synapse.types import (
UserID, RoomAlias, RoomStreamToken, UserID, RoomAlias, RoomStreamToken,
) )
from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter from synapse.util.async import ReadWriteLock, Limiter
from synapse.util.logcontext import run_in_background from synapse.util.logcontext import run_in_background
from synapse.util.metrics import measure_func from synapse.util.metrics import measure_func
from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.frozenutils import frozendict_json_encoder
@ -959,9 +959,7 @@ class EventCreationHandler(object):
event_stream_id, max_stream_id event_stream_id, max_stream_id
) )
@defer.inlineCallbacks
def _notify(): def _notify():
yield run_on_reactor()
try: try:
self.notifier.on_new_room_event( self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id, event, event_stream_id, max_stream_id,

View file

@ -24,7 +24,7 @@ from synapse.api.errors import (
from synapse.http.client import CaptchaServerHttpClient from synapse.http.client import CaptchaServerHttpClient
from synapse import types from synapse import types
from synapse.types import UserID, create_requester, RoomID, RoomAlias from synapse.types import UserID, create_requester, RoomID, RoomAlias
from synapse.util.async import run_on_reactor, Linearizer from synapse.util.async import Linearizer
from synapse.util.threepids import check_3pid_allowed from synapse.util.threepids import check_3pid_allowed
from ._base import BaseHandler from ._base import BaseHandler
@ -139,7 +139,6 @@ class RegistrationHandler(BaseHandler):
Raises: Raises:
RegistrationError if there was a problem registering. RegistrationError if there was a problem registering.
""" """
yield run_on_reactor()
password_hash = None password_hash = None
if password: if password:
password_hash = yield self.auth_handler().hash(password) password_hash = yield self.auth_handler().hash(password)
@ -431,8 +430,6 @@ class RegistrationHandler(BaseHandler):
Raises: Raises:
RegistrationError if there was a problem registering. RegistrationError if there was a problem registering.
""" """
yield run_on_reactor()
if localpart is None: if localpart is None:
raise SynapseError(400, "Request must include user id") raise SynapseError(400, "Request must include user id")

View file

@ -19,7 +19,6 @@ import logging
from twisted.internet import defer from twisted.internet import defer
from synapse.push.pusher import PusherFactory from synapse.push.pusher import PusherFactory
from synapse.util.async import run_on_reactor
from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.logcontext import make_deferred_yieldable, run_in_background
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -125,7 +124,6 @@ class PusherPool:
@defer.inlineCallbacks @defer.inlineCallbacks
def on_new_notifications(self, min_stream_id, max_stream_id): def on_new_notifications(self, min_stream_id, max_stream_id):
yield run_on_reactor()
try: try:
users_affected = yield self.store.get_push_action_users_in_range( users_affected = yield self.store.get_push_action_users_in_range(
min_stream_id, max_stream_id min_stream_id, max_stream_id
@ -151,7 +149,6 @@ class PusherPool:
@defer.inlineCallbacks @defer.inlineCallbacks
def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids): def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
yield run_on_reactor()
try: try:
# Need to subtract 1 from the minimum because the lower bound here # Need to subtract 1 from the minimum because the lower bound here
# is not inclusive # is not inclusive

View file

@ -24,8 +24,6 @@ import synapse.util.stringutils as stringutils
from synapse.http.servlet import parse_json_object_from_request from synapse.http.servlet import parse_json_object_from_request
from synapse.types import create_requester from synapse.types import create_requester
from synapse.util.async import run_on_reactor
from hashlib import sha1 from hashlib import sha1
import hmac import hmac
import logging import logging
@ -272,7 +270,6 @@ class RegisterRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks @defer.inlineCallbacks
def _do_password(self, request, register_json, session): def _do_password(self, request, register_json, session):
yield run_on_reactor()
if (self.hs.config.enable_registration_captcha and if (self.hs.config.enable_registration_captcha and
not session[LoginType.RECAPTCHA]): not session[LoginType.RECAPTCHA]):
# captcha should've been done by this stage! # captcha should've been done by this stage!
@ -333,8 +330,6 @@ class RegisterRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks @defer.inlineCallbacks
def _do_shared_secret(self, request, register_json, session): def _do_shared_secret(self, request, register_json, session):
yield run_on_reactor()
if not isinstance(register_json.get("mac", None), string_types): if not isinstance(register_json.get("mac", None), string_types):
raise SynapseError(400, "Expected mac.") raise SynapseError(400, "Expected mac.")
if not isinstance(register_json.get("user", None), string_types): if not isinstance(register_json.get("user", None), string_types):
@ -423,8 +418,6 @@ class CreateUserRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks @defer.inlineCallbacks
def _do_create(self, requester, user_json): def _do_create(self, requester, user_json):
yield run_on_reactor()
if "localpart" not in user_json: if "localpart" not in user_json:
raise SynapseError(400, "Expected 'localpart' key.") raise SynapseError(400, "Expected 'localpart' key.")

View file

@ -24,7 +24,6 @@ from synapse.http.servlet import (
RestServlet, assert_params_in_request, RestServlet, assert_params_in_request,
parse_json_object_from_request, parse_json_object_from_request,
) )
from synapse.util.async import run_on_reactor
from synapse.util.msisdn import phone_number_to_msisdn from synapse.util.msisdn import phone_number_to_msisdn
from synapse.util.threepids import check_3pid_allowed from synapse.util.threepids import check_3pid_allowed
from ._base import client_v2_patterns, interactive_auth_handler from ._base import client_v2_patterns, interactive_auth_handler
@ -300,8 +299,6 @@ class ThreepidRestServlet(RestServlet):
@defer.inlineCallbacks @defer.inlineCallbacks
def on_GET(self, request): def on_GET(self, request):
yield run_on_reactor()
requester = yield self.auth.get_user_by_req(request) requester = yield self.auth.get_user_by_req(request)
threepids = yield self.datastore.user_get_threepids( threepids = yield self.datastore.user_get_threepids(
@ -312,8 +309,6 @@ class ThreepidRestServlet(RestServlet):
@defer.inlineCallbacks @defer.inlineCallbacks
def on_POST(self, request): def on_POST(self, request):
yield run_on_reactor()
body = parse_json_object_from_request(request) body = parse_json_object_from_request(request)
threePidCreds = body.get('threePidCreds') threePidCreds = body.get('threePidCreds')
@ -365,8 +360,6 @@ class ThreepidDeleteRestServlet(RestServlet):
@defer.inlineCallbacks @defer.inlineCallbacks
def on_POST(self, request): def on_POST(self, request):
yield run_on_reactor()
body = parse_json_object_from_request(request) body = parse_json_object_from_request(request)
required = ['medium', 'address'] required = ['medium', 'address']

View file

@ -32,7 +32,6 @@ from ._base import client_v2_patterns, interactive_auth_handler
import logging import logging
import hmac import hmac
from hashlib import sha1 from hashlib import sha1
from synapse.util.async import run_on_reactor
from synapse.util.ratelimitutils import FederationRateLimiter from synapse.util.ratelimitutils import FederationRateLimiter
from six import string_types from six import string_types
@ -191,8 +190,6 @@ class RegisterRestServlet(RestServlet):
@interactive_auth_handler @interactive_auth_handler
@defer.inlineCallbacks @defer.inlineCallbacks
def on_POST(self, request): def on_POST(self, request):
yield run_on_reactor()
body = parse_json_object_from_request(request) body = parse_json_object_from_request(request)
kind = "user" kind = "user"

View file

@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from twisted.internet import defer, reactor from twisted.internet import defer, reactor
from twisted.internet.defer import CancelledError from twisted.internet.defer import CancelledError
from twisted.python import failure from twisted.python import failure
@ -41,13 +40,6 @@ def sleep(seconds):
defer.returnValue(res) defer.returnValue(res)
def run_on_reactor():
""" This will cause the rest of the function to be invoked upon the next
iteration of the main loop
"""
return sleep(0)
class ObservableDeferred(object): class ObservableDeferred(object):
"""Wraps a deferred object so that we can add observer deferreds. These """Wraps a deferred object so that we can add observer deferreds. These
observer deferreds do not affect the callback chain of the original observer deferreds do not affect the callback chain of the original
@ -227,7 +219,7 @@ class Linearizer(object):
# the context manager, but it needs to happen while we hold the # the context manager, but it needs to happen while we hold the
# lock, and the context manager's exit code must be synchronous, # lock, and the context manager's exit code must be synchronous,
# so actually this is the only sensible place. # so actually this is the only sensible place.
yield run_on_reactor() yield sleep(0)
else: else:
logger.info("Acquired uncontended linearizer lock %r for key %r", logger.info("Acquired uncontended linearizer lock %r for key %r",

View file

@ -19,7 +19,6 @@ from twisted.internet import defer
from mock import Mock, patch from mock import Mock, patch
from synapse.util.distributor import Distributor from synapse.util.distributor import Distributor
from synapse.util.async import run_on_reactor
class DistributorTestCase(unittest.TestCase): class DistributorTestCase(unittest.TestCase):
@ -95,7 +94,6 @@ class DistributorTestCase(unittest.TestCase):
@defer.inlineCallbacks @defer.inlineCallbacks
def observer(): def observer():
yield run_on_reactor()
raise MyException("Oopsie") raise MyException("Oopsie")
self.dist.observe("whail", observer) self.dist.observe("whail", observer)

View file

@ -18,7 +18,6 @@ import logging
import mock import mock
from synapse.api.errors import SynapseError from synapse.api.errors import SynapseError
from synapse.util import async
from synapse.util import logcontext from synapse.util import logcontext
from twisted.internet import defer from twisted.internet import defer
from synapse.util.caches import descriptors from synapse.util.caches import descriptors
@ -195,7 +194,6 @@ class DescriptorTestCase(unittest.TestCase):
def fn(self, arg1): def fn(self, arg1):
@defer.inlineCallbacks @defer.inlineCallbacks
def inner_fn(): def inner_fn():
yield async.run_on_reactor()
raise SynapseError(400, "blah") raise SynapseError(400, "blah")
return inner_fn() return inner_fn()