Merge remote-tracking branch 'origin/develop' into dbkr/media_erasure

This commit is contained in:
David Baker 2018-06-28 15:16:04 +01:00
commit 94a14b4182
132 changed files with 1221 additions and 406 deletions

View file

@ -4,7 +4,12 @@ language: python
# tell travis to cache ~/.cache/pip
cache: pip
before_script:
- git remote set-branches --add origin develop
- git fetch origin develop
matrix:
fast_finish: true
include:
- python: 2.7
env: TOX_ENV=packaging
@ -14,10 +19,13 @@ matrix:
- python: 2.7
env: TOX_ENV=py27
- python: 3.6
env: TOX_ENV=py36
- python: 3.6
env: TOX_ENV=check-newsfragment
install:
- pip install tox

View file

@ -1,3 +1,12 @@
Changes in synapse v0.31.2 (2018-06-14)
=======================================
SECURITY UPDATE: Prevent unauthorised users from setting state events in a room
when there is no ``m.room.power_levels`` event in force in the room. (PR #3397)
Discussion around the Matrix Spec change proposal for this change can be
followed at https://github.com/matrix-org/matrix-doc/issues/1304.
Changes in synapse v0.31.1 (2018-06-08)
=======================================

View file

@ -48,6 +48,26 @@ Please ensure your changes match the cosmetic style of the existing project,
and **never** mix cosmetic and functional changes in the same commit, as it
makes it horribly hard to review otherwise.
Changelog
~~~~~~~~~
All changes, even minor ones, need a corresponding changelog
entry. These are managed by Towncrier
(https://github.com/hawkowl/towncrier).
To create a changelog entry, make a new file in the ``changelog.d``
file named in the format of ``issuenumberOrPR.type``. The type can be
one of ``feature``, ``bugfix``, ``removal`` (also used for
deprecations), or ``misc`` (for internal-only changes). The content of
the file is your changelog entry, which can contain RestructuredText
formatting. A note of contributors is welcomed in changelogs for
non-misc changes (the content of misc changes is not displayed).
For example, a fix for a bug reported in #1234 would have its
changelog entry in ``changelog.d/1234.bugfix``, and contain content
like "The security levels of Florbs are now validated when
recieved over federation. Contributed by Jane Matrix".
Attribution
~~~~~~~~~~~

View file

@ -29,5 +29,8 @@ exclude Dockerfile
exclude .dockerignore
recursive-exclude jenkins *.sh
include pyproject.toml
recursive-include changelog.d *
prune .github
prune demo/etc

1
changelog.d/3324.removal Normal file
View file

@ -0,0 +1 @@
Remove was_forgotten_at

1
changelog.d/3327.bugfix Normal file
View file

@ -0,0 +1 @@
Strip access_token from outgoing requests

0
changelog.d/3332.misc Normal file
View file

1
changelog.d/3334.feature Normal file
View file

@ -0,0 +1 @@
Cache factor override system for specific caches

1
changelog.d/3340.doc Normal file
View file

@ -0,0 +1 @@
``doc/postgres.rst``: fix display of the last command block. Thanks to @ArchangeGabriel!

0
changelog.d/3341.misc Normal file
View file

1
changelog.d/3344.feature Normal file
View file

@ -0,0 +1 @@
Add metrics to track appservice transactions

0
changelog.d/3347.misc Normal file
View file

0
changelog.d/3348.misc Normal file
View file

1
changelog.d/3349.bugfix Normal file
View file

@ -0,0 +1 @@
Redact AS tokens in logs

1
changelog.d/3355.bugfix Normal file
View file

@ -0,0 +1 @@
Fix federation backfill from SQLite servers

0
changelog.d/3356.misc Normal file
View file

1
changelog.d/3363.bugfix Normal file
View file

@ -0,0 +1 @@
Fix event-purge-by-ts admin API

1
changelog.d/3371.bugfix Normal file
View file

@ -0,0 +1 @@
Fix event filtering in get_missing_events handler

1
changelog.d/3372.feature Normal file
View file

@ -0,0 +1 @@
Try to log more helpful info when a sig verification fails

0
changelog.d/3446.misc Normal file
View file

0
changelog.d/3447.misc Normal file
View file

1
changelog.d/3462.feature Normal file
View file

@ -0,0 +1 @@
Synapse now uses the best performing JSON encoder/decoder according to your runtime (simplejson on CPython, stdlib json on PyPy).

View file

@ -44,13 +44,26 @@ Deactivate Account
This API deactivates an account. It removes active access tokens, resets the
password, and deletes third-party IDs (to prevent the user requesting a
password reset).
password reset). It can also mark the user as GDPR-erased (stopping their data
from distributed further, and deleting it entirely if there are no other
references to it).
The api is::
POST /_matrix/client/r0/admin/deactivate/<user_id>
including an ``access_token`` of a server admin, and an empty request body.
with a body of:
.. code:: json
{
"erase": true
}
including an ``access_token`` of a server admin.
The erase parameter is optional and defaults to 'false'.
An empty body may be passed for backwards compatibility.
Reset password

5
pyproject.toml Normal file
View file

@ -0,0 +1,5 @@
[tool.towncrier]
package = "synapse"
filename = "CHANGES.rst"
directory = "changelog.d"
issue_format = "`#{issue} <https://github.com/matrix-org/synapse/issues/{issue}>`_"

View file

@ -18,14 +18,22 @@
from __future__ import print_function
import argparse
from urlparse import urlparse, urlunparse
import nacl.signing
import json
import base64
import requests
import sys
from requests.adapters import HTTPAdapter
import srvlookup
import yaml
# uncomment the following to enable debug logging of http requests
#from httplib import HTTPConnection
#HTTPConnection.debuglevel = 1
def encode_base64(input_bytes):
"""Encode bytes as a base64 string without any padding."""
@ -113,17 +121,6 @@ def read_signing_keys(stream):
return keys
def lookup(destination, path):
if ":" in destination:
return "https://%s%s" % (destination, path)
else:
try:
srv = srvlookup.lookup("matrix", "tcp", destination)[0]
return "https://%s:%d%s" % (srv.host, srv.port, path)
except:
return "https://%s:%d%s" % (destination, 8448, path)
def request_json(method, origin_name, origin_key, destination, path, content):
if method is None:
if content is None:
@ -152,13 +149,19 @@ def request_json(method, origin_name, origin_key, destination, path, content):
authorization_headers.append(bytes(header))
print ("Authorization: %s" % header, file=sys.stderr)
dest = lookup(destination, path)
dest = "matrix://%s%s" % (destination, path)
print ("Requesting %s" % dest, file=sys.stderr)
result = requests.request(
s = requests.Session()
s.mount("matrix://", MatrixConnectionAdapter())
result = s.request(
method=method,
url=dest,
headers={"Authorization": authorization_headers[0]},
headers={
"Host": destination,
"Authorization": authorization_headers[0]
},
verify=False,
data=content,
)
@ -242,5 +245,39 @@ def read_args_from_config(args):
args.signing_key_path = config['signing_key_path']
class MatrixConnectionAdapter(HTTPAdapter):
@staticmethod
def lookup(s):
if s[-1] == ']':
# ipv6 literal (with no port)
return s, 8448
if ":" in s:
out = s.rsplit(":",1)
try:
port = int(out[1])
except ValueError:
raise ValueError("Invalid host:port '%s'" % s)
return out[0], port
try:
srv = srvlookup.lookup("matrix", "tcp", s)[0]
return srv.host, srv.port
except:
return s, 8448
def get_connection(self, url, proxies=None):
parsed = urlparse(url)
(host, port) = self.lookup(parsed.netloc)
netloc = "%s:%d" % (host, port)
print("Connecting to %s" % (netloc,), file=sys.stderr)
url = urlunparse((
"https", netloc, parsed.path, parsed.params, parsed.query,
parsed.fragment,
))
return super(MatrixConnectionAdapter, self).get_connection(url, proxies)
if __name__ == "__main__":
main()

View file

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -16,4 +17,4 @@
""" This is a reference implementation of a Matrix home server.
"""
__version__ = "0.31.1"
__version__ = "0.31.2"

View file

@ -488,7 +488,7 @@ class Auth(object):
def _look_up_user_by_access_token(self, token):
ret = yield self.store.get_user_by_access_token(token)
if not ret:
logger.warn("Unrecognised access token - not in store: %s" % (token,))
logger.warn("Unrecognised access token - not in store.")
raise AuthError(
self.TOKEN_NOT_FOUND_HTTP_STATUS, "Unrecognised access token.",
errcode=Codes.UNKNOWN_TOKEN
@ -511,7 +511,7 @@ class Auth(object):
)
service = self.store.get_app_service_by_token(token)
if not service:
logger.warn("Unrecognised appservice access token: %s" % (token,))
logger.warn("Unrecognised appservice access token.")
raise AuthError(
self.TOKEN_NOT_FOUND_HTTP_STATUS,
"Unrecognised access token.",
@ -655,7 +655,7 @@ class Auth(object):
auth_events[(EventTypes.PowerLevels, "")] = power_level_event
send_level = event_auth.get_send_level(
EventTypes.Aliases, "", auth_events
EventTypes.Aliases, "", power_level_event,
)
user_level = event_auth.get_user_power_level(user_id, auth_events)

View file

@ -17,7 +17,8 @@
import logging
import simplejson as json
from canonicaljson import json
from six import iteritems
from six.moves import http_client

View file

@ -17,7 +17,8 @@ from synapse.storage.presence import UserPresenceState
from synapse.types import UserID, RoomID
from twisted.internet import defer
import simplejson as json
from canonicaljson import json
import jsonschema
from jsonschema import FormatChecker

View file

@ -23,6 +23,7 @@ from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.directory import DirectoryStore
@ -62,7 +63,7 @@ class AppserviceServer(HomeServer):
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(self)
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
root_resource = create_resource_tree(resources, NoResource())
@ -97,7 +98,7 @@ class AppserviceServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
"enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])

View file

@ -122,7 +122,7 @@ class ClientReaderServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
"enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])

View file

@ -138,7 +138,7 @@ class EventCreatorServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
"enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])

View file

@ -111,7 +111,7 @@ class FederationReaderServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
"enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])

View file

@ -125,7 +125,7 @@ class FederationSenderServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
"enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])

View file

@ -176,7 +176,7 @@ class FrontendProxyServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
"enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])

View file

@ -266,7 +266,7 @@ class SynapseHomeServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
"enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
@ -318,11 +318,6 @@ def setup(config_options):
# check any extra requirements we have now we have a config
check_requirements(config)
version_string = "Synapse/" + get_version_string(synapse)
logger.info("Server hostname: %s", config.server_name)
logger.info("Server version: %s", version_string)
events.USE_FROZEN_DICTS = config.use_frozen_dicts
tls_server_context_factory = context_factory.ServerContextFactory(config)
@ -335,7 +330,7 @@ def setup(config_options):
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
config=config,
version_string=version_string,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)

View file

@ -118,7 +118,7 @@ class MediaRepositoryServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
"enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])

View file

@ -128,7 +128,7 @@ class PusherServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
"enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])

View file

@ -305,7 +305,7 @@ class SynchrotronServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
"enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])

View file

@ -150,7 +150,7 @@ class UserDirectoryServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
"enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])

View file

@ -12,17 +12,20 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import Config
from synapse.util.logcontext import LoggingContextFilter
from twisted.logger import globalLogBeginner, STDLibLogObserver
import logging
import logging.config
import yaml
from string import Template
import os
import signal
from string import Template
import sys
from twisted.logger import STDLibLogObserver, globalLogBeginner
import yaml
import synapse
from synapse.util.logcontext import LoggingContextFilter
from synapse.util.versionstring import get_version_string
from ._base import Config
DEFAULT_LOG_CONFIG = Template("""
version: 1
@ -202,6 +205,15 @@ def setup_logging(config, use_worker_options=False):
if getattr(signal, "SIGHUP"):
signal.signal(signal.SIGHUP, sighup)
# make sure that the first thing we log is a thing we can grep backwards
# for
logging.warn("***** STARTING SERVER *****")
logging.warn(
"Server %s version %s",
sys.argv[0], get_version_string(synapse),
)
logging.info("Server hostname: %s", config.server_name)
# It's critical to point twisted's internal logging somewhere, otherwise it
# stacks up and leaks kup to 64K object;
# see: https://twistedmatrix.com/trac/ticket/8164

View file

@ -18,7 +18,7 @@ from twisted.web.http import HTTPClient
from twisted.internet.protocol import Factory
from twisted.internet import defer, reactor
from synapse.http.endpoint import matrix_federation_endpoint
import simplejson as json
from canonicaljson import json
import logging

View file

@ -34,9 +34,11 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
event: the event being checked.
auth_events (dict: event-key -> event): the existing room state.
Raises:
AuthError if the checks fail
Returns:
True if the auth checks pass.
if the auth checks pass.
"""
if do_size_check:
_check_size_limits(event)
@ -71,7 +73,7 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
# Oh, we don't know what the state of the room was, so we
# are trusting that this is allowed (at least for now)
logger.warn("Trusting event: %s", event.event_id)
return True
return
if event.type == EventTypes.Create:
room_id_domain = get_domain_from_id(event.room_id)
@ -81,7 +83,8 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
"Creation event's room_id domain does not match sender's"
)
# FIXME
return True
logger.debug("Allowing! %s", event)
return
creation_event = auth_events.get((EventTypes.Create, ""), None)
@ -118,7 +121,8 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
403,
"Alias event's state_key does not match sender's domain"
)
return True
logger.debug("Allowing! %s", event)
return
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
@ -127,14 +131,9 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
)
if event.type == EventTypes.Member:
allowed = _is_membership_change_allowed(
event, auth_events
)
if allowed:
logger.debug("Allowing! %s", event)
else:
logger.debug("Denying! %s", event)
return allowed
_is_membership_change_allowed(event, auth_events)
logger.debug("Allowing! %s", event)
return
_check_event_sender_in_room(event, auth_events)
@ -153,7 +152,8 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
)
)
else:
return True
logger.debug("Allowing! %s", event)
return
_can_send_event(event, auth_events)
@ -200,7 +200,7 @@ def _is_membership_change_allowed(event, auth_events):
create = auth_events.get(key)
if create and event.prev_events[0][0] == create.event_id:
if create.content["creator"] == event.state_key:
return True
return
target_user_id = event.state_key
@ -265,13 +265,13 @@ def _is_membership_change_allowed(event, auth_events):
raise AuthError(
403, "%s is banned from the room" % (target_user_id,)
)
return True
return
if Membership.JOIN != membership:
if (caller_invited
and Membership.LEAVE == membership
and target_user_id == event.user_id):
return True
return
if not caller_in_room: # caller isn't joined
raise AuthError(
@ -334,8 +334,6 @@ def _is_membership_change_allowed(event, auth_events):
else:
raise AuthError(500, "Unknown membership %s" % membership)
return True
def _check_event_sender_in_room(event, auth_events):
key = (EventTypes.Member, event.user_id, )
@ -355,35 +353,46 @@ def _check_joined_room(member, user_id, room_id):
))
def get_send_level(etype, state_key, auth_events):
key = (EventTypes.PowerLevels, "", )
send_level_event = auth_events.get(key)
send_level = None
if send_level_event:
send_level = send_level_event.content.get("events", {}).get(
etype
)
if send_level is None:
if state_key is not None:
send_level = send_level_event.content.get(
"state_default", 50
)
else:
send_level = send_level_event.content.get(
"events_default", 0
)
def get_send_level(etype, state_key, power_levels_event):
"""Get the power level required to send an event of a given type
if send_level:
send_level = int(send_level)
The federation spec [1] refers to this as "Required Power Level".
https://matrix.org/docs/spec/server_server/unstable.html#definitions
Args:
etype (str): type of event
state_key (str|None): state_key of state event, or None if it is not
a state event.
power_levels_event (synapse.events.EventBase|None): power levels event
in force at this point in the room
Returns:
int: power level required to send this event.
"""
if power_levels_event:
power_levels_content = power_levels_event.content
else:
send_level = 0
power_levels_content = {}
return send_level
# see if we have a custom level for this event type
send_level = power_levels_content.get("events", {}).get(etype)
# otherwise, fall back to the state_default/events_default.
if send_level is None:
if state_key is not None:
send_level = power_levels_content.get("state_default", 50)
else:
send_level = power_levels_content.get("events_default", 0)
return int(send_level)
def _can_send_event(event, auth_events):
power_levels_event = _get_power_level_event(auth_events)
send_level = get_send_level(
event.type, event.get("state_key", None), auth_events
event.type, event.get("state_key"), power_levels_event,
)
user_level = get_user_power_level(event.user_id, auth_events)
@ -524,13 +533,22 @@ def _check_power_levels(event, auth_events):
def _get_power_level_event(auth_events):
key = (EventTypes.PowerLevels, "", )
return auth_events.get(key)
return auth_events.get((EventTypes.PowerLevels, ""))
def get_user_power_level(user_id, auth_events):
power_level_event = _get_power_level_event(auth_events)
"""Get a user's power level
Args:
user_id (str): user's id to look up in power_levels
auth_events (dict[(str, str), synapse.events.EventBase]):
state in force at this point in the room (or rather, a subset of
it including at least the create event and power levels event.
Returns:
int: the user's power level in this room.
"""
power_level_event = _get_power_level_event(auth_events)
if power_level_event:
level = power_level_event.content.get("users", {}).get(user_id)
if not level:
@ -541,6 +559,11 @@ def get_user_power_level(user_id, auth_events):
else:
return int(level)
else:
# if there is no power levels event, the creator gets 100 and everyone
# else gets 0.
# some things which call this don't pass the create event: hack around
# that.
key = (EventTypes.Create, "", )
create_event = auth_events.get(key)
if (create_event is not None and

View file

@ -15,7 +15,7 @@
# limitations under the License.
import logging
import simplejson as json
from canonicaljson import json
from twisted.internet import defer
from synapse.api.errors import AuthError, FederationError, SynapseError, NotFoundError

View file

@ -21,7 +21,6 @@ from .units import Transaction, Edu
from synapse.api.errors import HttpResponseException, FederationDeniedError
from synapse.util import logcontext, PreserveLoggingContext
from synapse.util.async import run_on_reactor
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
from synapse.util.metrics import measure_func
from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
@ -42,8 +41,11 @@ import logging
logger = logging.getLogger(__name__)
sent_pdus_destination_dist = Counter(
"synapse_federation_transaction_queue_sent_pdu_destinations", ""
sent_pdus_destination_dist_count = Counter(
"synapse_federation_client_sent_pdu_destinations:count", ""
)
sent_pdus_destination_dist_total = Counter(
"synapse_federation_client_sent_pdu_destinations:total", ""
)
@ -280,7 +282,8 @@ class TransactionQueue(object):
if not destinations:
return
sent_pdus_destination_dist.inc(len(destinations))
sent_pdus_destination_dist_total.inc(len(destinations))
sent_pdus_destination_dist_count.inc()
for destination in destinations:
self.pending_pdus_by_dest.setdefault(destination, []).append(
@ -451,9 +454,6 @@ class TransactionQueue(object):
# hence why we throw the result away.
yield get_retry_limiter(destination, self.clock, self.store)
# XXX: what's this for?
yield run_on_reactor()
pending_pdus = []
while True:
device_message_edus, device_stream_id, dev_list_id = (

View file

@ -13,8 +13,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer, threads
from canonicaljson import json
from ._base import BaseHandler
from synapse.api.constants import LoginType
from synapse.api.errors import (
@ -23,7 +26,6 @@ from synapse.api.errors import (
)
from synapse.module_api import ModuleApi
from synapse.types import UserID
from synapse.util.async import run_on_reactor
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.logcontext import make_deferred_yieldable
@ -32,7 +34,7 @@ from twisted.web.client import PartialDownloadError
import logging
import bcrypt
import pymacaroons
import simplejson
import attr
import synapse.util.stringutils as stringutils
@ -402,7 +404,7 @@ class AuthHandler(BaseHandler):
except PartialDownloadError as pde:
# Twisted is silly
data = pde.response
resp_body = simplejson.loads(data)
resp_body = json.loads(data)
if 'success' in resp_body:
# Note that we do NOT check the hostname here: we explicitly
@ -423,15 +425,11 @@ class AuthHandler(BaseHandler):
def _check_msisdn(self, authdict, _):
return self._check_threepid('msisdn', authdict)
@defer.inlineCallbacks
def _check_dummy_auth(self, authdict, _):
yield run_on_reactor()
defer.returnValue(True)
return defer.succeed(True)
@defer.inlineCallbacks
def _check_threepid(self, medium, authdict):
yield run_on_reactor()
if 'threepid_creds' not in authdict:
raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM)
@ -858,7 +856,11 @@ class AuthHandler(BaseHandler):
return bcrypt.hashpw(password.encode('utf8') + self.hs.config.password_pepper,
bcrypt.gensalt(self.bcrypt_rounds))
return make_deferred_yieldable(threads.deferToThread(_do_hash))
return make_deferred_yieldable(
threads.deferToThreadPool(
self.hs.get_reactor(), self.hs.get_reactor().getThreadPool(), _do_hash
),
)
def validate_hash(self, password, stored_hash):
"""Validates that self.hash(password) == stored_hash.
@ -878,16 +880,21 @@ class AuthHandler(BaseHandler):
)
if stored_hash:
return make_deferred_yieldable(threads.deferToThread(_do_validate_hash))
return make_deferred_yieldable(
threads.deferToThreadPool(
self.hs.get_reactor(),
self.hs.get_reactor().getThreadPool(),
_do_validate_hash,
),
)
else:
return defer.succeed(False)
class MacaroonGeneartor(object):
def __init__(self, hs):
self.clock = hs.get_clock()
self.server_name = hs.config.server_name
self.macaroon_secret_key = hs.config.macaroon_secret_key
@attr.s
class MacaroonGenerator(object):
hs = attr.ib()
def generate_access_token(self, user_id, extra_caveats=None):
extra_caveats = extra_caveats or []
@ -905,7 +912,7 @@ class MacaroonGeneartor(object):
def generate_short_term_login_token(self, user_id, duration_in_ms=(2 * 60 * 1000)):
macaroon = self._generate_base_macaroon(user_id)
macaroon.add_first_party_caveat("type = login")
now = self.clock.time_msec()
now = self.hs.get_clock().time_msec()
expiry = now + duration_in_ms
macaroon.add_first_party_caveat("time < %d" % (expiry,))
return macaroon.serialize()
@ -917,9 +924,9 @@ class MacaroonGeneartor(object):
def _generate_base_macaroon(self, user_id):
macaroon = pymacaroons.Macaroon(
location=self.server_name,
location=self.hs.config.server_name,
identifier="key",
key=self.macaroon_secret_key)
key=self.hs.config.macaroon_secret_key)
macaroon.add_first_party_caveat("gen = 1")
macaroon.add_first_party_caveat("user_id = %s" % (user_id,))
return macaroon

View file

@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer, reactor
from twisted.internet import defer
from ._base import BaseHandler
from synapse.types import UserID, create_requester
@ -39,7 +39,7 @@ class DeactivateAccountHandler(BaseHandler):
# Start the user parter loop so it can resume parting users from rooms where
# it left off (if it has work left to do).
reactor.callWhenRunning(self._start_user_parting)
hs.get_reactor().callWhenRunning(self._start_user_parting)
@defer.inlineCallbacks
def deactivate_account(self, user_id, erase_data):
@ -47,6 +47,7 @@ class DeactivateAccountHandler(BaseHandler):
Args:
user_id (str): ID of user to be deactivated
erase_data (bool): whether to GDPR-erase the user's data
Returns:
Deferred

View file

@ -14,10 +14,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import simplejson as json
import logging
from canonicaljson import encode_canonical_json
from canonicaljson import encode_canonical_json, json
from twisted.internet import defer
from six import iteritems
@ -80,7 +79,7 @@ class E2eKeysHandler(object):
else:
remote_queries[user_id] = device_ids
# Firt get local devices.
# First get local devices.
failures = {}
results = {}
if local_query:
@ -357,7 +356,7 @@ def _exception_to_failure(e):
# include ConnectionRefused and other errors
#
# Note that some Exceptions (notably twisted's ResponseFailed etc) don't
# give a string for e.message, which simplejson then fails to serialize.
# give a string for e.message, which json then fails to serialize.
return {
"status": 503, "message": str(e.message),
}

View file

@ -39,7 +39,7 @@ from synapse.events.validator import EventValidator
from synapse.util import unwrapFirstError, logcontext
from synapse.util.metrics import measure_func
from synapse.util.logutils import log_function
from synapse.util.async import run_on_reactor, Linearizer
from synapse.util.async import Linearizer
from synapse.util.frozenutils import unfreeze
from synapse.crypto.event_signing import (
compute_event_signature, add_hashes_and_signatures,
@ -460,6 +460,47 @@ class FederationHandler(BaseHandler):
@measure_func("_filter_events_for_server")
@defer.inlineCallbacks
def _filter_events_for_server(self, server_name, room_id, events):
"""Filter the given events for the given server, redacting those the
server can't see.
Assumes the server is currently in the room.
Returns
list[FrozenEvent]
"""
# First lets check to see if all the events have a history visibility
# of "shared" or "world_readable". If thats the case then we don't
# need to check membership (as we know the server is in the room).
event_to_state_ids = yield self.store.get_state_ids_for_events(
frozenset(e.event_id for e in events),
types=(
(EventTypes.RoomHistoryVisibility, ""),
)
)
visibility_ids = set()
for sids in event_to_state_ids.itervalues():
hist = sids.get((EventTypes.RoomHistoryVisibility, ""))
if hist:
visibility_ids.add(hist)
# If we failed to find any history visibility events then the default
# is "shared" visiblity.
if not visibility_ids:
defer.returnValue(events)
event_map = yield self.store.get_events(visibility_ids)
all_open = all(
e.content.get("history_visibility") in (None, "shared", "world_readable")
for e in event_map.itervalues()
)
if all_open:
defer.returnValue(events)
# Ok, so we're dealing with events that have non-trivial visibility
# rules, so we need to also get the memberships of the room.
event_to_state_ids = yield self.store.get_state_ids_for_events(
frozenset(e.event_id for e in events),
types=(
@ -1396,8 +1437,6 @@ class FederationHandler(BaseHandler):
def get_state_for_pdu(self, room_id, event_id):
"""Returns the state at the event. i.e. not including said event.
"""
yield run_on_reactor()
state_groups = yield self.store.get_state_groups(
room_id, [event_id]
)
@ -1440,8 +1479,6 @@ class FederationHandler(BaseHandler):
def get_state_ids_for_pdu(self, room_id, event_id):
"""Returns the state at the event. i.e. not including said event.
"""
yield run_on_reactor()
state_groups = yield self.store.get_state_groups_ids(
room_id, [event_id]
)

View file

@ -19,7 +19,7 @@
import logging
import simplejson as json
from canonicaljson import json
from twisted.internet import defer
@ -27,7 +27,6 @@ from synapse.api.errors import (
MatrixCodeMessageException, CodeMessageException
)
from ._base import BaseHandler
from synapse.util.async import run_on_reactor
from synapse.api.errors import SynapseError, Codes
logger = logging.getLogger(__name__)
@ -62,8 +61,6 @@ class IdentityHandler(BaseHandler):
@defer.inlineCallbacks
def threepid_from_creds(self, creds):
yield run_on_reactor()
if 'id_server' in creds:
id_server = creds['id_server']
elif 'idServer' in creds:
@ -106,7 +103,6 @@ class IdentityHandler(BaseHandler):
@defer.inlineCallbacks
def bind_threepid(self, creds, mxid):
yield run_on_reactor()
logger.debug("binding threepid %r to %s", creds, mxid)
data = None
@ -188,8 +184,6 @@ class IdentityHandler(BaseHandler):
@defer.inlineCallbacks
def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs):
yield run_on_reactor()
if not self._should_trust_id_server(id_server):
raise SynapseError(
400, "Untrusted ID server '%s'" % id_server,
@ -224,8 +218,6 @@ class IdentityHandler(BaseHandler):
self, id_server, country, phone_number,
client_secret, send_attempt, **kwargs
):
yield run_on_reactor()
if not self._should_trust_id_server(id_server):
raise SynapseError(
400, "Untrusted ID server '%s'" % id_server,

View file

@ -14,13 +14,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import simplejson
import sys
from canonicaljson import encode_canonical_json
from canonicaljson import encode_canonical_json, json
import six
from six import string_types, itervalues, iteritems
from twisted.internet import defer, reactor
from twisted.internet import defer
from twisted.internet.defer import succeed
from twisted.python.failure import Failure
@ -36,7 +35,7 @@ from synapse.events.validator import EventValidator
from synapse.types import (
UserID, RoomAlias, RoomStreamToken,
)
from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
from synapse.util.async import ReadWriteLock, Limiter
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import measure_func
from synapse.util.frozenutils import frozendict_json_encoder
@ -157,7 +156,7 @@ class MessageHandler(BaseHandler):
# remove the purge from the list 24 hours after it completes
def clear_purge():
del self._purges_by_id[purge_id]
reactor.callLater(24 * 3600, clear_purge)
self.hs.get_reactor().callLater(24 * 3600, clear_purge)
def get_purge_status(self, purge_id):
"""Get the current status of an active purge
@ -491,7 +490,7 @@ class EventCreationHandler(object):
target, e
)
is_exempt = yield self._is_exempt_from_privacy_policy(builder)
is_exempt = yield self._is_exempt_from_privacy_policy(builder, requester)
if not is_exempt:
yield self.assert_accepted_privacy_policy(requester)
@ -509,12 +508,13 @@ class EventCreationHandler(object):
defer.returnValue((event, context))
def _is_exempt_from_privacy_policy(self, builder):
def _is_exempt_from_privacy_policy(self, builder, requester):
""""Determine if an event to be sent is exempt from having to consent
to the privacy policy
Args:
builder (synapse.events.builder.EventBuilder): event being created
requester (Requster): user requesting this event
Returns:
Deferred[bool]: true if the event can be sent without the user
@ -525,6 +525,9 @@ class EventCreationHandler(object):
membership = builder.content.get("membership", None)
if membership == Membership.JOIN:
return self._is_server_notices_room(builder.room_id)
elif membership == Membership.LEAVE:
# the user is always allowed to leave (but not kick people)
return builder.state_key == requester.user.to_string()
return succeed(False)
@defer.inlineCallbacks
@ -793,7 +796,7 @@ class EventCreationHandler(object):
# Ensure that we can round trip before trying to persist in db
try:
dump = frozendict_json_encoder.encode(event.content)
simplejson.loads(dump)
json.loads(dump)
except Exception:
logger.exception("Failed to encode content: %r", event.content)
raise
@ -806,6 +809,7 @@ class EventCreationHandler(object):
# If we're a worker we need to hit out to the master.
if self.config.worker_app:
yield send_event_to_master(
self.hs.get_clock(),
self.http_client,
host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port,
@ -959,9 +963,7 @@ class EventCreationHandler(object):
event_stream_id, max_stream_id
)
@defer.inlineCallbacks
def _notify():
yield run_on_reactor()
try:
self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id,

View file

@ -22,7 +22,7 @@ The methods that define policy are:
- should_notify
"""
from twisted.internet import defer, reactor
from twisted.internet import defer
from contextlib import contextmanager
from six import itervalues, iteritems
@ -179,7 +179,7 @@ class PresenceHandler(object):
# have not yet been persisted
self.unpersisted_users_changes = set()
reactor.addSystemEventTrigger("before", "shutdown", self._on_shutdown)
hs.get_reactor().addSystemEventTrigger("before", "shutdown", self._on_shutdown)
self.serial_to_user = {}
self._next_serial = 1

View file

@ -24,7 +24,7 @@ from synapse.api.errors import (
from synapse.http.client import CaptchaServerHttpClient
from synapse import types
from synapse.types import UserID, create_requester, RoomID, RoomAlias
from synapse.util.async import run_on_reactor, Linearizer
from synapse.util.async import Linearizer
from synapse.util.threepids import check_3pid_allowed
from ._base import BaseHandler
@ -139,7 +139,6 @@ class RegistrationHandler(BaseHandler):
Raises:
RegistrationError if there was a problem registering.
"""
yield run_on_reactor()
password_hash = None
if password:
password_hash = yield self.auth_handler().hash(password)
@ -431,8 +430,6 @@ class RegistrationHandler(BaseHandler):
Raises:
RegistrationError if there was a problem registering.
"""
yield run_on_reactor()
if localpart is None:
raise SynapseError(400, "Request must include user id")

View file

@ -23,7 +23,7 @@ from synapse.types import UserID, RoomAlias, RoomID, RoomStreamToken
from synapse.api.constants import (
EventTypes, JoinRules, RoomCreationPreset
)
from synapse.api.errors import AuthError, StoreError, SynapseError
from synapse.api.errors import AuthError, Codes, StoreError, SynapseError
from synapse.util import stringutils
from synapse.visibility import filter_events_for_client
@ -115,7 +115,11 @@ class RoomCreationHandler(BaseHandler):
)
if mapping:
raise SynapseError(400, "Room alias already taken")
raise SynapseError(
400,
"Room alias already taken",
Codes.ROOM_IN_USE
)
else:
room_alias = None

View file

@ -64,6 +64,13 @@ class SearchHandler(BaseHandler):
except Exception:
raise SynapseError(400, "Invalid batch")
logger.info(
"Search batch properties: %r, %r, %r",
batch_group, batch_group_key, batch_token,
)
logger.info("Search content: %s", content)
try:
room_cat = content["search_categories"]["room_events"]
@ -271,6 +278,8 @@ class SearchHandler(BaseHandler):
# We should never get here due to the guard earlier.
raise NotImplementedError()
logger.info("Found %d events to return", len(allowed_events))
# If client has asked for "context" for each event (i.e. some surrounding
# events and state), fetch that
if event_context is not None:
@ -282,6 +291,11 @@ class SearchHandler(BaseHandler):
event.room_id, event.event_id, before_limit, after_limit
)
logger.info(
"Context for search returned %d and %d events",
len(res["events_before"]), len(res["events_after"]),
)
res["events_before"] = yield filter_events_for_client(
self.store, user.to_string(), res["events_before"]
)

View file

@ -145,7 +145,7 @@ class SyncResult(collections.namedtuple("SyncResult", [
"invited", # InvitedSyncResult for each invited room.
"archived", # ArchivedSyncResult for each archived room.
"to_device", # List of direct messages for the device.
"device_lists", # List of user_ids whose devices have chanegd
"device_lists", # List of user_ids whose devices have changed
"device_one_time_keys_count", # Dict of algorithm to count for one time keys
# for this device
"groups",

View file

@ -19,7 +19,6 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.storage.roommember import ProfileInfo
from synapse.util.metrics import Measure
from synapse.util.async import sleep
from synapse.types import get_localpart_from_id
from six import iteritems
@ -174,7 +173,7 @@ class UserDirectoryHandler(object):
logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids))
yield self._handle_initial_room(room_id)
num_processed_rooms += 1
yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
logger.info("Processed all rooms.")
@ -188,7 +187,7 @@ class UserDirectoryHandler(object):
logger.info("Handling user %d/%d", num_processed_users + 1, len(user_ids))
yield self._handle_local_user(user_id)
num_processed_users += 1
yield sleep(self.INITIAL_USER_SLEEP_MS / 1000.)
yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.)
logger.info("Processed all users")
@ -236,7 +235,7 @@ class UserDirectoryHandler(object):
count = 0
for user_id in user_ids:
if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
if not self.is_mine_id(user_id):
count += 1
@ -251,7 +250,7 @@ class UserDirectoryHandler(object):
continue
if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
count += 1
user_set = (user_id, other_user_id)

View file

@ -42,7 +42,7 @@ from twisted.web._newclient import ResponseDone
from six import StringIO
from prometheus_client import Counter
import simplejson as json
from canonicaljson import json
import logging
import urllib
@ -98,8 +98,8 @@ class SimpleHttpClient(object):
method, uri, *args, **kwargs
)
add_timeout_to_deferred(
request_deferred,
60, cancelled_to_request_timed_out_error,
request_deferred, 60, self.hs.get_reactor(),
cancelled_to_request_timed_out_error,
)
response = yield make_deferred_yieldable(request_deferred)
@ -115,7 +115,7 @@ class SimpleHttpClient(object):
"Error sending request to %s %s: %s %s",
method, redact_uri(uri), type(e).__name__, e.message
)
raise e
raise
@defer.inlineCallbacks
def post_urlencoded_get_json(self, uri, args={}, headers=None):

View file

@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
from twisted.internet import defer, reactor
from twisted.internet import defer
from twisted.internet.error import ConnectError
from twisted.names import client, dns
from twisted.names.error import DNSNameError, DomainError
@ -74,21 +74,22 @@ def matrix_federation_endpoint(reactor, destination, ssl_context_factory=None,
reactor, "matrix", domain, protocol="tcp",
default_port=default_port, endpoint=transport_endpoint,
endpoint_kw_args=endpoint_kw_args
))
), reactor)
else:
return _WrappingEndpointFac(transport_endpoint(
reactor, domain, port, **endpoint_kw_args
))
), reactor)
class _WrappingEndpointFac(object):
def __init__(self, endpoint_fac):
def __init__(self, endpoint_fac, reactor):
self.endpoint_fac = endpoint_fac
self.reactor = reactor
@defer.inlineCallbacks
def connect(self, protocolFactory):
conn = yield self.endpoint_fac.connect(protocolFactory)
conn = _WrappedConnection(conn)
conn = _WrappedConnection(conn, self.reactor)
defer.returnValue(conn)
@ -98,9 +99,10 @@ class _WrappedConnection(object):
"""
__slots__ = ["conn", "last_request"]
def __init__(self, conn):
def __init__(self, conn, reactor):
object.__setattr__(self, "conn", conn)
object.__setattr__(self, "last_request", time.time())
self._reactor = reactor
def __getattr__(self, name):
return getattr(self.conn, name)
@ -131,14 +133,14 @@ class _WrappedConnection(object):
# Time this connection out if we haven't send a request in the last
# N minutes
# TODO: Cancel the previous callLater?
reactor.callLater(3 * 60, self._time_things_out_maybe)
self._reactor.callLater(3 * 60, self._time_things_out_maybe)
d = self.conn.request(request)
def update_request_time(res):
self.last_request = time.time()
# TODO: Cancel the previous callLater?
reactor.callLater(3 * 60, self._time_things_out_maybe)
self._reactor.callLater(3 * 60, self._time_things_out_maybe)
return res
d.addCallback(update_request_time)

View file

@ -22,12 +22,12 @@ from twisted.web._newclient import ResponseDone
from synapse.http import cancelled_to_request_timed_out_error
from synapse.http.endpoint import matrix_federation_endpoint
import synapse.metrics
from synapse.util.async import sleep, add_timeout_to_deferred
from synapse.util.async import add_timeout_to_deferred
from synapse.util import logcontext
from synapse.util.logcontext import make_deferred_yieldable
import synapse.util.retryutils
from canonicaljson import encode_canonical_json
from canonicaljson import encode_canonical_json, json
from synapse.api.errors import (
SynapseError, Codes, HttpResponseException, FederationDeniedError,
@ -36,7 +36,6 @@ from synapse.api.errors import (
from signedjson.sign import sign_json
import cgi
import simplejson as json
import logging
import random
import sys
@ -193,6 +192,7 @@ class MatrixFederationHttpClient(object):
add_timeout_to_deferred(
request_deferred,
timeout / 1000. if timeout else 60,
self.hs.get_reactor(),
cancelled_to_request_timed_out_error,
)
response = yield make_deferred_yieldable(
@ -234,7 +234,7 @@ class MatrixFederationHttpClient(object):
delay = min(delay, 2)
delay *= random.uniform(0.8, 1.4)
yield sleep(delay)
yield self.clock.sleep(delay)
retries_left -= 1
else:
raise

View file

@ -117,13 +117,17 @@ def _get_in_flight_counts():
Returns:
dict[tuple[str, str], int]
"""
for rm in _in_flight_requests:
# Cast to a list to prevent it changing while the Prometheus
# thread is collecting metrics
reqs = list(_in_flight_requests)
for rm in reqs:
rm.update_metrics()
# Map from (method, name) -> int, the number of in flight requests of that
# type
counts = {}
for rm in _in_flight_requests:
for rm in reqs:
key = (rm.method, rm.name,)
counts[key] = counts.get(key, 0) + 1
@ -131,7 +135,7 @@ def _get_in_flight_counts():
LaterGauge(
"synapse_http_request_metrics_in_flight_requests_count",
"synapse_http_server_in_flight_requests_count",
"",
["method", "servlet"],
_get_in_flight_counts,

View file

@ -29,7 +29,7 @@ import synapse.metrics
import synapse.events
from canonicaljson import (
encode_canonical_json, encode_pretty_printed_json
encode_canonical_json, encode_pretty_printed_json, json
)
from twisted.internet import defer
@ -41,7 +41,6 @@ from twisted.web.util import redirectTo
import collections
import logging
import urllib
import simplejson
logger = logging.getLogger(__name__)
@ -410,7 +409,7 @@ def respond_with_json(request, code, json_object, send_cors=False,
if canonical_json or synapse.events.USE_FROZEN_DICTS:
json_bytes = encode_canonical_json(json_object)
else:
json_bytes = simplejson.dumps(json_object)
json_bytes = json.dumps(json_object)
return respond_with_json_bytes(
request, code, json_bytes,

View file

@ -18,7 +18,9 @@
from synapse.api.errors import SynapseError, Codes
import logging
import simplejson
from canonicaljson import json
logger = logging.getLogger(__name__)
@ -171,7 +173,7 @@ def parse_json_value_from_request(request, allow_empty_body=False):
return None
try:
content = simplejson.loads(content_bytes)
content = json.loads(content_bytes)
except Exception as e:
logger.warn("Unable to parse JSON: %s", e)
raise SynapseError(400, "Content not JSON.", errcode=Codes.NOT_JSON)

View file

@ -99,16 +99,18 @@ class SynapseRequest(Request):
db_txn_count = context.db_txn_count
db_txn_duration_sec = context.db_txn_duration_sec
db_sched_duration_sec = context.db_sched_duration_sec
evt_db_fetch_count = context.evt_db_fetch_count
except Exception:
ru_utime, ru_stime = (0, 0)
db_txn_count, db_txn_duration_sec = (0, 0)
evt_db_fetch_count = 0
end_time = time.time()
self.site.access_logger.info(
"%s - %s - {%s}"
" Processed request: %.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)"
" %sB %s \"%s %s %s\" \"%s\"",
" %sB %s \"%s %s %s\" \"%s\" [%d dbevts]",
self.getClientIP(),
self.site.site_tag,
self.authenticated_entity,
@ -124,6 +126,7 @@ class SynapseRequest(Request):
self.get_redacted_uri(),
self.clientproto,
self.get_user_agent(),
evt_db_fetch_count,
)
try:

View file

@ -62,7 +62,7 @@ class LaterGauge(object):
calls = self.caller()
except Exception:
logger.exception(
"Exception running callback for LaterGuage(%s)",
"Exception running callback for LaterGauge(%s)",
self.name,
)
yield g
@ -140,14 +140,15 @@ gc_time = Histogram(
class GCCounts(object):
def collect(self):
cm = GaugeMetricFamily("python_gc_counts", "GC cycle counts", labels=["gen"])
cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"])
for n, m in enumerate(gc.get_count()):
cm.add_metric([str(n)], m)
yield cm
REGISTRY.register(GCCounts())
if not running_on_pypy:
REGISTRY.register(GCCounts())
#
# Twisted reactor metrics
@ -190,6 +191,22 @@ event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name"
# finished being processed.
event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"])
last_ticked = time.time()
class ReactorLastSeenMetric(object):
def collect(self):
cm = GaugeMetricFamily(
"python_twisted_reactor_last_seen",
"Seconds since the Twisted reactor was last seen",
)
cm.add_metric([], time.time() - last_ticked)
yield cm
REGISTRY.register(ReactorLastSeenMetric())
def runUntilCurrentTimer(func):
@ -222,6 +239,11 @@ def runUntilCurrentTimer(func):
tick_time.observe(end - start)
pending_calls_metric.observe(num_pending)
# Update the time we last ticked, for the metric to test whether
# Synapse's reactor has frozen
global last_ticked
last_ticked = end
if running_on_pypy:
return ret

View file

@ -161,6 +161,7 @@ class Notifier(object):
self.user_to_user_stream = {}
self.room_to_user_streams = {}
self.hs = hs
self.event_sources = hs.get_event_sources()
self.store = hs.get_datastore()
self.pending_new_room_events = []
@ -340,6 +341,7 @@ class Notifier(object):
add_timeout_to_deferred(
listener.deferred,
(end_time - now) / 1000.,
self.hs.get_reactor(),
)
with PreserveLoggingContext():
yield listener.deferred
@ -561,6 +563,7 @@ class Notifier(object):
add_timeout_to_deferred(
listener.deferred.addTimeout,
(end_time - now) / 1000.,
self.hs.get_reactor(),
)
try:
with PreserveLoggingContext():

View file

@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer, reactor
from twisted.internet import defer
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
import logging
@ -199,7 +199,7 @@ class EmailPusher(object):
self.timed_call = None
if soonest_due_at is not None:
self.timed_call = reactor.callLater(
self.timed_call = self.hs.get_reactor().callLater(
self.seconds_until(soonest_due_at), self.on_timer
)

View file

@ -15,7 +15,7 @@
# limitations under the License.
import logging
from twisted.internet import defer, reactor
from twisted.internet import defer
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
from . import push_rule_evaluator
@ -220,7 +220,9 @@ class HttpPusher(object):
)
else:
logger.info("Push failed: delaying for %ds", self.backoff_delay)
self.timed_call = reactor.callLater(self.backoff_delay, self.on_timer)
self.timed_call = self.hs.get_reactor().callLater(
self.backoff_delay, self.on_timer
)
self.backoff_delay = min(self.backoff_delay * 2, self.MAX_BACKOFF_SEC)
break

View file

@ -19,7 +19,6 @@ import logging
from twisted.internet import defer
from synapse.push.pusher import PusherFactory
from synapse.util.async import run_on_reactor
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
logger = logging.getLogger(__name__)
@ -125,7 +124,6 @@ class PusherPool:
@defer.inlineCallbacks
def on_new_notifications(self, min_stream_id, max_stream_id):
yield run_on_reactor()
try:
users_affected = yield self.store.get_push_action_users_in_range(
min_stream_id, max_stream_id
@ -151,7 +149,6 @@ class PusherPool:
@defer.inlineCallbacks
def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
yield run_on_reactor()
try:
# Need to subtract 1 from the minimum because the lower bound here
# is not inclusive

View file

@ -21,7 +21,6 @@ from synapse.api.errors import (
from synapse.events import FrozenEvent
from synapse.events.snapshot import EventContext
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.util.async import sleep
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.metrics import Measure
from synapse.types import Requester, UserID
@ -33,11 +32,12 @@ logger = logging.getLogger(__name__)
@defer.inlineCallbacks
def send_event_to_master(client, host, port, requester, event, context,
def send_event_to_master(clock, client, host, port, requester, event, context,
ratelimit, extra_users):
"""Send event to be handled on the master
Args:
clock (synapse.util.Clock)
client (SimpleHttpClient)
host (str): host of master
port (int): port on master listening for HTTP replication
@ -77,7 +77,7 @@ def send_event_to_master(client, host, port, requester, event, context,
# If we timed out we probably don't need to worry about backing
# off too much, but lets just wait a little anyway.
yield sleep(1)
yield clock.sleep(1)
except MatrixCodeMessageException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the master process that we should send to the client. (And

View file

@ -23,6 +23,7 @@ from synapse.storage.roommember import RoomMemberWorkerStore
from synapse.storage.state import StateGroupWorkerStore
from synapse.storage.stream import StreamWorkerStore
from synapse.storage.signatures import SignatureWorkerStore
from synapse.storage.user_erasure_store import UserErasureWorkerStore
from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
@ -45,6 +46,7 @@ class SlavedEventStore(EventFederationWorkerStore,
EventsWorkerStore,
StateGroupWorkerStore,
SignatureWorkerStore,
UserErasureWorkerStore,
BaseSlavedStore):
def __init__(self, db_conn, hs):

View file

@ -15,7 +15,7 @@
"""A replication client for use by synapse workers.
"""
from twisted.internet import reactor, defer
from twisted.internet import defer
from twisted.internet.protocol import ReconnectingClientFactory
from .commands import (
@ -44,7 +44,7 @@ class ReplicationClientFactory(ReconnectingClientFactory):
self.server_name = hs.config.server_name
self._clock = hs.get_clock() # As self.clock is defined in super class
reactor.addSystemEventTrigger("before", "shutdown", self.stopTrying)
hs.get_reactor().addSystemEventTrigger("before", "shutdown", self.stopTrying)
def startedConnecting(self, connector):
logger.info("Connecting to replication: %r", connector.getDestination())
@ -95,7 +95,7 @@ class ReplicationClientHandler(object):
factory = ReplicationClientFactory(hs, client_name, self)
host = hs.config.worker_replication_host
port = hs.config.worker_replication_port
reactor.connectTCP(host, port, factory)
hs.get_reactor().connectTCP(host, port, factory)
def on_rdata(self, stream_name, token, rows):
"""Called when we get new replication data. By default this just pokes

View file

@ -19,13 +19,17 @@ allowed to be sent by which side.
"""
import logging
import simplejson
import platform
if platform.python_implementation() == "PyPy":
import json
_json_encoder = json.JSONEncoder()
else:
import simplejson as json
_json_encoder = json.JSONEncoder(namedtuple_as_object=False)
logger = logging.getLogger(__name__)
_json_encoder = simplejson.JSONEncoder(namedtuple_as_object=False)
class Command(object):
"""The base command class.
@ -102,7 +106,7 @@ class RdataCommand(Command):
return cls(
stream_name,
None if token == "batch" else int(token),
simplejson.loads(row_json)
json.loads(row_json)
)
def to_line(self):
@ -300,7 +304,7 @@ class InvalidateCacheCommand(Command):
def from_line(cls, line):
cache_func, keys_json = line.split(" ", 1)
return cls(cache_func, simplejson.loads(keys_json))
return cls(cache_func, json.loads(keys_json))
def to_line(self):
return " ".join((
@ -329,7 +333,7 @@ class UserIpCommand(Command):
def from_line(cls, line):
user_id, jsn = line.split(" ", 1)
access_token, ip, user_agent, device_id, last_seen = simplejson.loads(jsn)
access_token, ip, user_agent, device_id, last_seen = json.loads(jsn)
return cls(
user_id, access_token, ip, user_agent, device_id, last_seen

View file

@ -564,11 +564,13 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
# The following simply registers metrics for the replication connections
pending_commands = LaterGauge(
"pending_commands", "", ["name", "conn_id"],
"synapse_replication_tcp_protocol_pending_commands",
"",
["name", "conn_id"],
lambda: {
(p.name, p.conn_id): len(p.pending_commands)
for p in connected_connections
})
(p.name, p.conn_id): len(p.pending_commands) for p in connected_connections
},
)
def transport_buffer_size(protocol):
@ -579,11 +581,13 @@ def transport_buffer_size(protocol):
transport_send_buffer = LaterGauge(
"synapse_replication_tcp_transport_send_buffer", "", ["name", "conn_id"],
"synapse_replication_tcp_protocol_transport_send_buffer",
"",
["name", "conn_id"],
lambda: {
(p.name, p.conn_id): transport_buffer_size(p)
for p in connected_connections
})
(p.name, p.conn_id): transport_buffer_size(p) for p in connected_connections
},
)
def transport_kernel_read_buffer_size(protocol, read=True):
@ -602,37 +606,50 @@ def transport_kernel_read_buffer_size(protocol, read=True):
tcp_transport_kernel_send_buffer = LaterGauge(
"synapse_replication_tcp_transport_kernel_send_buffer", "", ["name", "conn_id"],
"synapse_replication_tcp_protocol_transport_kernel_send_buffer",
"",
["name", "conn_id"],
lambda: {
(p.name, p.conn_id): transport_kernel_read_buffer_size(p, False)
for p in connected_connections
})
},
)
tcp_transport_kernel_read_buffer = LaterGauge(
"synapse_replication_tcp_transport_kernel_read_buffer", "", ["name", "conn_id"],
"synapse_replication_tcp_protocol_transport_kernel_read_buffer",
"",
["name", "conn_id"],
lambda: {
(p.name, p.conn_id): transport_kernel_read_buffer_size(p, True)
for p in connected_connections
})
},
)
tcp_inbound_commands = LaterGauge(
"synapse_replication_tcp_inbound_commands", "", ["command", "name", "conn_id"],
"synapse_replication_tcp_protocol_inbound_commands",
"",
["command", "name", "conn_id"],
lambda: {
(k[0], p.name, p.conn_id): count
for p in connected_connections
for k, count in iteritems(p.inbound_commands_counter)
})
},
)
tcp_outbound_commands = LaterGauge(
"synapse_replication_tcp_outbound_commands", "", ["command", "name", "conn_id"],
"synapse_replication_tcp_protocol_outbound_commands",
"",
["command", "name", "conn_id"],
lambda: {
(k[0], p.name, p.conn_id): count
for p in connected_connections
for k, count in iteritems(p.outbound_commands_counter)
})
},
)
# number of updates received for each RDATA stream
inbound_rdata_count = Counter("synapse_replication_tcp_inbound_rdata_count", "",
["stream_name"])
inbound_rdata_count = Counter(
"synapse_replication_tcp_protocol_inbound_rdata_count", "", ["stream_name"]
)

View file

@ -15,7 +15,7 @@
"""The server side of the replication stream.
"""
from twisted.internet import defer, reactor
from twisted.internet import defer
from twisted.internet.protocol import Factory
from .streams import STREAMS_MAP, FederationStream
@ -109,7 +109,7 @@ class ReplicationStreamer(object):
self.is_looping = False
self.pending_updates = False
reactor.addSystemEventTrigger("before", "shutdown", self.on_shutdown)
hs.get_reactor().addSystemEventTrigger("before", "shutdown", self.on_shutdown)
def on_shutdown(self):
# close all connections on shutdown

View file

@ -16,6 +16,8 @@
from twisted.internet import defer
from six.moves import http_client
from synapse.api.constants import Membership
from synapse.api.errors import AuthError, SynapseError, Codes, NotFoundError
from synapse.types import UserID, create_requester
@ -247,6 +249,15 @@ class DeactivateAccountRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def on_POST(self, request, target_user_id):
body = parse_json_object_from_request(request, allow_empty_body=True)
erase = body.get("erase", False)
if not isinstance(erase, bool):
raise SynapseError(
http_client.BAD_REQUEST,
"Param 'erase' must be a boolean, if given",
Codes.BAD_JSON,
)
UserID.from_string(target_user_id)
requester = yield self.auth.get_user_by_req(request)
is_admin = yield self.auth.is_server_admin(requester.user)
@ -255,7 +266,7 @@ class DeactivateAccountRestServlet(ClientV1RestServlet):
raise AuthError(403, "You are not a server admin")
yield self._deactivate_account_handler.deactivate_account(
target_user_id, False,
target_user_id, erase,
)
defer.returnValue((200, {}))

View file

@ -23,7 +23,8 @@ from synapse.util.msisdn import phone_number_to_msisdn
from .base import ClientV1RestServlet, client_path_patterns
import simplejson as json
from canonicaljson import json
import urllib
from six.moves.urllib import parse as urlparse

View file

@ -24,8 +24,6 @@ import synapse.util.stringutils as stringutils
from synapse.http.servlet import parse_json_object_from_request
from synapse.types import create_requester
from synapse.util.async import run_on_reactor
from hashlib import sha1
import hmac
import logging
@ -272,7 +270,6 @@ class RegisterRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def _do_password(self, request, register_json, session):
yield run_on_reactor()
if (self.hs.config.enable_registration_captcha and
not session[LoginType.RECAPTCHA]):
# captcha should've been done by this stage!
@ -333,8 +330,6 @@ class RegisterRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def _do_shared_secret(self, request, register_json, session):
yield run_on_reactor()
if not isinstance(register_json.get("mac", None), string_types):
raise SynapseError(400, "Expected mac.")
if not isinstance(register_json.get("user", None), string_types):
@ -423,8 +418,6 @@ class CreateUserRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def _do_create(self, requester, user_json):
yield run_on_reactor()
if "localpart" not in user_json:
raise SynapseError(400, "Expected 'localpart' key.")

View file

@ -31,7 +31,7 @@ from synapse.http.servlet import (
from six.moves.urllib import parse as urlparse
import logging
import simplejson as json
from canonicaljson import json
logger = logging.getLogger(__name__)

View file

@ -26,7 +26,6 @@ from synapse.http.servlet import (
RestServlet, assert_params_in_request,
parse_json_object_from_request,
)
from synapse.util.async import run_on_reactor
from synapse.util.msisdn import phone_number_to_msisdn
from synapse.util.threepids import check_3pid_allowed
from ._base import client_v2_patterns, interactive_auth_handler
@ -309,8 +308,6 @@ class ThreepidRestServlet(RestServlet):
@defer.inlineCallbacks
def on_GET(self, request):
yield run_on_reactor()
requester = yield self.auth.get_user_by_req(request)
threepids = yield self.datastore.user_get_threepids(
@ -321,8 +318,6 @@ class ThreepidRestServlet(RestServlet):
@defer.inlineCallbacks
def on_POST(self, request):
yield run_on_reactor()
body = parse_json_object_from_request(request)
threePidCreds = body.get('threePidCreds')
@ -374,8 +369,6 @@ class ThreepidDeleteRestServlet(RestServlet):
@defer.inlineCallbacks
def on_POST(self, request):
yield run_on_reactor()
body = parse_json_object_from_request(request)
required = ['medium', 'address']

View file

@ -32,7 +32,6 @@ from ._base import client_v2_patterns, interactive_auth_handler
import logging
import hmac
from hashlib import sha1
from synapse.util.async import run_on_reactor
from synapse.util.ratelimitutils import FederationRateLimiter
from six import string_types
@ -191,8 +190,6 @@ class RegisterRestServlet(RestServlet):
@interactive_auth_handler
@defer.inlineCallbacks
def on_POST(self, request):
yield run_on_reactor()
body = parse_json_object_from_request(request)
kind = "user"

View file

@ -33,7 +33,7 @@ from ._base import set_timeline_upper_limit
import itertools
import logging
import simplejson as json
from canonicaljson import json
logger = logging.getLogger(__name__)

View file

@ -22,8 +22,9 @@ from synapse.api.errors import (
from twisted.protocols.basic import FileSender
from twisted.web import server, resource
from canonicaljson import json
import base64
import simplejson as json
import logging
import os
import re

View file

@ -58,6 +58,7 @@ UPDATE_RECENTLY_ACCESSED_TS = 60 * 1000
class MediaRepository(object):
def __init__(self, hs):
self.hs = hs
self.auth = hs.get_auth()
self.client = MatrixFederationHttpClient(hs)
self.clock = hs.get_clock()
@ -94,7 +95,7 @@ class MediaRepository(object):
storage_providers.append(provider)
self.media_storage = MediaStorage(
self.primary_base_path, self.filepaths, storage_providers,
self.hs, self.primary_base_path, self.filepaths, storage_providers,
)
self.clock.looping_call(

View file

@ -37,13 +37,15 @@ class MediaStorage(object):
"""Responsible for storing/fetching files from local sources.
Args:
hs (synapse.server.Homeserver)
local_media_directory (str): Base path where we store media on disk
filepaths (MediaFilePaths)
storage_providers ([StorageProvider]): List of StorageProvider that are
used to fetch and store files.
"""
def __init__(self, local_media_directory, filepaths, storage_providers):
def __init__(self, hs, local_media_directory, filepaths, storage_providers):
self.hs = hs
self.local_media_directory = local_media_directory
self.filepaths = filepaths
self.storage_providers = storage_providers
@ -175,7 +177,8 @@ class MediaStorage(object):
res = yield provider.fetch(path, file_info)
if res:
with res:
consumer = BackgroundFileConsumer(open(local_path, "w"))
consumer = BackgroundFileConsumer(
open(local_path, "w"), self.hs.get_reactor())
yield res.write_to_consumer(consumer)
yield consumer.wait()
defer.returnValue(local_path)

View file

@ -23,7 +23,8 @@ import re
import shutil
import sys
import traceback
import simplejson as json
from canonicaljson import json
from six.moves import urllib_parse as urlparse
from six import string_types

View file

@ -40,7 +40,7 @@ from synapse.federation.transport.client import TransportLayerClient
from synapse.federation.transaction_queue import TransactionQueue
from synapse.handlers import Handlers
from synapse.handlers.appservice import ApplicationServicesHandler
from synapse.handlers.auth import AuthHandler, MacaroonGeneartor
from synapse.handlers.auth import AuthHandler, MacaroonGenerator
from synapse.handlers.deactivate_account import DeactivateAccountHandler
from synapse.handlers.devicemessage import DeviceMessageHandler
from synapse.handlers.device import DeviceHandler
@ -165,15 +165,19 @@ class HomeServer(object):
'server_notices_sender',
]
def __init__(self, hostname, **kwargs):
def __init__(self, hostname, reactor=None, **kwargs):
"""
Args:
hostname : The hostname for the server.
"""
if not reactor:
from twisted.internet import reactor
self._reactor = reactor
self.hostname = hostname
self._building = {}
self.clock = Clock()
self.clock = Clock(reactor)
self.distributor = Distributor()
self.ratelimiter = Ratelimiter()
@ -186,6 +190,12 @@ class HomeServer(object):
self.datastore = DataStore(self.get_db_conn(), self)
logger.info("Finished setting up.")
def get_reactor(self):
"""
Fetch the Twisted reactor in use by this HomeServer.
"""
return self._reactor
def get_ip_from_request(self, request):
# X-Forwarded-For is handled by our custom request type.
return request.getClientIP()
@ -261,7 +271,7 @@ class HomeServer(object):
return AuthHandler(self)
def build_macaroon_generator(self):
return MacaroonGeneartor(self)
return MacaroonGenerator(self)
def build_device_handler(self):
return DeviceHandler(self)
@ -328,6 +338,7 @@ class HomeServer(object):
return adbapi.ConnectionPool(
name,
cp_reactor=self.get_reactor(),
**self.db_config.get("args", {})
)

View file

@ -22,8 +22,9 @@ from synapse.storage.util.id_generators import StreamIdGenerator
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from canonicaljson import json
import abc
import simplejson as json
import logging
logger = logging.getLogger(__name__)

View file

@ -15,8 +15,8 @@
# limitations under the License.
import logging
import re
import simplejson as json
from twisted.internet import defer
from canonicaljson import json
from synapse.appservice import AppServiceTransaction
from synapse.config.appservice import load_appservices

View file

@ -12,14 +12,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import synapse.util.async
from ._base import SQLBaseStore
from . import engines
from twisted.internet import defer
import simplejson as json
from canonicaljson import json
import logging
logger = logging.getLogger(__name__)
@ -92,7 +92,7 @@ class BackgroundUpdateStore(SQLBaseStore):
logger.info("Starting background schema updates")
while True:
yield synapse.util.async.sleep(
yield self.hs.get_clock().sleep(
self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.)
try:

View file

@ -15,7 +15,7 @@
import logging
from twisted.internet import defer, reactor
from twisted.internet import defer
from ._base import Cache
from . import background_updates
@ -70,7 +70,9 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
self._client_ip_looper = self._clock.looping_call(
self._update_client_ips_batch, 5 * 1000
)
reactor.addSystemEventTrigger("before", "shutdown", self._update_client_ips_batch)
self.hs.get_reactor().addSystemEventTrigger(
"before", "shutdown", self._update_client_ips_batch
)
def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id,
now=None):

View file

@ -14,7 +14,8 @@
# limitations under the License.
import logging
import simplejson
from canonicaljson import json
from twisted.internet import defer
@ -85,7 +86,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
)
rows = []
for destination, edu in remote_messages_by_destination.items():
edu_json = simplejson.dumps(edu)
edu_json = json.dumps(edu)
rows.append((destination, stream_id, now_ms, edu_json))
txn.executemany(sql, rows)
@ -177,7 +178,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
" WHERE user_id = ?"
)
txn.execute(sql, (user_id,))
message_json = simplejson.dumps(messages_by_device["*"])
message_json = json.dumps(messages_by_device["*"])
for row in txn:
# Add the message for all devices for this user on this
# server.
@ -199,7 +200,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
# Only insert into the local inbox if the device exists on
# this server
device = row[0]
message_json = simplejson.dumps(messages_by_device[device])
message_json = json.dumps(messages_by_device[device])
messages_json_for_user[device] = message_json
if messages_json_for_user:
@ -253,7 +254,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
messages = []
for row in txn:
stream_pos = row[0]
messages.append(simplejson.loads(row[1]))
messages.append(json.loads(row[1]))
if len(messages) < limit:
stream_pos = current_stream_id
return (messages, stream_pos)
@ -389,7 +390,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
messages = []
for row in txn:
stream_pos = row[0]
messages.append(simplejson.loads(row[1]))
messages.append(json.loads(row[1]))
if len(messages) < limit:
stream_pos = current_stream_id
return (messages, stream_pos)

View file

@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import simplejson as json
from twisted.internet import defer
@ -21,6 +20,8 @@ from synapse.api.errors import StoreError
from ._base import SQLBaseStore, Cache
from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
from canonicaljson import json
from six import itervalues, iteritems
logger = logging.getLogger(__name__)

View file

@ -16,8 +16,7 @@ from twisted.internet import defer
from synapse.util.caches.descriptors import cached
from canonicaljson import encode_canonical_json
import simplejson as json
from canonicaljson import encode_canonical_json, json
from ._base import SQLBaseStore

View file

@ -16,11 +16,11 @@
from synapse.storage._base import SQLBaseStore, LoggingTransaction
from twisted.internet import defer
from synapse.util.async import sleep
from synapse.util.caches.descriptors import cachedInlineCallbacks
import logging
import simplejson as json
from canonicaljson import json
from six import iteritems
@ -84,6 +84,8 @@ class EventPushActionsWorkerStore(SQLBaseStore):
self.find_stream_orderings_looping_call = self._clock.looping_call(
self._find_stream_orderings_for_times, 10 * 60 * 1000
)
self._rotate_delay = 3
self._rotate_count = 10000
@cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000)
def get_unread_event_push_actions_by_room_for_user(
@ -800,7 +802,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
)
if caught_up:
break
yield sleep(5)
yield self.hs.get_clock().sleep(self._rotate_delay)
finally:
self._doing_notif_rotation = False
@ -821,8 +823,8 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
txn.execute("""
SELECT stream_ordering FROM event_push_actions
WHERE stream_ordering > ?
ORDER BY stream_ordering ASC LIMIT 1 OFFSET 50000
""", (old_rotate_stream_ordering,))
ORDER BY stream_ordering ASC LIMIT 1 OFFSET ?
""", (old_rotate_stream_ordering, self._rotate_count))
stream_row = txn.fetchone()
if stream_row:
offset_stream_ordering, = stream_row

View file

@ -19,7 +19,8 @@ from functools import wraps
import itertools
import logging
import simplejson as json
from canonicaljson import json
from twisted.internet import defer
from synapse.storage.events_worker import EventsWorkerStore
@ -1044,7 +1045,6 @@ class EventsStore(EventsWorkerStore):
"event_edge_hashes",
"event_edges",
"event_forward_extremities",
"event_push_actions",
"event_reference_hashes",
"event_search",
"event_signatures",
@ -1064,6 +1064,14 @@ class EventsStore(EventsWorkerStore):
[(ev.event_id,) for ev, _ in events_and_contexts]
)
for table in (
"event_push_actions",
):
txn.executemany(
"DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,),
[(ev.event_id,) for ev, _ in events_and_contexts]
)
def _store_event_txn(self, txn, events_and_contexts):
"""Insert new events into the event and event_json tables

View file

@ -14,13 +14,14 @@
# limitations under the License.
from ._base import SQLBaseStore
from twisted.internet import defer, reactor
from twisted.internet import defer
from synapse.events import FrozenEvent
from synapse.events.utils import prune_event
from synapse.util.logcontext import (
PreserveLoggingContext, make_deferred_yieldable, run_in_background,
LoggingContext,
)
from synapse.util.metrics import Measure
from synapse.api.errors import SynapseError
@ -28,7 +29,8 @@ from synapse.api.errors import SynapseError
from collections import namedtuple
import logging
import simplejson as json
from canonicaljson import json
# these are only included to make the type annotations work
from synapse.events import EventBase # noqa: F401
@ -145,6 +147,9 @@ class EventsWorkerStore(SQLBaseStore):
missing_events_ids = [e for e in event_ids if e not in event_entry_map]
if missing_events_ids:
log_ctx = LoggingContext.current_context()
log_ctx.record_event_fetch(len(missing_events_ids))
missing_events = yield self._enqueue_events(
missing_events_ids,
check_redacted=check_redacted,
@ -265,7 +270,7 @@ class EventsWorkerStore(SQLBaseStore):
except Exception:
logger.exception("Failed to callback")
with PreserveLoggingContext():
reactor.callFromThread(fire, event_list, row_dict)
self.hs.get_reactor().callFromThread(fire, event_list, row_dict)
except Exception as e:
logger.exception("do_fetch")
@ -278,7 +283,7 @@ class EventsWorkerStore(SQLBaseStore):
if event_list:
with PreserveLoggingContext():
reactor.callFromThread(fire, event_list)
self.hs.get_reactor().callFromThread(fire, event_list)
@defer.inlineCallbacks
def _enqueue_events(self, events, check_redacted=True, allow_rejected=False):

View file

@ -19,8 +19,7 @@ from ._base import SQLBaseStore
from synapse.api.errors import SynapseError, Codes
from synapse.util.caches.descriptors import cachedInlineCallbacks
from canonicaljson import encode_canonical_json
import simplejson as json
from canonicaljson import encode_canonical_json, json
class FilteringStore(SQLBaseStore):

View file

@ -20,7 +20,7 @@ from synapse.api.errors import SynapseError
from ._base import SQLBaseStore
import simplejson as json
from canonicaljson import json
# The category ID for the "default" category. We don't store as null in the

View file

@ -25,9 +25,10 @@ from synapse.push.baserules import list_with_base_rules
from synapse.api.constants import EventTypes
from twisted.internet import defer
from canonicaljson import json
import abc
import logging
import simplejson as json
logger = logging.getLogger(__name__)

Some files were not shown because too many files have changed in this diff Show more