Merge branch 'develop' of github.com:matrix-org/synapse into erikj/refactor_state_handler

This commit is contained in:
Erik Johnston 2018-08-20 14:49:43 +01:00
commit 4d664278af
208 changed files with 4269 additions and 3242 deletions

48
.circleci/config.yml Normal file
View file

@ -0,0 +1,48 @@
version: 2
jobs:
sytestpy2:
machine: true
steps:
- checkout
- run: docker pull matrixdotorg/sytest-synapsepy2
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs matrixdotorg/sytest-synapsepy2
- store_artifacts:
path: ~/project/logs
destination: logs
sytestpy2postgres:
machine: true
steps:
- checkout
- run: docker pull matrixdotorg/sytest-synapsepy2
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs -e POSTGRES=1 matrixdotorg/sytest-synapsepy2
- store_artifacts:
path: ~/project/logs
destination: logs
sytestpy3:
machine: true
steps:
- checkout
- run: docker pull matrixdotorg/sytest-synapsepy3
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs hawkowl/sytestpy3
- store_artifacts:
path: ~/project/logs
destination: logs
sytestpy3postgres:
machine: true
steps:
- checkout
- run: docker pull matrixdotorg/sytest-synapsepy3
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs -e POSTGRES=1 matrixdotorg/sytest-synapsepy3
- store_artifacts:
path: ~/project/logs
destination: logs
workflows:
version: 2
build:
jobs:
- sytestpy2
- sytestpy2postgres
# Currently broken while the Python 3 port is incomplete
# - sytestpy3
# - sytestpy3postgres

View file

@ -3,3 +3,6 @@ Dockerfile
.gitignore
demo/etc
tox.ini
synctl
.git/*
.tox/*

View file

@ -8,6 +8,9 @@ before_script:
- git remote set-branches --add origin develop
- git fetch origin develop
services:
- postgresql
matrix:
fast_finish: true
include:
@ -20,6 +23,9 @@ matrix:
- python: 2.7
env: TOX_ENV=py27
- python: 2.7
env: TOX_ENV=py27-postgres TRIAL_FLAGS="-j 4"
- python: 3.6
env: TOX_ENV=py36
@ -29,6 +35,10 @@ matrix:
- python: 3.6
env: TOX_ENV=check-newsfragment
allow_failures:
- python: 2.7
env: TOX_ENV=py27-postgres TRIAL_FLAGS="-j 4"
install:
- pip install tox

View file

@ -30,11 +30,11 @@ use github's pull request workflow to review the contribution, and either ask
you to make any refinements needed or merge it and make them ourselves. The
changes will then land on master when we next do a release.
We use `Jenkins <http://matrix.org/jenkins>`_ and
We use `Jenkins <http://matrix.org/jenkins>`_ and
`Travis <https://travis-ci.org/matrix-org/synapse>`_ for continuous
integration. All pull requests to synapse get automatically tested by Travis;
the Jenkins builds require an adminstrator to start them. If your change
breaks the build, this will be shown in github, so please keep an eye on the
integration. All pull requests to synapse get automatically tested by Travis;
the Jenkins builds require an adminstrator to start them. If your change
breaks the build, this will be shown in github, so please keep an eye on the
pull request for feedback.
Code style
@ -56,17 +56,17 @@ entry. These are managed by Towncrier
(https://github.com/hawkowl/towncrier).
To create a changelog entry, make a new file in the ``changelog.d``
file named in the format of ``issuenumberOrPR.type``. The type can be
file named in the format of ``PRnumber.type``. The type can be
one of ``feature``, ``bugfix``, ``removal`` (also used for
deprecations), or ``misc`` (for internal-only changes). The content of
the file is your changelog entry, which can contain RestructuredText
formatting. A note of contributors is welcomed in changelogs for
non-misc changes (the content of misc changes is not displayed).
For example, a fix for a bug reported in #1234 would have its
changelog entry in ``changelog.d/1234.bugfix``, and contain content
like "The security levels of Florbs are now validated when
recieved over federation. Contributed by Jane Matrix".
For example, a fix in PR #1234 would have its changelog entry in
``changelog.d/1234.bugfix``, and contain content like "The security levels of
Florbs are now validated when recieved over federation. Contributed by Jane
Matrix".
Attribution
~~~~~~~~~~~
@ -125,7 +125,7 @@ the contribution or otherwise have the right to contribute it to Matrix::
personal information I submit with it, including my sign-off) is
maintained indefinitely and may be redistributed consistent with
this project or the open source license(s) involved.
If you agree to this for your contribution, then all that's needed is to
include the line in your commit or pull request comment::

View file

@ -36,3 +36,4 @@ recursive-include changelog.d *
prune .github
prune demo/etc
prune docker
prune .circleci

1
changelog.d/1491.feature Normal file
View file

@ -0,0 +1 @@
Add support for the SNI extension to federation TLS connections

1
changelog.d/3184.feature Normal file
View file

@ -0,0 +1 @@
Add /_media/r0/config

1
changelog.d/3423.misc Normal file
View file

@ -0,0 +1 @@
The test suite now can run under PostgreSQL.

1
changelog.d/3568.feature Normal file
View file

@ -0,0 +1 @@
speed up /members API and add `at` and `membership` params as per MSC1227

1
changelog.d/3574.feature Normal file
View file

@ -0,0 +1 @@
implement `summary` block in /sync response as per MSC688

1
changelog.d/3589.feature Normal file
View file

@ -0,0 +1 @@
Add lazy-loading support to /messages as per MSC1227

1
changelog.d/3653.feature Normal file
View file

@ -0,0 +1 @@
Support more federation endpoints on workers

1
changelog.d/3660.misc Normal file
View file

@ -0,0 +1 @@
Sytests can now be run inside a Docker container.

1
changelog.d/3661.bugfix Normal file
View file

@ -0,0 +1 @@
Fix bug on deleting 3pid when using identity servers that don't support unbind API

1
changelog.d/3669.misc Normal file
View file

@ -0,0 +1 @@
Update docker base image from alpine 3.7 to 3.8.

1
changelog.d/3670.feature Normal file
View file

@ -0,0 +1 @@
Where server is disabled, block ability for locked out users to read new messages

1
changelog.d/3676.bugfix Normal file
View file

@ -0,0 +1 @@
Make the tests pass on Twisted < 18.7.0

1
changelog.d/3677.bugfix Normal file
View file

@ -0,0 +1 @@
Dont ship recaptcha_ajax.js, use it directly from Google

1
changelog.d/3678.misc Normal file
View file

@ -0,0 +1 @@
Rename synapse.util.async to synapse.util.async_helpers to mitigate async becoming a keyword on Python 3.7.

1
changelog.d/3679.misc Normal file
View file

@ -0,0 +1 @@
Synapse's tests are now formatted with the black autoformatter.

1
changelog.d/3681.bugfix Normal file
View file

@ -0,0 +1 @@
Fixes test_reap_monthly_active_users so it passes under postgres

1
changelog.d/3684.misc Normal file
View file

@ -0,0 +1 @@
Implemented a new testing base class to reduce test boilerplate.

1
changelog.d/3687.feature Normal file
View file

@ -0,0 +1 @@
set admin uri via config, to be used in error messages where the user should contact the administrator

1
changelog.d/3689.bugfix Normal file
View file

@ -0,0 +1 @@
Fix mau blocking calulation bug on login

1
changelog.d/3690.misc Normal file
View file

@ -0,0 +1 @@
Rename MAU prometheus metrics

1
changelog.d/3692.bugfix Normal file
View file

@ -0,0 +1 @@
Fix missing yield in synapse.storage.monthly_active_users.initialise_reserved_users

1
changelog.d/3694.feature Normal file
View file

@ -0,0 +1 @@
Synapse's presence functionality can now be disabled with the "use_presence" configuration option.

1
changelog.d/3700.bugfix Normal file
View file

@ -0,0 +1 @@
Improve HTTP request logging to include all requests

1
changelog.d/3701.bugfix Normal file
View file

@ -0,0 +1 @@
Avoid timing out requests while we are streaming back the response

1
changelog.d/3703.removal Normal file
View file

@ -0,0 +1 @@
The Shared-Secret registration method of the legacy v1/register REST endpoint has been removed. For a replacement, please see [the admin/register API documentation](https://github.com/matrix-org/synapse/blob/master/docs/admin_api/register_api.rst).

1
changelog.d/3705.bugfix Normal file
View file

@ -0,0 +1 @@
Support more federation endpoints on workers

1
changelog.d/3707.misc Normal file
View file

@ -0,0 +1 @@
add new error type ResourceLimit

1
changelog.d/3708.feature Normal file
View file

@ -0,0 +1 @@
For resource limit blocked users, prevent writing into rooms

1
changelog.d/3709.misc Normal file
View file

@ -0,0 +1 @@
Logcontexts for replication command handlers

1
changelog.d/3710.bugfix Normal file
View file

@ -0,0 +1 @@
Fix "Starting db txn 'get_all_updated_receipts' from sentinel context" warning

1
changelog.d/3712.misc Normal file
View file

@ -0,0 +1 @@
Update admin register API documentation to reference a real user ID.

1
changelog.d/3713.bugfix Normal file
View file

@ -0,0 +1 @@
Support more federation endpoints on workers

1
changelog.d/3719.bugfix Normal file
View file

@ -0,0 +1 @@
Fix bug where `state_cache` cache factor ignored environment variables

View file

@ -1,4 +1,4 @@
FROM docker.io/python:2-alpine3.7
FROM docker.io/python:2-alpine3.8
RUN apk add --no-cache --virtual .nacl_deps \
build-base \

View file

@ -33,7 +33,7 @@ As an example::
< {
"access_token": "token_here",
"user_id": "@pepper_roni@test",
"user_id": "@pepper_roni:localhost",
"home_server": "test",
"device_id": "device_id_here"
}

View file

@ -173,10 +173,23 @@ endpoints matching the following regular expressions::
^/_matrix/federation/v1/backfill/
^/_matrix/federation/v1/get_missing_events/
^/_matrix/federation/v1/publicRooms
^/_matrix/federation/v1/query/
^/_matrix/federation/v1/make_join/
^/_matrix/federation/v1/make_leave/
^/_matrix/federation/v1/send_join/
^/_matrix/federation/v1/send_leave/
^/_matrix/federation/v1/invite/
^/_matrix/federation/v1/query_auth/
^/_matrix/federation/v1/event_auth/
^/_matrix/federation/v1/exchange_third_party_invite/
^/_matrix/federation/v1/send/
The above endpoints should all be routed to the federation_reader worker by the
reverse-proxy configuration.
The `^/_matrix/federation/v1/send/` endpoint must only be handled by a single
instance.
``synapse.app.federation_sender``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@ -228,6 +241,14 @@ regular expressions::
^/_matrix/client/(api/v1|r0|unstable)/keys/upload
If ``use_presence`` is False in the homeserver config, it can also handle REST
endpoints matching the following regular expressions::
^/_matrix/client/(api/v1|r0|unstable)/presence/[^/]+/status
This "stub" presence handler will pass through ``GET`` request but make the
``PUT`` effectively a no-op.
It will proxy any requests it cannot handle to the main synapse instance. It
must therefore be configured with the location of the main instance, via
the ``worker_main_http_uri`` setting in the frontend_proxy worker configuration

View file

@ -25,7 +25,7 @@ from twisted.internet import defer
import synapse.types
from synapse import event_auth
from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.api.errors import AuthError, Codes
from synapse.api.errors import AuthError, Codes, ResourceLimitError
from synapse.types import UserID
from synapse.util.caches import CACHE_SIZE_FACTOR, register_cache
from synapse.util.caches.lrucache import LruCache
@ -775,17 +775,34 @@ class Auth(object):
)
@defer.inlineCallbacks
def check_auth_blocking(self):
def check_auth_blocking(self, user_id=None):
"""Checks if the user should be rejected for some external reason,
such as monthly active user limiting or global disable flag
Args:
user_id(str|None): If present, checks for presence against existing
MAU cohort
"""
if self.hs.config.hs_disabled:
raise AuthError(
403, self.hs.config.hs_disabled_message, errcode=Codes.HS_DISABLED
raise ResourceLimitError(
403, self.hs.config.hs_disabled_message,
errcode=Codes.RESOURCE_LIMIT_EXCEED,
admin_uri=self.hs.config.admin_uri,
limit_type=self.hs.config.hs_disabled_limit_type
)
if self.hs.config.limit_usage_by_mau is True:
# If the user is already part of the MAU cohort
if user_id:
timestamp = yield self.store.user_last_seen_monthly_active(user_id)
if timestamp:
return
# Else if there is no room in the MAU bucket, bail
current_mau = yield self.store.get_monthly_active_count()
if current_mau >= self.hs.config.max_mau_value:
raise AuthError(
403, "MAU Limit Exceeded", errcode=Codes.MAU_LIMIT_EXCEEDED
raise ResourceLimitError(
403, "Monthly Active User Limit Exceeded",
admin_uri=self.hs.config.admin_uri,
errcode=Codes.RESOURCE_LIMIT_EXCEED,
limit_type="monthly_active_user"
)

View file

@ -56,8 +56,7 @@ class Codes(object):
SERVER_NOT_TRUSTED = "M_SERVER_NOT_TRUSTED"
CONSENT_NOT_GIVEN = "M_CONSENT_NOT_GIVEN"
CANNOT_LEAVE_SERVER_NOTICE_ROOM = "M_CANNOT_LEAVE_SERVER_NOTICE_ROOM"
MAU_LIMIT_EXCEEDED = "M_MAU_LIMIT_EXCEEDED"
HS_DISABLED = "M_HS_DISABLED"
RESOURCE_LIMIT_EXCEED = "M_RESOURCE_LIMIT_EXCEED"
UNSUPPORTED_ROOM_VERSION = "M_UNSUPPORTED_ROOM_VERSION"
INCOMPATIBLE_ROOM_VERSION = "M_INCOMPATIBLE_ROOM_VERSION"
@ -232,6 +231,30 @@ class AuthError(SynapseError):
super(AuthError, self).__init__(*args, **kwargs)
class ResourceLimitError(SynapseError):
"""
Any error raised when there is a problem with resource usage.
For instance, the monthly active user limit for the server has been exceeded
"""
def __init__(
self, code, msg,
errcode=Codes.RESOURCE_LIMIT_EXCEED,
admin_uri=None,
limit_type=None,
):
self.admin_uri = admin_uri
self.limit_type = limit_type
super(ResourceLimitError, self).__init__(code, msg, errcode=errcode)
def error_dict(self):
return cs_error(
self.msg,
self.errcode,
admin_uri=self.admin_uri,
limit_type=self.limit_type
)
class EventSizeError(SynapseError):
"""An error raised when an event is too big."""

View file

@ -140,7 +140,7 @@ def listen_metrics(bind_addresses, port):
logger.info("Metrics now reporting on %s:%d", host, port)
def listen_tcp(bind_addresses, port, factory, backlog=50):
def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50):
"""
Create a TCP socket for a port and several addresses
"""
@ -156,7 +156,9 @@ def listen_tcp(bind_addresses, port, factory, backlog=50):
check_bind_error(e, address, bind_addresses)
def listen_ssl(bind_addresses, port, factory, context_factory, backlog=50):
def listen_ssl(
bind_addresses, port, factory, context_factory, reactor=reactor, backlog=50
):
"""
Create an SSL socket for a port and several addresses
"""

View file

@ -117,8 +117,9 @@ class ASReplicationHandler(ReplicationClientHandler):
super(ASReplicationHandler, self).__init__(hs.get_datastore())
self.appservice_handler = hs.get_application_service_handler()
@defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
yield super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
if stream_name == "events":
max_stream_id = self.store.get_room_max_stream_ordering()

View file

@ -39,7 +39,7 @@ from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.keys import SlavedKeyStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.room import (
JoinedRoomMemberListRestServlet,
@ -66,7 +66,7 @@ class ClientReaderSlavedStore(
DirectoryStore,
SlavedApplicationServiceStore,
SlavedRegistrationStore,
TransactionStore,
SlavedTransactionStore,
SlavedClientIpStore,
BaseSlavedStore,
):
@ -168,11 +168,13 @@ def start(config_options):
database_engine = create_engine(config.database_config)
tls_server_context_factory = context_factory.ServerContextFactory(config)
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
ss = ClientReaderServer(
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
tls_client_options_factory=tls_client_options_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,

View file

@ -43,7 +43,7 @@ from synapse.replication.slave.storage.pushers import SlavedPusherStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.room import (
JoinRoomAliasServlet,
@ -63,7 +63,7 @@ logger = logging.getLogger("synapse.app.event_creator")
class EventCreatorSlavedStore(
DirectoryStore,
TransactionStore,
SlavedTransactionStore,
SlavedProfileStore,
SlavedAccountDataStore,
SlavedPusherStore,
@ -174,11 +174,13 @@ def start(config_options):
database_engine = create_engine(config.database_config)
tls_server_context_factory = context_factory.ServerContextFactory(config)
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
ss = EventCreatorServer(
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
tls_client_options_factory=tls_client_options_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,

View file

@ -32,11 +32,17 @@ from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.keys import SlavedKeyStore
from synapse.replication.slave.storage.profile import SlavedProfileStore
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
from synapse.replication.slave.storage.pushers import SlavedPusherStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
@ -49,11 +55,17 @@ logger = logging.getLogger("synapse.app.federation_reader")
class FederationReaderSlavedStore(
SlavedAccountDataStore,
SlavedProfileStore,
SlavedApplicationServiceStore,
SlavedPusherStore,
SlavedPushRuleStore,
SlavedReceiptsStore,
SlavedEventStore,
SlavedKeyStore,
RoomStore,
DirectoryStore,
TransactionStore,
SlavedTransactionStore,
BaseSlavedStore,
):
pass
@ -143,11 +155,13 @@ def start(config_options):
database_engine = create_engine(config.database_config)
tls_server_context_factory = context_factory.ServerContextFactory(config)
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
ss = FederationReaderServer(
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
tls_client_options_factory=tls_client_options_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,

View file

@ -36,11 +36,11 @@ from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.presence import SlavedPresenceStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.async import Linearizer
from synapse.util.async_helpers import Linearizer
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole
@ -50,7 +50,7 @@ logger = logging.getLogger("synapse.app.federation_sender")
class FederationSenderSlaveStore(
SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore,
SlavedDeviceInboxStore, SlavedTransactionStore, SlavedReceiptsStore, SlavedEventStore,
SlavedRegistrationStore, SlavedDeviceStore, SlavedPresenceStore,
):
def __init__(self, db_conn, hs):
@ -144,8 +144,9 @@ class FederationSenderReplicationHandler(ReplicationClientHandler):
super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore())
self.send_handler = FederationSenderHandler(hs, self)
@defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
super(FederationSenderReplicationHandler, self).on_rdata(
yield super(FederationSenderReplicationHandler, self).on_rdata(
stream_name, token, rows
)
self.send_handler.process_replication_rows(stream_name, token, rows)
@ -186,11 +187,13 @@ def start(config_options):
config.send_federation = True
tls_server_context_factory = context_factory.ServerContextFactory(config)
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
ps = FederationSenderServer(
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
tls_client_options_factory=tls_client_options_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,

View file

@ -38,6 +38,7 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.base import ClientV1RestServlet, client_path_patterns
from synapse.rest.client.v2_alpha._base import client_v2_patterns
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
@ -49,6 +50,35 @@ from synapse.util.versionstring import get_version_string
logger = logging.getLogger("synapse.app.frontend_proxy")
class PresenceStatusStubServlet(ClientV1RestServlet):
PATTERNS = client_path_patterns("/presence/(?P<user_id>[^/]*)/status")
def __init__(self, hs):
super(PresenceStatusStubServlet, self).__init__(hs)
self.http_client = hs.get_simple_http_client()
self.auth = hs.get_auth()
self.main_uri = hs.config.worker_main_http_uri
@defer.inlineCallbacks
def on_GET(self, request, user_id):
# Pass through the auth headers, if any, in case the access token
# is there.
auth_headers = request.requestHeaders.getRawHeaders("Authorization", [])
headers = {
"Authorization": auth_headers,
}
result = yield self.http_client.get_json(
self.main_uri + request.uri,
headers=headers,
)
defer.returnValue((200, result))
@defer.inlineCallbacks
def on_PUT(self, request, user_id):
yield self.auth.get_user_by_req(request)
defer.returnValue((200, {}))
class KeyUploadServlet(RestServlet):
PATTERNS = client_v2_patterns("/keys/upload(/(?P<device_id>[^/]+))?$")
@ -135,6 +165,12 @@ class FrontendProxyServer(HomeServer):
elif name == "client":
resource = JsonResource(self, canonical_json=False)
KeyUploadServlet(self).register(resource)
# If presence is disabled, use the stub servlet that does
# not allow sending presence
if not self.config.use_presence:
PresenceStatusStubServlet(self).register(resource)
resources.update({
"/_matrix/client/r0": resource,
"/_matrix/client/unstable": resource,
@ -153,7 +189,8 @@ class FrontendProxyServer(HomeServer):
listener_config,
root_resource,
self.version_string,
)
),
reactor=self.get_reactor()
)
logger.info("Synapse client reader now listening on port %d", port)
@ -208,11 +245,13 @@ def start(config_options):
database_engine = create_engine(config.database_config)
tls_server_context_factory = context_factory.ServerContextFactory(config)
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
ss = FrontendProxyServer(
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
tls_client_options_factory=tls_client_options_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,

View file

@ -303,8 +303,8 @@ class SynapseHomeServer(HomeServer):
# Gauges to expose monthly active user control metrics
current_mau_gauge = Gauge("synapse_admin_current_mau", "Current MAU")
max_mau_value_gauge = Gauge("synapse_admin_max_mau_value", "MAU Limit")
current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU")
max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit")
def setup(config_options):
@ -338,6 +338,7 @@ def setup(config_options):
events.USE_FROZEN_DICTS = config.use_frozen_dicts
tls_server_context_factory = context_factory.ServerContextFactory(config)
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
database_engine = create_engine(config.database_config)
config.database_config["args"]["cp_openfun"] = database_engine.on_new_connection
@ -346,6 +347,7 @@ def setup(config_options):
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
tls_client_options_factory=tls_client_options_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
@ -523,6 +525,7 @@ def run(hs):
clock.looping_call(
hs.get_datastore().reap_monthly_active_users, 1000 * 60 * 60
)
hs.get_datastore().reap_monthly_active_users()
@defer.inlineCallbacks
def generate_monthly_active_users():
@ -530,7 +533,7 @@ def run(hs):
if hs.config.limit_usage_by_mau:
count = yield hs.get_datastore().get_monthly_active_count()
current_mau_gauge.set(float(count))
max_mau_value_gauge.set(float(hs.config.max_mau_value))
max_mau_gauge.set(float(hs.config.max_mau_value))
hs.get_datastore().initialise_reserved_users(
hs.config.mau_limits_reserved_threepids

View file

@ -34,7 +34,7 @@ from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.server import HomeServer
@ -52,7 +52,7 @@ class MediaRepositorySlavedStore(
SlavedApplicationServiceStore,
SlavedRegistrationStore,
SlavedClientIpStore,
TransactionStore,
SlavedTransactionStore,
BaseSlavedStore,
MediaRepositoryStore,
):
@ -155,11 +155,13 @@ def start(config_options):
database_engine = create_engine(config.database_config)
tls_server_context_factory = context_factory.ServerContextFactory(config)
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
ss = MediaRepositoryServer(
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
tls_client_options_factory=tls_client_options_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,

View file

@ -148,8 +148,9 @@ class PusherReplicationHandler(ReplicationClientHandler):
self.pusher_pool = hs.get_pusherpool()
@defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
yield super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
run_in_background(self.poke_pushers, stream_name, token, rows)
@defer.inlineCallbacks
@ -162,11 +163,11 @@ class PusherReplicationHandler(ReplicationClientHandler):
else:
yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
elif stream_name == "events":
yield self.pusher_pool.on_new_notifications(
self.pusher_pool.on_new_notifications(
token, token,
)
elif stream_name == "receipts":
yield self.pusher_pool.on_new_receipts(
self.pusher_pool.on_new_receipts(
token, token, set(row.room_id for row in rows)
)
except Exception:

View file

@ -114,7 +114,10 @@ class SynchrotronPresence(object):
logger.info("Presence process_id is %r", self.process_id)
def send_user_sync(self, user_id, is_syncing, last_sync_ms):
self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms)
if self.hs.config.use_presence:
self.hs.get_tcp_replication().send_user_sync(
user_id, is_syncing, last_sync_ms
)
def mark_as_coming_online(self, user_id):
"""A user has started syncing. Send a UserSync to the master, unless they
@ -211,10 +214,13 @@ class SynchrotronPresence(object):
yield self.notify_from_replication(states, stream_id)
def get_currently_syncing_users(self):
return [
user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
if count > 0
]
if self.hs.config.use_presence:
return [
user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
if count > 0
]
else:
return set()
class SynchrotronTyping(object):
@ -332,8 +338,9 @@ class SyncReplicationHandler(ReplicationClientHandler):
self.presence_handler = hs.get_presence_handler()
self.notifier = hs.get_notifier()
@defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
yield super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
run_in_background(self.process_and_notify, stream_name, token, rows)
def get_streams_to_replicate(self):

View file

@ -169,8 +169,9 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler):
super(UserDirectoryReplicationHandler, self).__init__(hs.get_datastore())
self.user_directory = hs.get_user_directory_handler()
@defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
super(UserDirectoryReplicationHandler, self).on_rdata(
yield super(UserDirectoryReplicationHandler, self).on_rdata(
stream_name, token, rows
)
if stream_name == "current_state_deltas":
@ -214,11 +215,13 @@ def start(config_options):
config.update_user_directory = True
tls_server_context_factory = context_factory.ServerContextFactory(config)
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
ps = UserDirectoryServer(
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
tls_client_options_factory=tls_client_options_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,

View file

@ -193,9 +193,8 @@ def setup_logging(config, use_worker_options=False):
def sighup(signum, stack):
# it might be better to use a file watcher or something for this.
logging.info("Reloading log config from %s due to SIGHUP",
log_config)
load_log_config()
logging.info("Reloaded log config from %s due to SIGHUP", log_config)
load_log_config()

View file

@ -49,6 +49,9 @@ class ServerConfig(Config):
# "disable" federation
self.send_federation = config.get("send_federation", True)
# Whether to enable user presence.
self.use_presence = config.get("use_presence", True)
# Whether to update the user directory or not. This should be set to
# false only if we are updating the user directory in a worker
self.update_user_directory = config.get("update_user_directory", True)
@ -81,6 +84,11 @@ class ServerConfig(Config):
# Options to disable HS
self.hs_disabled = config.get("hs_disabled", False)
self.hs_disabled_message = config.get("hs_disabled_message", "")
self.hs_disabled_limit_type = config.get("hs_disabled_limit_type", "")
# Admin uri to direct users at should their instance become blocked
# due to resource constraints
self.admin_uri = config.get("admin_uri", None)
# FIXME: federation_domain_whitelist needs sytests
self.federation_domain_whitelist = None
@ -245,6 +253,9 @@ class ServerConfig(Config):
# hard limit.
soft_file_limit: 0
# Set to false to disable presence tracking on this homeserver.
use_presence: true
# The GC threshold parameters to pass to `gc.set_threshold`, if defined
# gc_thresholds: [700, 10, 10]
@ -336,6 +347,32 @@ class ServerConfig(Config):
# - port: 9000
# bind_addresses: ['::1', '127.0.0.1']
# type: manhole
# Homeserver blocking
#
# How to reach the server admin, used in ResourceLimitError
# admin_uri: 'mailto:admin@server.com'
#
# Global block config
#
# hs_disabled: False
# hs_disabled_message: 'Human readable reason for why the HS is blocked'
# hs_disabled_limit_type: 'error code(str), to help clients decode reason'
#
# Monthly Active User Blocking
#
# Enables monthly active user checking
# limit_usage_by_mau: False
# max_mau_value: 50
#
# Sometimes the server admin will want to ensure certain accounts are
# never blocked by mau checking. These accounts are specified here.
#
# mau_limit_reserved_threepids:
# - medium: 'email'
# address: 'reserved_user@example.com'
""" % locals()
def read_arguments(self, args):

View file

@ -11,19 +11,22 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from zope.interface import implementer
from OpenSSL import SSL, crypto
from twisted.internet import ssl
from twisted.internet._sslverify import _defaultCurveName
from twisted.internet.interfaces import IOpenSSLClientConnectionCreator
from twisted.internet.ssl import CertificateOptions, ContextFactory
from twisted.python.failure import Failure
logger = logging.getLogger(__name__)
class ServerContextFactory(ssl.ContextFactory):
class ServerContextFactory(ContextFactory):
"""Factory for PyOpenSSL SSL contexts that are used to handle incoming
connections and to make connections to remote servers."""
connections."""
def __init__(self, config):
self._context = SSL.Context(SSL.SSLv23_METHOD)
@ -48,3 +51,78 @@ class ServerContextFactory(ssl.ContextFactory):
def getContext(self):
return self._context
def _idnaBytes(text):
"""
Convert some text typed by a human into some ASCII bytes. This is a
copy of twisted.internet._idna._idnaBytes. For documentation, see the
twisted documentation.
"""
try:
import idna
except ImportError:
return text.encode("idna")
else:
return idna.encode(text)
def _tolerateErrors(wrapped):
"""
Wrap up an info_callback for pyOpenSSL so that if something goes wrong
the error is immediately logged and the connection is dropped if possible.
This is a copy of twisted.internet._sslverify._tolerateErrors. For
documentation, see the twisted documentation.
"""
def infoCallback(connection, where, ret):
try:
return wrapped(connection, where, ret)
except: # noqa: E722, taken from the twisted implementation
f = Failure()
logger.exception("Error during info_callback")
connection.get_app_data().failVerification(f)
return infoCallback
@implementer(IOpenSSLClientConnectionCreator)
class ClientTLSOptions(object):
"""
Client creator for TLS without certificate identity verification. This is a
copy of twisted.internet._sslverify.ClientTLSOptions with the identity
verification left out. For documentation, see the twisted documentation.
"""
def __init__(self, hostname, ctx):
self._ctx = ctx
self._hostname = hostname
self._hostnameBytes = _idnaBytes(hostname)
ctx.set_info_callback(
_tolerateErrors(self._identityVerifyingInfoCallback)
)
def clientConnectionForTLS(self, tlsProtocol):
context = self._ctx
connection = SSL.Connection(context, None)
connection.set_app_data(tlsProtocol)
return connection
def _identityVerifyingInfoCallback(self, connection, where, ret):
if where & SSL.SSL_CB_HANDSHAKE_START:
connection.set_tlsext_host_name(self._hostnameBytes)
class ClientTLSOptionsFactory(object):
"""Factory for Twisted ClientTLSOptions that are used to make connections
to remote servers for federation."""
def __init__(self, config):
# We don't use config options yet
pass
def get_options(self, host):
return ClientTLSOptions(
host.decode('utf-8'),
CertificateOptions(verify=False).getContext()
)

View file

@ -30,14 +30,14 @@ KEY_API_V1 = b"/_matrix/key/v1/"
@defer.inlineCallbacks
def fetch_server_key(server_name, ssl_context_factory, path=KEY_API_V1):
def fetch_server_key(server_name, tls_client_options_factory, path=KEY_API_V1):
"""Fetch the keys for a remote server."""
factory = SynapseKeyClientFactory()
factory.path = path
factory.host = server_name
endpoint = matrix_federation_endpoint(
reactor, server_name, ssl_context_factory, timeout=30
reactor, server_name, tls_client_options_factory, timeout=30
)
for i in range(5):

View file

@ -512,7 +512,7 @@ class Keyring(object):
continue
(response, tls_certificate) = yield fetch_server_key(
server_name, self.hs.tls_server_context_factory,
server_name, self.hs.tls_client_options_factory,
path=(b"/_matrix/key/v2/server/%s" % (
urllib.quote(requested_key_id),
)).encode("ascii"),
@ -655,7 +655,7 @@ class Keyring(object):
# Try to fetch the key from the remote server.
(response, tls_certificate) = yield fetch_server_key(
server_name, self.hs.tls_server_context_factory
server_name, self.hs.tls_client_options_factory
)
# Check the response.

View file

@ -39,8 +39,12 @@ from synapse.federation.federation_base import FederationBase, event_from_pdu_js
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
from synapse.http.endpoint import parse_server_name
from synapse.replication.http.federation import (
ReplicationFederationSendEduRestServlet,
ReplicationGetQueryRestServlet,
)
from synapse.types import get_domain_from_id
from synapse.util import async
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logutils import log_function
@ -67,8 +71,8 @@ class FederationServer(FederationBase):
self.auth = hs.get_auth()
self.handler = hs.get_handlers().federation_handler
self._server_linearizer = async.Linearizer("fed_server")
self._transaction_linearizer = async.Linearizer("fed_txn_handler")
self._server_linearizer = Linearizer("fed_server")
self._transaction_linearizer = Linearizer("fed_txn_handler")
self.transaction_actions = TransactionActions(self.store)
@ -200,7 +204,7 @@ class FederationServer(FederationBase):
event_id, f.getTraceback().rstrip(),
)
yield async.concurrently_execute(
yield concurrently_execute(
process_pdus_for_room, pdus_by_room.keys(),
TRANSACTION_CONCURRENCY_LIMIT,
)
@ -760,6 +764,8 @@ class FederationHandlerRegistry(object):
if edu_type in self.edu_handlers:
raise KeyError("Already have an EDU handler for %s" % (edu_type,))
logger.info("Registering federation EDU handler for %r", edu_type)
self.edu_handlers[edu_type] = handler
def register_query_handler(self, query_type, handler):
@ -778,6 +784,8 @@ class FederationHandlerRegistry(object):
"Already have a Query handler for %s" % (query_type,)
)
logger.info("Registering federation query handler for %r", query_type)
self.query_handlers[query_type] = handler
@defer.inlineCallbacks
@ -800,3 +808,49 @@ class FederationHandlerRegistry(object):
raise NotFoundError("No handler for Query type '%s'" % (query_type,))
return handler(args)
class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
"""A FederationHandlerRegistry for worker processes.
When receiving EDU or queries it will check if an appropriate handler has
been registered on the worker, if there isn't one then it calls off to the
master process.
"""
def __init__(self, hs):
self.config = hs.config
self.http_client = hs.get_simple_http_client()
self.clock = hs.get_clock()
self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs)
self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs)
super(ReplicationFederationHandlerRegistry, self).__init__()
def on_edu(self, edu_type, origin, content):
"""Overrides FederationHandlerRegistry
"""
handler = self.edu_handlers.get(edu_type)
if handler:
return super(ReplicationFederationHandlerRegistry, self).on_edu(
edu_type, origin, content,
)
return self._send_edu(
edu_type=edu_type,
origin=origin,
content=content,
)
def on_query(self, query_type, args):
"""Overrides FederationHandlerRegistry
"""
handler = self.query_handlers.get(query_type)
if handler:
return handler(args)
return self._get_query_client(
query_type=query_type,
args=args,
)

View file

@ -58,6 +58,7 @@ class TransactionQueue(object):
"""
def __init__(self, hs):
self.hs = hs
self.server_name = hs.hostname
self.store = hs.get_datastore()
@ -308,6 +309,9 @@ class TransactionQueue(object):
Args:
states (list(UserPresenceState))
"""
if not self.hs.config.use_presence:
# No-op if presence is disabled.
return
# First we queue up the new presence by user ID, so multiple presence
# updates in quick successtion are correctly handled

View file

@ -520,7 +520,7 @@ class AuthHandler(BaseHandler):
"""
logger.info("Logging in user %s on device %s", user_id, device_id)
access_token = yield self.issue_access_token(user_id, device_id)
yield self.auth.check_auth_blocking()
yield self.auth.check_auth_blocking(user_id)
# the device *should* have been registered before we got here; however,
# it's possible we raced against a DELETE operation. The thing we
@ -734,7 +734,6 @@ class AuthHandler(BaseHandler):
@defer.inlineCallbacks
def validate_short_term_login_token_and_get_user_id(self, login_token):
yield self.auth.check_auth_blocking()
auth_api = self.hs.get_auth()
user_id = None
try:
@ -743,6 +742,7 @@ class AuthHandler(BaseHandler):
auth_api.validate_macaroon(macaroon, "login", True, user_id)
except Exception:
raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN)
yield self.auth.check_auth_blocking(user_id)
defer.returnValue(user_id)
@defer.inlineCallbacks
@ -828,12 +828,26 @@ class AuthHandler(BaseHandler):
@defer.inlineCallbacks
def delete_threepid(self, user_id, medium, address):
"""Attempts to unbind the 3pid on the identity servers and deletes it
from the local database.
Args:
user_id (str)
medium (str)
address (str)
Returns:
Deferred[bool]: Returns True if successfully unbound the 3pid on
the identity server, False if identity server doesn't support the
unbind API.
"""
# 'Canonicalise' email addresses as per above
if medium == 'email':
address = address.lower()
identity_handler = self.hs.get_handlers().identity_handler
yield identity_handler.unbind_threepid(
result = yield identity_handler.try_unbind_threepid(
user_id,
{
'medium': medium,
@ -841,10 +855,10 @@ class AuthHandler(BaseHandler):
},
)
ret = yield self.store.user_delete_threepid(
yield self.store.user_delete_threepid(
user_id, medium, address,
)
defer.returnValue(ret)
defer.returnValue(result)
def _save_session(self, session):
# TODO: Persistent storage

View file

@ -51,7 +51,8 @@ class DeactivateAccountHandler(BaseHandler):
erase_data (bool): whether to GDPR-erase the user's data
Returns:
Deferred
Deferred[bool]: True if identity server supports removing
threepids, otherwise False.
"""
# FIXME: Theoretically there is a race here wherein user resets
# password using threepid.
@ -60,16 +61,22 @@ class DeactivateAccountHandler(BaseHandler):
# leave the user still active so they can try again.
# Ideally we would prevent password resets and then do this in the
# background thread.
# This will be set to false if the identity server doesn't support
# unbinding
identity_server_supports_unbinding = True
threepids = yield self.store.user_get_threepids(user_id)
for threepid in threepids:
try:
yield self._identity_handler.unbind_threepid(
result = yield self._identity_handler.try_unbind_threepid(
user_id,
{
'medium': threepid['medium'],
'address': threepid['address'],
},
)
identity_server_supports_unbinding &= result
except Exception:
# Do we want this to be a fatal error or should we carry on?
logger.exception("Failed to remove threepid from ID server")
@ -103,6 +110,8 @@ class DeactivateAccountHandler(BaseHandler):
# parts users from rooms (if it isn't already running)
self._start_user_parting()
defer.returnValue(identity_server_supports_unbinding)
def _start_user_parting(self):
"""
Start the process that goes through the table of users

View file

@ -23,7 +23,7 @@ from synapse.api.constants import EventTypes
from synapse.api.errors import FederationDeniedError
from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util import stringutils
from synapse.util.async import Linearizer
from synapse.util.async_helpers import Linearizer
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.metrics import measure_func
from synapse.util.retryutils import NotRetryingDestination

View file

@ -49,10 +49,15 @@ from synapse.crypto.event_signing import (
compute_event_signature,
)
from synapse.events.validator import EventValidator
from synapse.replication.http.federation import (
ReplicationCleanRoomRestServlet,
ReplicationFederationSendEventsRestServlet,
)
from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
from synapse.state import resolve_events_with_factory
from synapse.types import UserID, get_domain_from_id
from synapse.util import logcontext, unwrapFirstError
from synapse.util.async import Linearizer
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room
from synapse.util.frozenutils import unfreeze
from synapse.util.logutils import log_function
@ -91,6 +96,18 @@ class FederationHandler(BaseHandler):
self.spam_checker = hs.get_spam_checker()
self.event_creation_handler = hs.get_event_creation_handler()
self._server_notices_mxid = hs.config.server_notices_mxid
self.config = hs.config
self.http_client = hs.get_simple_http_client()
self._send_events_to_master = (
ReplicationFederationSendEventsRestServlet.make_client(hs)
)
self._notify_user_membership_change = (
ReplicationUserJoinedLeftRoomRestServlet.make_client(hs)
)
self._clean_room_for_join_client = (
ReplicationCleanRoomRestServlet.make_client(hs)
)
# When joining a room we need to queue any events for that room up
self.room_queues = {}
@ -1159,7 +1176,7 @@ class FederationHandler(BaseHandler):
)
context = yield self.state_handler.compute_event_context(event)
yield self._persist_events([(event, context)])
yield self.persist_events_and_notify([(event, context)])
defer.returnValue(event)
@ -1190,7 +1207,7 @@ class FederationHandler(BaseHandler):
)
context = yield self.state_handler.compute_event_context(event)
yield self._persist_events([(event, context)])
yield self.persist_events_and_notify([(event, context)])
defer.returnValue(event)
@ -1433,7 +1450,7 @@ class FederationHandler(BaseHandler):
event, context
)
yield self._persist_events(
yield self.persist_events_and_notify(
[(event, context)],
backfilled=backfilled,
)
@ -1471,7 +1488,7 @@ class FederationHandler(BaseHandler):
], consumeErrors=True,
))
yield self._persist_events(
yield self.persist_events_and_notify(
[
(ev_info["event"], context)
for ev_info, context in zip(event_infos, contexts)
@ -1559,7 +1576,7 @@ class FederationHandler(BaseHandler):
raise
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
yield self._persist_events(
yield self.persist_events_and_notify(
[
(e, events_to_context[e.event_id])
for e in itertools.chain(auth_events, state)
@ -1570,7 +1587,7 @@ class FederationHandler(BaseHandler):
event, old_state=state
)
yield self._persist_events(
yield self.persist_events_and_notify(
[(event, new_event_context)],
)
@ -2301,7 +2318,7 @@ class FederationHandler(BaseHandler):
for revocation.
"""
try:
response = yield self.hs.get_simple_http_client().get_json(
response = yield self.http_client.get_json(
url,
{"public_key": public_key}
)
@ -2314,7 +2331,7 @@ class FederationHandler(BaseHandler):
raise AuthError(403, "Third party certificate was invalid")
@defer.inlineCallbacks
def _persist_events(self, event_and_contexts, backfilled=False):
def persist_events_and_notify(self, event_and_contexts, backfilled=False):
"""Persists events and tells the notifier/pushers about them, if
necessary.
@ -2326,14 +2343,21 @@ class FederationHandler(BaseHandler):
Returns:
Deferred
"""
max_stream_id = yield self.store.persist_events(
event_and_contexts,
backfilled=backfilled,
)
if self.config.worker_app:
yield self._send_events_to_master(
store=self.store,
event_and_contexts=event_and_contexts,
backfilled=backfilled
)
else:
max_stream_id = yield self.store.persist_events(
event_and_contexts,
backfilled=backfilled,
)
if not backfilled: # Never notify for backfilled events
for event, _ in event_and_contexts:
self._notify_persisted_event(event, max_stream_id)
if not backfilled: # Never notify for backfilled events
for event, _ in event_and_contexts:
self._notify_persisted_event(event, max_stream_id)
def _notify_persisted_event(self, event, max_stream_id):
"""Checks to see if notifier/pushers should be notified about the
@ -2366,15 +2390,30 @@ class FederationHandler(BaseHandler):
extra_users=extra_users
)
logcontext.run_in_background(
self.pusher_pool.on_new_notifications,
self.pusher_pool.on_new_notifications(
event_stream_id, max_stream_id,
)
def _clean_room_for_join(self, room_id):
return self.store.clean_room_for_join(room_id)
"""Called to clean up any data in DB for a given room, ready for the
server to join the room.
Args:
room_id (str)
"""
if self.config.worker_app:
return self._clean_room_for_join_client(room_id)
else:
return self.store.clean_room_for_join(room_id)
def user_joined_room(self, user, room_id):
"""Called when a new user has joined the room
"""
return user_joined_room(self.distributor, user, room_id)
if self.config.worker_app:
return self._notify_user_membership_change(
room_id=room_id,
user_id=user.to_string(),
change="joined",
)
else:
return user_joined_room(self.distributor, user, room_id)

View file

@ -137,15 +137,19 @@ class IdentityHandler(BaseHandler):
defer.returnValue(data)
@defer.inlineCallbacks
def unbind_threepid(self, mxid, threepid):
"""
Removes a binding from an identity server
def try_unbind_threepid(self, mxid, threepid):
"""Removes a binding from an identity server
Args:
mxid (str): Matrix user ID of binding to be removed
threepid (dict): Dict with medium & address of binding to be removed
Raises:
SynapseError: If we failed to contact the identity server
Returns:
Deferred[bool]: True on success, otherwise False
Deferred[bool]: True on success, otherwise False if the identity
server doesn't support unbinding
"""
logger.debug("unbinding threepid %r from %s", threepid, mxid)
if not self.trusted_id_servers:
@ -175,11 +179,21 @@ class IdentityHandler(BaseHandler):
content=content,
destination_is=id_server,
)
yield self.http_client.post_json_get_json(
url,
content,
headers,
)
try:
yield self.http_client.post_json_get_json(
url,
content,
headers,
)
except HttpResponseException as e:
if e.code in (400, 404, 501,):
# The remote server probably doesn't support unbinding (yet)
logger.warn("Received %d response while unbinding threepid", e.code)
defer.returnValue(False)
else:
logger.error("Failed to unbind threepid on identity server: %s", e)
raise SynapseError(502, "Failed to contact identity server")
defer.returnValue(True)
@defer.inlineCallbacks

View file

@ -25,7 +25,7 @@ from synapse.handlers.presence import format_user_presence_state
from synapse.streams.config import PaginationConfig
from synapse.types import StreamToken, UserID
from synapse.util import unwrapFirstError
from synapse.util.async import concurrently_execute
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.snapshot_cache import SnapshotCache
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.visibility import filter_events_for_client
@ -372,6 +372,10 @@ class InitialSyncHandler(BaseHandler):
@defer.inlineCallbacks
def get_presence():
# If presence is disabled, return an empty list
if not self.hs.config.use_presence:
defer.returnValue([])
states = yield presence_handler.get_states(
[m.user_id for m in room_members],
as_event=True,

View file

@ -25,17 +25,24 @@ from twisted.internet import defer
from twisted.internet.defer import succeed
from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
from synapse.api.errors import (
AuthError,
Codes,
ConsentNotGivenError,
NotFoundError,
SynapseError,
)
from synapse.api.urls import ConsentURIBuilder
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.types import RoomAlias, UserID
from synapse.util.async import Linearizer
from synapse.util.async_helpers import Linearizer
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import measure_func
from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
@ -82,28 +89,85 @@ class MessageHandler(object):
defer.returnValue(data)
@defer.inlineCallbacks
def get_state_events(self, user_id, room_id, is_guest=False):
def get_state_events(
self, user_id, room_id, types=None, filtered_types=None,
at_token=None, is_guest=False,
):
"""Retrieve all state events for a given room. If the user is
joined to the room then return the current state. If the user has
left the room return the state events from when they left.
left the room return the state events from when they left. If an explicit
'at' parameter is passed, return the state events as of that event, if
visible.
Args:
user_id(str): The user requesting state events.
room_id(str): The room ID to get all state events from.
types(list[(str, str|None)]|None): List of (type, state_key) tuples
which are used to filter the state fetched. If `state_key` is None,
all events are returned of the given type.
May be None, which matches any key.
filtered_types(list[str]|None): Only apply filtering via `types` to this
list of event types. Other types of events are returned unfiltered.
If None, `types` filtering is applied to all events.
at_token(StreamToken|None): the stream token of the at which we are requesting
the stats. If the user is not allowed to view the state as of that
stream token, we raise a 403 SynapseError. If None, returns the current
state based on the current_state_events table.
is_guest(bool): whether this user is a guest
Returns:
A list of dicts representing state events. [{}, {}, {}]
"""
membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
room_id, user_id
)
Raises:
NotFoundError (404) if the at token does not yield an event
if membership == Membership.JOIN:
room_state = yield self.state.get_current_state(room_id)
elif membership == Membership.LEAVE:
room_state = yield self.store.get_state_for_events(
[membership_event_id], None
AuthError (403) if the user doesn't have permission to view
members of this room.
"""
if at_token:
# FIXME this claims to get the state at a stream position, but
# get_recent_events_for_room operates by topo ordering. This therefore
# does not reliably give you the state at the given stream position.
# (https://github.com/matrix-org/synapse/issues/3305)
last_events, _ = yield self.store.get_recent_events_for_room(
room_id, end_token=at_token.room_key, limit=1,
)
room_state = room_state[membership_event_id]
if not last_events:
raise NotFoundError("Can't find event for token %s" % (at_token, ))
visible_events = yield filter_events_for_client(
self.store, user_id, last_events,
)
event = last_events[0]
if visible_events:
room_state = yield self.store.get_state_for_events(
[event.event_id], types, filtered_types=filtered_types,
)
room_state = room_state[event.event_id]
else:
raise AuthError(
403,
"User %s not allowed to view events in room %s at token %s" % (
user_id, room_id, at_token,
)
)
else:
membership, membership_event_id = (
yield self.auth.check_in_room_or_world_readable(
room_id, user_id,
)
)
if membership == Membership.JOIN:
state_ids = yield self.store.get_filtered_current_state_ids(
room_id, types, filtered_types=filtered_types,
)
room_state = yield self.store.get_events(state_ids.values())
elif membership == Membership.LEAVE:
room_state = yield self.store.get_state_for_events(
[membership_event_id], types, filtered_types=filtered_types,
)
room_state = room_state[membership_event_id]
now = self.clock.time_msec()
defer.returnValue(
@ -212,10 +276,14 @@ class EventCreationHandler(object):
where *hashes* is a map from algorithm to hash.
If None, they will be requested from the database.
Raises:
ResourceLimitError if server is blocked to some resource being
exceeded
Returns:
Tuple of created event (FrozenEvent), Context
"""
yield self.auth.check_auth_blocking(requester.user.to_string())
builder = self.event_builder_factory.new(event_dict)
self.validator.validate_new(builder)
@ -710,11 +778,8 @@ class EventCreationHandler(object):
event, context=context
)
# this intentionally does not yield: we don't care about the result
# and don't need to wait for it.
run_in_background(
self.pusher_pool.on_new_notifications,
event_stream_id, max_stream_id
self.pusher_pool.on_new_notifications(
event_stream_id, max_stream_id,
)
def _notify():

View file

@ -18,11 +18,11 @@ import logging
from twisted.internet import defer
from twisted.python.failure import Failure
from synapse.api.constants import Membership
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.events.utils import serialize_event
from synapse.types import RoomStreamToken
from synapse.util.async import ReadWriteLock
from synapse.util.async_helpers import ReadWriteLock
from synapse.util.logcontext import run_in_background
from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client
@ -251,6 +251,33 @@ class PaginationHandler(object):
is_peeking=(member_event_id is None),
)
state = None
if event_filter and event_filter.lazy_load_members():
# TODO: remove redundant members
types = [
(EventTypes.Member, state_key)
for state_key in set(
event.sender # FIXME: we also care about invite targets etc.
for event in events
)
]
state_ids = yield self.store.get_state_ids_for_event(
events[0].event_id, types=types,
)
if state_ids:
state = yield self.store.get_events(list(state_ids.values()))
if state:
state = yield filter_events_for_client(
self.store,
user_id,
state.values(),
is_peeking=(member_event_id is None),
)
time_now = self.clock.time_msec()
chunk = {
@ -262,4 +289,10 @@ class PaginationHandler(object):
"end": next_token.to_string(),
}
if state:
chunk["state"] = [
serialize_event(e, time_now, as_client_event)
for e in state
]
defer.returnValue(chunk)

View file

@ -36,7 +36,7 @@ from synapse.api.errors import SynapseError
from synapse.metrics import LaterGauge
from synapse.storage.presence import UserPresenceState
from synapse.types import UserID, get_domain_from_id
from synapse.util.async import Linearizer
from synapse.util.async_helpers import Linearizer
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.logcontext import run_in_background
from synapse.util.logutils import log_function
@ -95,6 +95,7 @@ class PresenceHandler(object):
Args:
hs (synapse.server.HomeServer):
"""
self.hs = hs
self.is_mine = hs.is_mine
self.is_mine_id = hs.is_mine_id
self.clock = hs.get_clock()
@ -230,6 +231,10 @@ class PresenceHandler(object):
earlier than they should when synapse is restarted. This affect of this
is some spurious presence changes that will self-correct.
"""
# If the DB pool has already terminated, don't try updating
if not self.hs.get_db_pool().running:
return
logger.info(
"Performing _on_shutdown. Persisting %d unpersisted changes",
len(self.user_to_current_state)
@ -390,6 +395,10 @@ class PresenceHandler(object):
"""We've seen the user do something that indicates they're interacting
with the app.
"""
# If presence is disabled, no-op
if not self.hs.config.use_presence:
return
user_id = user.to_string()
bump_active_time_counter.inc()
@ -419,6 +428,11 @@ class PresenceHandler(object):
Useful for streams that are not associated with an actual
client that is being used by a user.
"""
# Override if it should affect the user's presence, if presence is
# disabled.
if not self.hs.config.use_presence:
affect_presence = False
if affect_presence:
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
self.user_to_num_current_syncs[user_id] = curr_sync + 1
@ -464,13 +478,16 @@ class PresenceHandler(object):
Returns:
set(str): A set of user_id strings.
"""
syncing_user_ids = {
user_id for user_id, count in self.user_to_num_current_syncs.items()
if count
}
for user_ids in self.external_process_to_current_syncs.values():
syncing_user_ids.update(user_ids)
return syncing_user_ids
if self.hs.config.use_presence:
syncing_user_ids = {
user_id for user_id, count in self.user_to_num_current_syncs.items()
if count
}
for user_ids in self.external_process_to_current_syncs.values():
syncing_user_ids.update(user_ids)
return syncing_user_ids
else:
return set()
@defer.inlineCallbacks
def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec):

View file

@ -17,7 +17,7 @@ import logging
from twisted.internet import defer
from synapse.util.async import Linearizer
from synapse.util.async_helpers import Linearizer
from ._base import BaseHandler

View file

@ -18,7 +18,6 @@ from twisted.internet import defer
from synapse.types import get_domain_from_id
from synapse.util import logcontext
from synapse.util.logcontext import PreserveLoggingContext
from ._base import BaseHandler
@ -116,16 +115,15 @@ class ReceiptsHandler(BaseHandler):
affected_room_ids = list(set([r["room_id"] for r in receipts]))
with PreserveLoggingContext():
self.notifier.on_new_event(
"receipt_key", max_batch_id, rooms=affected_room_ids
)
# Note that the min here shouldn't be relied upon to be accurate.
self.hs.get_pusherpool().on_new_receipts(
min_batch_id, max_batch_id, affected_room_ids
)
self.notifier.on_new_event(
"receipt_key", max_batch_id, rooms=affected_room_ids
)
# Note that the min here shouldn't be relied upon to be accurate.
self.hs.get_pusherpool().on_new_receipts(
min_batch_id, max_batch_id, affected_room_ids,
)
defer.returnValue(True)
defer.returnValue(True)
@logcontext.preserve_fn # caller should not yield on this
@defer.inlineCallbacks

View file

@ -28,7 +28,7 @@ from synapse.api.errors import (
)
from synapse.http.client import CaptchaServerHttpClient
from synapse.types import RoomAlias, RoomID, UserID, create_requester
from synapse.util.async import Linearizer
from synapse.util.async_helpers import Linearizer
from synapse.util.threepids import check_3pid_allowed
from ._base import BaseHandler
@ -144,7 +144,8 @@ class RegistrationHandler(BaseHandler):
Raises:
RegistrationError if there was a problem registering.
"""
yield self._check_mau_limits()
yield self.auth.check_auth_blocking()
password_hash = None
if password:
password_hash = yield self.auth_handler().hash(password)
@ -289,7 +290,7 @@ class RegistrationHandler(BaseHandler):
400,
"User ID can only contain characters a-z, 0-9, or '=_-./'",
)
yield self._check_mau_limits()
yield self.auth.check_auth_blocking()
user = UserID(localpart, self.hs.hostname)
user_id = user.to_string()
@ -439,7 +440,7 @@ class RegistrationHandler(BaseHandler):
"""
if localpart is None:
raise SynapseError(400, "Request must include user id")
yield self._check_mau_limits()
yield self.auth.check_auth_blocking()
need_register = True
try:
@ -533,14 +534,3 @@ class RegistrationHandler(BaseHandler):
remote_room_hosts=remote_room_hosts,
action="join",
)
@defer.inlineCallbacks
def _check_mau_limits(self):
"""
Do not accept registrations if monthly active user limits exceeded
and limiting is enabled
"""
try:
yield self.auth.check_auth_blocking()
except AuthError as e:
raise RegistrationError(e.code, str(e), e.errcode)

View file

@ -98,9 +98,13 @@ class RoomCreationHandler(BaseHandler):
Raises:
SynapseError if the room ID couldn't be stored, or something went
horribly wrong.
ResourceLimitError if server is blocked to some resource being
exceeded
"""
user_id = requester.user.to_string()
self.auth.check_auth_blocking(user_id)
if not self.spam_checker.user_may_create_room(user_id):
raise SynapseError(403, "You are not permitted to create rooms")

View file

@ -26,7 +26,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules
from synapse.types import ThirdPartyInstanceID
from synapse.util.async import concurrently_execute
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.caches.response_cache import ResponseCache

View file

@ -30,7 +30,7 @@ import synapse.types
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes, SynapseError
from synapse.types import RoomID, UserID
from synapse.util.async import Linearizer
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room, user_left_room
logger = logging.getLogger(__name__)

View file

@ -25,7 +25,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.push.clientformat import format_push_rules_for_user
from synapse.types import RoomStreamToken
from synapse.util.async import concurrently_execute
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.response_cache import ResponseCache
@ -75,6 +75,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
"ephemeral",
"account_data",
"unread_notifications",
"summary",
])):
__slots__ = []
@ -184,6 +185,7 @@ class SyncResult(collections.namedtuple("SyncResult", [
class SyncHandler(object):
def __init__(self, hs):
self.hs_config = hs.config
self.store = hs.get_datastore()
self.notifier = hs.get_notifier()
self.presence_handler = hs.get_presence_handler()
@ -191,6 +193,7 @@ class SyncHandler(object):
self.clock = hs.get_clock()
self.response_cache = ResponseCache(hs, "sync")
self.state = hs.get_state_handler()
self.auth = hs.get_auth()
# ExpiringCache((User, Device)) -> LruCache(state_key => event_id)
self.lazy_loaded_members_cache = ExpiringCache(
@ -198,19 +201,27 @@ class SyncHandler(object):
max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
)
@defer.inlineCallbacks
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
full_state=False):
"""Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then
return an empty sync result.
Returns:
A Deferred SyncResult.
Deferred[SyncResult]
"""
return self.response_cache.wrap(
# If the user is not part of the mau group, then check that limits have
# not been exceeded (if not part of the group by this point, almost certain
# auth_blocking will occur)
user_id = sync_config.user.to_string()
yield self.auth.check_auth_blocking(user_id)
res = yield self.response_cache.wrap(
sync_config.request_key,
self._wait_for_sync_for_user,
sync_config, since_token, timeout, full_state,
)
defer.returnValue(res)
@defer.inlineCallbacks
def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
@ -494,10 +505,142 @@ class SyncHandler(object):
state = {}
defer.returnValue(state)
@defer.inlineCallbacks
def compute_summary(self, room_id, sync_config, batch, state, now_token):
""" Works out a room summary block for this room, summarising the number
of joined members in the room, and providing the 'hero' members if the
room has no name so clients can consistently name rooms. Also adds
state events to 'state' if needed to describe the heroes.
Args:
room_id(str):
sync_config(synapse.handlers.sync.SyncConfig):
batch(synapse.handlers.sync.TimelineBatch): The timeline batch for
the room that will be sent to the user.
state(dict): dict of (type, state_key) -> Event as returned by
compute_state_delta
now_token(str): Token of the end of the current batch.
Returns:
A deferred dict describing the room summary
"""
# FIXME: this promulgates https://github.com/matrix-org/synapse/issues/3305
last_events, _ = yield self.store.get_recent_event_ids_for_room(
room_id, end_token=now_token.room_key, limit=1,
)
if not last_events:
defer.returnValue(None)
return
last_event = last_events[-1]
state_ids = yield self.store.get_state_ids_for_event(
last_event.event_id, [
(EventTypes.Member, None),
(EventTypes.Name, ''),
(EventTypes.CanonicalAlias, ''),
]
)
member_ids = {
state_key: event_id
for (t, state_key), event_id in state_ids.iteritems()
if t == EventTypes.Member
}
name_id = state_ids.get((EventTypes.Name, ''))
canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, ''))
summary = {}
# FIXME: it feels very heavy to load up every single membership event
# just to calculate the counts.
member_events = yield self.store.get_events(member_ids.values())
joined_user_ids = []
invited_user_ids = []
for ev in member_events.values():
if ev.content.get("membership") == Membership.JOIN:
joined_user_ids.append(ev.state_key)
elif ev.content.get("membership") == Membership.INVITE:
invited_user_ids.append(ev.state_key)
# TODO: only send these when they change.
summary["m.joined_member_count"] = len(joined_user_ids)
summary["m.invited_member_count"] = len(invited_user_ids)
if name_id or canonical_alias_id:
defer.returnValue(summary)
# FIXME: order by stream ordering, not alphabetic
me = sync_config.user.to_string()
if (joined_user_ids or invited_user_ids):
summary['m.heroes'] = sorted(
[
user_id
for user_id in (joined_user_ids + invited_user_ids)
if user_id != me
]
)[0:5]
else:
summary['m.heroes'] = sorted(
[user_id for user_id in member_ids.keys() if user_id != me]
)[0:5]
if not sync_config.filter_collection.lazy_load_members():
defer.returnValue(summary)
# ensure we send membership events for heroes if needed
cache_key = (sync_config.user.to_string(), sync_config.device_id)
cache = self.get_lazy_loaded_members_cache(cache_key)
# track which members the client should already know about via LL:
# Ones which are already in state...
existing_members = set(
user_id for (typ, user_id) in state.keys()
if typ == EventTypes.Member
)
# ...or ones which are in the timeline...
for ev in batch.events:
if ev.type == EventTypes.Member:
existing_members.add(ev.state_key)
# ...and then ensure any missing ones get included in state.
missing_hero_event_ids = [
member_ids[hero_id]
for hero_id in summary['m.heroes']
if (
cache.get(hero_id) != member_ids[hero_id] and
hero_id not in existing_members
)
]
missing_hero_state = yield self.store.get_events(missing_hero_event_ids)
missing_hero_state = missing_hero_state.values()
for s in missing_hero_state:
cache.set(s.state_key, s.event_id)
state[(EventTypes.Member, s.state_key)] = s
defer.returnValue(summary)
def get_lazy_loaded_members_cache(self, cache_key):
cache = self.lazy_loaded_members_cache.get(cache_key)
if cache is None:
logger.debug("creating LruCache for %r", cache_key)
cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
self.lazy_loaded_members_cache[cache_key] = cache
else:
logger.debug("found LruCache for %r", cache_key)
return cache
@defer.inlineCallbacks
def compute_state_delta(self, room_id, batch, sync_config, since_token, now_token,
full_state):
""" Works out the differnce in state between the start of the timeline
""" Works out the difference in state between the start of the timeline
and the previous sync.
Args:
@ -511,7 +654,7 @@ class SyncHandler(object):
full_state(bool): Whether to force returning the full state.
Returns:
A deferred new event dictionary
A deferred dict of (type, state_key) -> Event
"""
# TODO(mjark) Check if the state events were received by the server
# after the previous sync, since we need to include those state
@ -609,13 +752,7 @@ class SyncHandler(object):
if lazy_load_members and not include_redundant_members:
cache_key = (sync_config.user.to_string(), sync_config.device_id)
cache = self.lazy_loaded_members_cache.get(cache_key)
if cache is None:
logger.debug("creating LruCache for %r", cache_key)
cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
self.lazy_loaded_members_cache[cache_key] = cache
else:
logger.debug("found LruCache for %r", cache_key)
cache = self.get_lazy_loaded_members_cache(cache_key)
# if it's a new sync sequence, then assume the client has had
# amnesia and doesn't want any recent lazy-loaded members
@ -724,7 +861,7 @@ class SyncHandler(object):
since_token is None and
sync_config.filter_collection.blocks_all_presence()
)
if not block_all_presence_data:
if self.hs_config.use_presence and not block_all_presence_data:
yield self._generate_sync_entry_for_presence(
sync_result_builder, newly_joined_rooms, newly_joined_users
)
@ -1416,7 +1553,6 @@ class SyncHandler(object):
if events == [] and tags is None:
return
since_token = sync_result_builder.since_token
now_token = sync_result_builder.now_token
sync_config = sync_result_builder.sync_config
@ -1459,6 +1595,18 @@ class SyncHandler(object):
full_state=full_state
)
summary = {}
if (
sync_config.filter_collection.lazy_load_members() and
(
any(ev.type == EventTypes.Member for ev in batch.events) or
since_token is None
)
):
summary = yield self.compute_summary(
room_id, sync_config, batch, state, now_token
)
if room_builder.rtype == "joined":
unread_notifications = {}
room_sync = JoinedSyncResult(
@ -1468,6 +1616,7 @@ class SyncHandler(object):
ephemeral=ephemeral,
account_data=account_data_events,
unread_notifications=unread_notifications,
summary=summary,
)
if room_sync or always_include:

View file

@ -42,7 +42,7 @@ from twisted.web.http_headers import Headers
from synapse.api.errors import Codes, HttpResponseException, SynapseError
from synapse.http import cancelled_to_request_timed_out_error, redact_uri
from synapse.http.endpoint import SpiderEndpoint
from synapse.util.async import add_timeout_to_deferred
from synapse.util.async_helpers import add_timeout_to_deferred
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.logcontext import make_deferred_yieldable

View file

@ -26,7 +26,6 @@ from twisted.names.error import DNSNameError, DomainError
logger = logging.getLogger(__name__)
SERVER_CACHE = {}
# our record of an individual server which can be tried to reach a destination.
@ -103,15 +102,16 @@ def parse_and_validate_server_name(server_name):
return host, port
def matrix_federation_endpoint(reactor, destination, ssl_context_factory=None,
def matrix_federation_endpoint(reactor, destination, tls_client_options_factory=None,
timeout=None):
"""Construct an endpoint for the given matrix destination.
Args:
reactor: Twisted reactor.
destination (bytes): The name of the server to connect to.
ssl_context_factory (twisted.internet.ssl.ContextFactory): Factory
which generates SSL contexts to use for TLS.
tls_client_options_factory
(synapse.crypto.context_factory.ClientTLSOptionsFactory):
Factory which generates TLS options for client connections.
timeout (int): connection timeout in seconds
"""
@ -122,13 +122,13 @@ def matrix_federation_endpoint(reactor, destination, ssl_context_factory=None,
if timeout is not None:
endpoint_kw_args.update(timeout=timeout)
if ssl_context_factory is None:
if tls_client_options_factory is None:
transport_endpoint = HostnameEndpoint
default_port = 8008
else:
def transport_endpoint(reactor, host, port, timeout):
return wrapClientTLS(
ssl_context_factory,
tls_client_options_factory.get_options(host),
HostnameEndpoint(reactor, host, port, timeout=timeout))
default_port = 8448

View file

@ -43,7 +43,7 @@ from synapse.api.errors import (
from synapse.http import cancelled_to_request_timed_out_error
from synapse.http.endpoint import matrix_federation_endpoint
from synapse.util import logcontext
from synapse.util.async import add_timeout_to_deferred
from synapse.util.async_helpers import add_timeout_to_deferred
from synapse.util.logcontext import make_deferred_yieldable
logger = logging.getLogger(__name__)
@ -61,14 +61,14 @@ MAX_SHORT_RETRIES = 3
class MatrixFederationEndpointFactory(object):
def __init__(self, hs):
self.tls_server_context_factory = hs.tls_server_context_factory
self.tls_client_options_factory = hs.tls_client_options_factory
def endpointForURI(self, uri):
destination = uri.netloc
return matrix_federation_endpoint(
reactor, destination, timeout=10,
ssl_context_factory=self.tls_server_context_factory
tls_client_options_factory=self.tls_client_options_factory
)

View file

@ -25,8 +25,9 @@ from canonicaljson import encode_canonical_json, encode_pretty_printed_json, jso
from twisted.internet import defer
from twisted.python import failure
from twisted.web import resource, server
from twisted.web import resource
from twisted.web.server import NOT_DONE_YET
from twisted.web.static import NoRangeStaticProducer
from twisted.web.util import redirectTo
import synapse.events
@ -37,10 +38,13 @@ from synapse.api.errors import (
SynapseError,
UnrecognizedRequestError,
)
from synapse.http.request_metrics import requests_counter
from synapse.util.caches import intern_dict
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.metrics import Measure
from synapse.util.logcontext import preserve_fn
if PY3:
from io import BytesIO
else:
from cStringIO import StringIO as BytesIO
logger = logging.getLogger(__name__)
@ -60,11 +64,10 @@ HTML_ERROR_TEMPLATE = """<!DOCTYPE html>
def wrap_json_request_handler(h):
"""Wraps a request handler method with exception handling.
Also adds logging as per wrap_request_handler_with_logging.
Also does the wrapping with request.processing as per wrap_async_request_handler.
The handler method must have a signature of "handle_foo(self, request)",
where "self" must have a "clock" attribute (and "request" must be a
SynapseRequest).
where "request" must be a SynapseRequest.
The handler must return a deferred. If the deferred succeeds we assume that
a response has been sent. If the deferred fails with a SynapseError we use
@ -108,24 +111,23 @@ def wrap_json_request_handler(h):
pretty_print=_request_user_agent_is_curl(request),
)
return wrap_request_handler_with_logging(wrapped_request_handler)
return wrap_async_request_handler(wrapped_request_handler)
def wrap_html_request_handler(h):
"""Wraps a request handler method with exception handling.
Also adds logging as per wrap_request_handler_with_logging.
Also does the wrapping with request.processing as per wrap_async_request_handler.
The handler method must have a signature of "handle_foo(self, request)",
where "self" must have a "clock" attribute (and "request" must be a
SynapseRequest).
where "request" must be a SynapseRequest.
"""
def wrapped_request_handler(self, request):
d = defer.maybeDeferred(h, self, request)
d.addErrback(_return_html_error, request)
return d
return wrap_request_handler_with_logging(wrapped_request_handler)
return wrap_async_request_handler(wrapped_request_handler)
def _return_html_error(f, request):
@ -170,46 +172,26 @@ def _return_html_error(f, request):
finish_request(request)
def wrap_request_handler_with_logging(h):
"""Wraps a request handler to provide logging and metrics
def wrap_async_request_handler(h):
"""Wraps an async request handler so that it calls request.processing.
This helps ensure that work done by the request handler after the request is completed
is correctly recorded against the request metrics/logs.
The handler method must have a signature of "handle_foo(self, request)",
where "self" must have a "clock" attribute (and "request" must be a
SynapseRequest).
where "request" must be a SynapseRequest.
As well as calling `request.processing` (which will log the response and
duration for this request), the wrapped request handler will insert the
request id into the logging context.
The handler may return a deferred, in which case the completion of the request isn't
logged until the deferred completes.
"""
@defer.inlineCallbacks
def wrapped_request_handler(self, request):
"""
Args:
self:
request (synapse.http.site.SynapseRequest):
"""
def wrapped_async_request_handler(self, request):
with request.processing():
yield h(self, request)
request_id = request.get_request_id()
with LoggingContext(request_id) as request_context:
request_context.request = request_id
with Measure(self.clock, "wrapped_request_handler"):
# we start the request metrics timer here with an initial stab
# at the servlet name. For most requests that name will be
# JsonResource (or a subclass), and JsonResource._async_render
# will update it once it picks a servlet.
servlet_name = self.__class__.__name__
with request.processing(servlet_name):
with PreserveLoggingContext(request_context):
d = defer.maybeDeferred(h, self, request)
# record the arrival of the request *after*
# dispatching to the handler, so that the handler
# can update the servlet name in the request
# metrics
requests_counter.labels(request.method,
request.request_metrics.name).inc()
yield d
return wrapped_request_handler
# we need to preserve_fn here, because the synchronous render method won't yield for
# us (obviously)
return preserve_fn(wrapped_async_request_handler)
class HttpServer(object):
@ -272,7 +254,7 @@ class JsonResource(HttpServer, resource.Resource):
""" This gets called by twisted every time someone sends us a request.
"""
self._async_render(request)
return server.NOT_DONE_YET
return NOT_DONE_YET
@wrap_json_request_handler
@defer.inlineCallbacks
@ -413,8 +395,7 @@ def respond_with_json(request, code, json_object, send_cors=False,
return
if pretty_print:
json_bytes = (encode_pretty_printed_json(json_object) + "\n"
).encode("utf-8")
json_bytes = encode_pretty_printed_json(json_object) + b"\n"
else:
if canonical_json or synapse.events.USE_FROZEN_DICTS:
# canonicaljson already encodes to bytes
@ -450,8 +431,12 @@ def respond_with_json_bytes(request, code, json_bytes, send_cors=False,
if send_cors:
set_cors_headers(request)
request.write(json_bytes)
finish_request(request)
# todo: we can almost certainly avoid this copy and encode the json straight into
# the bytesIO, but it would involve faffing around with string->bytes wrappers.
bytes_io = BytesIO(json_bytes)
producer = NoRangeStaticProducer(request, bytes_io)
producer.start()
return NOT_DONE_YET

View file

@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import logging
import time
@ -19,8 +18,8 @@ import time
from twisted.web.server import Request, Site
from synapse.http import redact_uri
from synapse.http.request_metrics import RequestMetrics
from synapse.util.logcontext import ContextResourceUsage, LoggingContext
from synapse.http.request_metrics import RequestMetrics, requests_counter
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
logger = logging.getLogger(__name__)
@ -34,25 +33,43 @@ class SynapseRequest(Request):
It extends twisted's twisted.web.server.Request, and adds:
* Unique request ID
* A log context associated with the request
* Redaction of access_token query-params in __repr__
* Logging at start and end
* Metrics to record CPU, wallclock and DB time by endpoint.
It provides a method `processing` which should be called by the Resource
which is handling the request, and returns a context manager.
It also provides a method `processing`, which returns a context manager. If this
method is called, the request won't be logged until the context manager is closed;
this is useful for asynchronous request handlers which may go on processing the
request even after the client has disconnected.
Attributes:
logcontext(LoggingContext) : the log context for this request
"""
def __init__(self, site, channel, *args, **kw):
Request.__init__(self, channel, *args, **kw)
self.site = site
self._channel = channel
self._channel = channel # this is used by the tests
self.authenticated_entity = None
self.start_time = 0
# we can't yet create the logcontext, as we don't know the method.
self.logcontext = None
global _next_request_seq
self.request_seq = _next_request_seq
_next_request_seq += 1
# whether an asynchronous request handler has called processing()
self._is_processing = False
# the time when the asynchronous request handler completed its processing
self._processing_finished_time = None
# what time we finished sending the response to the client (or the connection
# dropped)
self.finish_time = None
def __repr__(self):
# We overwrite this so that we don't log ``access_token``
return '<%s at 0x%x method=%r uri=%r clientproto=%r site=%r>' % (
@ -74,11 +91,116 @@ class SynapseRequest(Request):
return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]
def render(self, resrc):
# this is called once a Resource has been found to serve the request; in our
# case the Resource in question will normally be a JsonResource.
# create a LogContext for this request
request_id = self.get_request_id()
logcontext = self.logcontext = LoggingContext(request_id)
logcontext.request = request_id
# override the Server header which is set by twisted
self.setHeader("Server", self.site.server_version_string)
return Request.render(self, resrc)
with PreserveLoggingContext(self.logcontext):
# we start the request metrics timer here with an initial stab
# at the servlet name. For most requests that name will be
# JsonResource (or a subclass), and JsonResource._async_render
# will update it once it picks a servlet.
servlet_name = resrc.__class__.__name__
self._started_processing(servlet_name)
Request.render(self, resrc)
# record the arrival of the request *after*
# dispatching to the handler, so that the handler
# can update the servlet name in the request
# metrics
requests_counter.labels(self.method,
self.request_metrics.name).inc()
@contextlib.contextmanager
def processing(self):
"""Record the fact that we are processing this request.
Returns a context manager; the correct way to use this is:
@defer.inlineCallbacks
def handle_request(request):
with request.processing("FooServlet"):
yield really_handle_the_request()
Once the context manager is closed, the completion of the request will be logged,
and the various metrics will be updated.
"""
if self._is_processing:
raise RuntimeError("Request is already processing")
self._is_processing = True
try:
yield
except Exception:
# this should already have been caught, and sent back to the client as a 500.
logger.exception("Asynchronous messge handler raised an uncaught exception")
finally:
# the request handler has finished its work and either sent the whole response
# back, or handed over responsibility to a Producer.
self._processing_finished_time = time.time()
self._is_processing = False
# if we've already sent the response, log it now; otherwise, we wait for the
# response to be sent.
if self.finish_time is not None:
self._finished_processing()
def finish(self):
"""Called when all response data has been written to this Request.
Overrides twisted.web.server.Request.finish to record the finish time and do
logging.
"""
self.finish_time = time.time()
Request.finish(self)
if not self._is_processing:
with PreserveLoggingContext(self.logcontext):
self._finished_processing()
def connectionLost(self, reason):
"""Called when the client connection is closed before the response is written.
Overrides twisted.web.server.Request.connectionLost to record the finish time and
do logging.
"""
self.finish_time = time.time()
Request.connectionLost(self, reason)
# we only get here if the connection to the client drops before we send
# the response.
#
# It's useful to log it here so that we can get an idea of when
# the client disconnects.
with PreserveLoggingContext(self.logcontext):
logger.warn(
"Error processing request: %s %s", reason.type, reason.value,
)
if not self._is_processing:
self._finished_processing()
def _started_processing(self, servlet_name):
"""Record the fact that we are processing this request.
This will log the request's arrival. Once the request completes,
be sure to call finished_processing.
Args:
servlet_name (str): the name of the servlet which will be
processing this request. This is used in the metrics.
It is possible to update this afterwards by updating
self.request_metrics.name.
"""
self.start_time = time.time()
self.request_metrics = RequestMetrics()
self.request_metrics.start(
@ -94,13 +216,21 @@ class SynapseRequest(Request):
)
def _finished_processing(self):
try:
context = LoggingContext.current_context()
usage = context.get_resource_usage()
except Exception:
usage = ContextResourceUsage()
"""Log the completion of this request and update the metrics
"""
end_time = time.time()
usage = self.logcontext.get_resource_usage()
if self._processing_finished_time is None:
# we completed the request without anything calling processing()
self._processing_finished_time = time.time()
# the time between receiving the request and the request handler finishing
processing_time = self._processing_finished_time - self.start_time
# the time between the request handler finishing and the response being sent
# to the client (nb may be negative)
response_send_time = self.finish_time - self._processing_finished_time
# need to decode as it could be raw utf-8 bytes
# from a IDN servname in an auth header
@ -116,22 +246,31 @@ class SynapseRequest(Request):
user_agent = self.get_user_agent()
if user_agent is not None:
user_agent = user_agent.decode("utf-8", "replace")
else:
user_agent = "-"
code = str(self.code)
if not self.finished:
# we didn't send the full response before we gave up (presumably because
# the connection dropped)
code += "!"
self.site.access_logger.info(
"%s - %s - {%s}"
" Processed request: %.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)"
" Processed request: %.3fsec/%.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)"
" %sB %s \"%s %s %s\" \"%s\" [%d dbevts]",
self.getClientIP(),
self.site.site_tag,
authenticated_entity,
end_time - self.start_time,
processing_time,
response_send_time,
usage.ru_utime,
usage.ru_stime,
usage.db_sched_duration_sec,
usage.db_txn_duration_sec,
int(usage.db_txn_count),
self.sentLength,
self.code,
code,
self.method,
self.get_redacted_uri(),
self.clientproto,
@ -140,38 +279,10 @@ class SynapseRequest(Request):
)
try:
self.request_metrics.stop(end_time, self)
self.request_metrics.stop(self.finish_time, self)
except Exception as e:
logger.warn("Failed to stop metrics: %r", e)
@contextlib.contextmanager
def processing(self, servlet_name):
"""Record the fact that we are processing this request.
Returns a context manager; the correct way to use this is:
@defer.inlineCallbacks
def handle_request(request):
with request.processing("FooServlet"):
yield really_handle_the_request()
This will log the request's arrival. Once the context manager is
closed, the completion of the request will be logged, and the various
metrics will be updated.
Args:
servlet_name (str): the name of the servlet which will be
processing this request. This is used in the metrics.
It is possible to update this afterwards by updating
self.request_metrics.servlet_name.
"""
# TODO: we should probably just move this into render() and finish(),
# to save having to call a separate method.
self._started_processing(servlet_name)
yield
self._finished_processing()
class XForwardedForRequest(SynapseRequest):
def __init__(self, *args, **kw):

View file

@ -25,7 +25,7 @@ from synapse.api.errors import AuthError
from synapse.handlers.presence import format_user_presence_state
from synapse.metrics import LaterGauge
from synapse.types import StreamToken
from synapse.util.async import (
from synapse.util.async_helpers import (
DeferredTimeoutError,
ObservableDeferred,
add_timeout_to_deferred,

View file

@ -26,7 +26,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.event_auth import get_user_power_level
from synapse.state import POWER_KEY
from synapse.util.async import Linearizer
from synapse.util.async_helpers import Linearizer
from synapse.util.caches import register_cache
from synapse.util.caches.descriptors import cached

View file

@ -35,7 +35,7 @@ from synapse.push.presentable_names import (
name_from_member_event,
)
from synapse.types import UserID
from synapse.util.async import concurrently_execute
from synapse.util.async_helpers import concurrently_execute
from synapse.visibility import filter_events_for_client
logger = logging.getLogger(__name__)

View file

@ -18,6 +18,7 @@ import logging
from twisted.internet import defer
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push.pusher import PusherFactory
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
@ -122,8 +123,14 @@ class PusherPool:
p['app_id'], p['pushkey'], p['user_name'],
)
@defer.inlineCallbacks
def on_new_notifications(self, min_stream_id, max_stream_id):
run_as_background_process(
"on_new_notifications",
self._on_new_notifications, min_stream_id, max_stream_id,
)
@defer.inlineCallbacks
def _on_new_notifications(self, min_stream_id, max_stream_id):
try:
users_affected = yield self.store.get_push_action_users_in_range(
min_stream_id, max_stream_id
@ -147,8 +154,14 @@ class PusherPool:
except Exception:
logger.exception("Exception in pusher on_new_notifications")
@defer.inlineCallbacks
def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
run_as_background_process(
"on_new_receipts",
self._on_new_receipts, min_stream_id, max_stream_id, affected_room_ids,
)
@defer.inlineCallbacks
def _on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
try:
# Need to subtract 1 from the minimum because the lower bound here
# is not inclusive

View file

@ -14,7 +14,7 @@
# limitations under the License.
from synapse.http.server import JsonResource
from synapse.replication.http import membership, send_event
from synapse.replication.http import federation, membership, send_event
REPLICATION_PREFIX = "/_synapse/replication"
@ -27,3 +27,4 @@ class ReplicationRestResource(JsonResource):
def register_servlets(self, hs):
send_event.register_servlets(hs, self)
membership.register_servlets(hs, self)
federation.register_servlets(hs, self)

View file

@ -0,0 +1,259 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from twisted.internet import defer
from synapse.events import FrozenEvent
from synapse.events.snapshot import EventContext
from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
"""Handles events newly received from federation, including persisting and
notifying.
The API looks like:
POST /_synapse/replication/fed_send_events/:txn_id
{
"events": [{
"event": { .. serialized event .. },
"internal_metadata": { .. serialized internal_metadata .. },
"rejected_reason": .., // The event.rejected_reason field
"context": { .. serialized event context .. },
}],
"backfilled": false
"""
NAME = "fed_send_events"
PATH_ARGS = ()
def __init__(self, hs):
super(ReplicationFederationSendEventsRestServlet, self).__init__(hs)
self.store = hs.get_datastore()
self.clock = hs.get_clock()
self.federation_handler = hs.get_handlers().federation_handler
@staticmethod
@defer.inlineCallbacks
def _serialize_payload(store, event_and_contexts, backfilled):
"""
Args:
store
event_and_contexts (list[tuple[FrozenEvent, EventContext]])
backfilled (bool): Whether or not the events are the result of
backfilling
"""
event_payloads = []
for event, context in event_and_contexts:
serialized_context = yield context.serialize(event, store)
event_payloads.append({
"event": event.get_pdu_json(),
"internal_metadata": event.internal_metadata.get_dict(),
"rejected_reason": event.rejected_reason,
"context": serialized_context,
})
payload = {
"events": event_payloads,
"backfilled": backfilled,
}
defer.returnValue(payload)
@defer.inlineCallbacks
def _handle_request(self, request):
with Measure(self.clock, "repl_fed_send_events_parse"):
content = parse_json_object_from_request(request)
backfilled = content["backfilled"]
event_payloads = content["events"]
event_and_contexts = []
for event_payload in event_payloads:
event_dict = event_payload["event"]
internal_metadata = event_payload["internal_metadata"]
rejected_reason = event_payload["rejected_reason"]
event = FrozenEvent(event_dict, internal_metadata, rejected_reason)
context = yield EventContext.deserialize(
self.store, event_payload["context"],
)
event_and_contexts.append((event, context))
logger.info(
"Got %d events from federation",
len(event_and_contexts),
)
yield self.federation_handler.persist_events_and_notify(
event_and_contexts, backfilled,
)
defer.returnValue((200, {}))
class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):
"""Handles EDUs newly received from federation, including persisting and
notifying.
Request format:
POST /_synapse/replication/fed_send_edu/:edu_type/:txn_id
{
"origin": ...,
"content: { ... }
}
"""
NAME = "fed_send_edu"
PATH_ARGS = ("edu_type",)
def __init__(self, hs):
super(ReplicationFederationSendEduRestServlet, self).__init__(hs)
self.store = hs.get_datastore()
self.clock = hs.get_clock()
self.registry = hs.get_federation_registry()
@staticmethod
def _serialize_payload(edu_type, origin, content):
return {
"origin": origin,
"content": content,
}
@defer.inlineCallbacks
def _handle_request(self, request, edu_type):
with Measure(self.clock, "repl_fed_send_edu_parse"):
content = parse_json_object_from_request(request)
origin = content["origin"]
edu_content = content["content"]
logger.info(
"Got %r edu from %s",
edu_type, origin,
)
result = yield self.registry.on_edu(edu_type, origin, edu_content)
defer.returnValue((200, result))
class ReplicationGetQueryRestServlet(ReplicationEndpoint):
"""Handle responding to queries from federation.
Request format:
POST /_synapse/replication/fed_query/:query_type
{
"args": { ... }
}
"""
NAME = "fed_query"
PATH_ARGS = ("query_type",)
# This is a query, so let's not bother caching
CACHE = False
def __init__(self, hs):
super(ReplicationGetQueryRestServlet, self).__init__(hs)
self.store = hs.get_datastore()
self.clock = hs.get_clock()
self.registry = hs.get_federation_registry()
@staticmethod
def _serialize_payload(query_type, args):
"""
Args:
query_type (str)
args (dict): The arguments received for the given query type
"""
return {
"args": args,
}
@defer.inlineCallbacks
def _handle_request(self, request, query_type):
with Measure(self.clock, "repl_fed_query_parse"):
content = parse_json_object_from_request(request)
args = content["args"]
logger.info(
"Got %r query",
query_type,
)
result = yield self.registry.on_query(query_type, args)
defer.returnValue((200, result))
class ReplicationCleanRoomRestServlet(ReplicationEndpoint):
"""Called to clean up any data in DB for a given room, ready for the
server to join the room.
Request format:
POST /_synapse/replication/fed_query/:fed_cleanup_room/:txn_id
{}
"""
NAME = "fed_cleanup_room"
PATH_ARGS = ("room_id",)
def __init__(self, hs):
super(ReplicationCleanRoomRestServlet, self).__init__(hs)
self.store = hs.get_datastore()
@staticmethod
def _serialize_payload(room_id, args):
"""
Args:
room_id (str)
"""
return {}
@defer.inlineCallbacks
def _handle_request(self, request, room_id):
yield self.store.clean_room_for_join(room_id)
defer.returnValue((200, {}))
def register_servlets(hs, http_server):
ReplicationFederationSendEventsRestServlet(hs).register(http_server)
ReplicationFederationSendEduRestServlet(hs).register(http_server)
ReplicationGetQueryRestServlet(hs).register(http_server)
ReplicationCleanRoomRestServlet(hs).register(http_server)

View file

@ -13,19 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.storage import DataStore
from synapse.storage.transactions import TransactionStore
from ._base import BaseSlavedStore
class TransactionStore(BaseSlavedStore):
get_destination_retry_timings = TransactionStore.__dict__[
"get_destination_retry_timings"
]
_get_destination_retry_timings = DataStore._get_destination_retry_timings.__func__
set_destination_retry_timings = DataStore.set_destination_retry_timings.__func__
_set_destination_retry_timings = DataStore._set_destination_retry_timings.__func__
prep_send_transaction = DataStore.prep_send_transaction.__func__
delivered_txn = DataStore.delivered_txn.__func__
class SlavedTransactionStore(TransactionStore, BaseSlavedStore):
pass

View file

@ -107,7 +107,7 @@ class ReplicationClientHandler(object):
Can be overriden in subclasses to handle more.
"""
logger.info("Received rdata %s -> %s", stream_name, token)
self.store.process_replication_rows(stream_name, token, rows)
return self.store.process_replication_rows(stream_name, token, rows)
def on_position(self, stream_name, token):
"""Called when we get new position data. By default this just pokes
@ -115,7 +115,7 @@ class ReplicationClientHandler(object):
Can be overriden in subclasses to handle more.
"""
self.store.process_replication_rows(stream_name, token, [])
return self.store.process_replication_rows(stream_name, token, [])
def on_sync(self, data):
"""When we received a SYNC we wake up any deferreds that were waiting

View file

@ -59,6 +59,12 @@ class Command(object):
"""
return self.data
def get_logcontext_id(self):
"""Get a suitable string for the logcontext when processing this command"""
# by default, we just use the command name.
return self.NAME
class ServerCommand(Command):
"""Sent by the server on new connection and includes the server_name.
@ -116,6 +122,9 @@ class RdataCommand(Command):
_json_encoder.encode(self.row),
))
def get_logcontext_id(self):
return "RDATA-" + self.stream_name
class PositionCommand(Command):
"""Sent by the client to tell the client the stream postition without
@ -190,6 +199,9 @@ class ReplicateCommand(Command):
def to_line(self):
return " ".join((self.stream_name, str(self.token),))
def get_logcontext_id(self):
return "REPLICATE-" + self.stream_name
class UserSyncCommand(Command):
"""Sent by the client to inform the server that a user has started or

View file

@ -63,6 +63,8 @@ from twisted.protocols.basic import LineOnlyReceiver
from twisted.python.failure import Failure
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.stringutils import random_string
from .commands import (
@ -222,7 +224,11 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
# Now lets try and call on_<CMD_NAME> function
try:
getattr(self, "on_%s" % (cmd_name,))(cmd)
run_as_background_process(
"replication-" + cmd.get_logcontext_id(),
getattr(self, "on_%s" % (cmd_name,)),
cmd,
)
except Exception:
logger.exception("[%s] Failed to handle line: %r", self.id(), line)
@ -387,7 +393,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
self.name = cmd.data
def on_USER_SYNC(self, cmd):
self.streamer.on_user_sync(
return self.streamer.on_user_sync(
self.conn_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms,
)
@ -397,22 +403,33 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
if stream_name == "ALL":
# Subscribe to all streams we're publishing to.
for stream in iterkeys(self.streamer.streams_by_name):
self.subscribe_to_stream(stream, token)
deferreds = [
run_in_background(
self.subscribe_to_stream,
stream, token,
)
for stream in iterkeys(self.streamer.streams_by_name)
]
return make_deferred_yieldable(
defer.gatherResults(deferreds, consumeErrors=True)
)
else:
self.subscribe_to_stream(stream_name, token)
return self.subscribe_to_stream(stream_name, token)
def on_FEDERATION_ACK(self, cmd):
self.streamer.federation_ack(cmd.token)
return self.streamer.federation_ack(cmd.token)
def on_REMOVE_PUSHER(self, cmd):
self.streamer.on_remove_pusher(cmd.app_id, cmd.push_key, cmd.user_id)
return self.streamer.on_remove_pusher(
cmd.app_id, cmd.push_key, cmd.user_id,
)
def on_INVALIDATE_CACHE(self, cmd):
self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)
return self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)
def on_USER_IP(self, cmd):
self.streamer.on_user_ip(
return self.streamer.on_user_ip(
cmd.user_id, cmd.access_token, cmd.ip, cmd.user_agent, cmd.device_id,
cmd.last_seen,
)
@ -542,14 +559,13 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
# Check if this is the last of a batch of updates
rows = self.pending_batches.pop(stream_name, [])
rows.append(row)
self.handler.on_rdata(stream_name, cmd.token, rows)
return self.handler.on_rdata(stream_name, cmd.token, rows)
def on_POSITION(self, cmd):
self.handler.on_position(cmd.stream_name, cmd.token)
return self.handler.on_position(cmd.stream_name, cmd.token)
def on_SYNC(self, cmd):
self.handler.on_sync(cmd.data)
return self.handler.on_sync(cmd.data)
def replicate(self, stream_name, token):
"""Send the subscription request to the server

View file

@ -17,7 +17,7 @@
to ensure idempotency when performing PUTs using the REST API."""
import logging
from synapse.util.async import ObservableDeferred
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
logger = logging.getLogger(__name__)

View file

@ -391,10 +391,17 @@ class DeactivateAccountRestServlet(ClientV1RestServlet):
if not is_admin:
raise AuthError(403, "You are not a server admin")
yield self._deactivate_account_handler.deactivate_account(
result = yield self._deactivate_account_handler.deactivate_account(
target_user_id, erase,
)
defer.returnValue((200, {}))
if result:
id_server_unbind_result = "success"
else:
id_server_unbind_result = "no-support"
defer.returnValue((200, {
"id_server_unbind_result": id_server_unbind_result,
}))
class ShutdownRoomRestServlet(ClientV1RestServlet):

View file

@ -84,7 +84,8 @@ class PresenceStatusRestServlet(ClientV1RestServlet):
except Exception:
raise SynapseError(400, "Unable to parse state")
yield self.presence_handler.set_state(user, state)
if self.hs.config.use_presence:
yield self.presence_handler.set_state(user, state)
defer.returnValue((200, {}))

View file

@ -34,7 +34,7 @@ from synapse.http.servlet import (
parse_string,
)
from synapse.streams.config import PaginationConfig
from synapse.types import RoomAlias, RoomID, ThirdPartyInstanceID, UserID
from synapse.types import RoomAlias, RoomID, StreamToken, ThirdPartyInstanceID, UserID
from .base import ClientV1RestServlet, client_path_patterns
@ -384,15 +384,39 @@ class RoomMemberListRestServlet(ClientV1RestServlet):
def on_GET(self, request, room_id):
# TODO support Pagination stream API (limit/tokens)
requester = yield self.auth.get_user_by_req(request)
events = yield self.message_handler.get_state_events(
handler = self.message_handler
# request the state as of a given event, as identified by a stream token,
# for consistency with /messages etc.
# useful for getting the membership in retrospect as of a given /sync
# response.
at_token_string = parse_string(request, "at")
if at_token_string is None:
at_token = None
else:
at_token = StreamToken.from_string(at_token_string)
# let you filter down on particular memberships.
# XXX: this may not be the best shape for this API - we could pass in a filter
# instead, except filters aren't currently aware of memberships.
# See https://github.com/matrix-org/matrix-doc/issues/1337 for more details.
membership = parse_string(request, "membership")
not_membership = parse_string(request, "not_membership")
events = yield handler.get_state_events(
room_id=room_id,
user_id=requester.user.to_string(),
at_token=at_token,
types=[(EventTypes.Member, None)],
)
chunk = []
for event in events:
if event["type"] != EventTypes.Member:
if (
(membership and event['content'].get("membership") != membership) or
(not_membership and event['content'].get("membership") == not_membership)
):
continue
chunk.append(event)
@ -401,6 +425,8 @@ class RoomMemberListRestServlet(ClientV1RestServlet):
}))
# deprecated in favour of /members?membership=join?
# except it does custom AS logic and has a simpler return format
class JoinedRoomMemberListRestServlet(ClientV1RestServlet):
PATTERNS = client_path_patterns("/rooms/(?P<room_id>[^/]*)/joined_members$")

View file

@ -129,12 +129,9 @@ class RegisterRestServlet(ClientV1RestServlet):
login_type = register_json["type"]
is_application_server = login_type == LoginType.APPLICATION_SERVICE
is_using_shared_secret = login_type == LoginType.SHARED_SECRET
can_register = (
self.enable_registration
or is_application_server
or is_using_shared_secret
)
if not can_register:
raise SynapseError(403, "Registration has been disabled")
@ -144,7 +141,6 @@ class RegisterRestServlet(ClientV1RestServlet):
LoginType.PASSWORD: self._do_password,
LoginType.EMAIL_IDENTITY: self._do_email_identity,
LoginType.APPLICATION_SERVICE: self._do_app_service,
LoginType.SHARED_SECRET: self._do_shared_secret,
}
session_info = self._get_session_info(request, session)
@ -325,56 +321,6 @@ class RegisterRestServlet(ClientV1RestServlet):
"home_server": self.hs.hostname,
})
@defer.inlineCallbacks
def _do_shared_secret(self, request, register_json, session):
assert_params_in_dict(register_json, ["mac", "user", "password"])
if not self.hs.config.registration_shared_secret:
raise SynapseError(400, "Shared secret registration is not enabled")
user = register_json["user"].encode("utf-8")
password = register_json["password"].encode("utf-8")
admin = register_json.get("admin", None)
# Its important to check as we use null bytes as HMAC field separators
if b"\x00" in user:
raise SynapseError(400, "Invalid user")
if b"\x00" in password:
raise SynapseError(400, "Invalid password")
# str() because otherwise hmac complains that 'unicode' does not
# have the buffer interface
got_mac = str(register_json["mac"])
want_mac = hmac.new(
key=self.hs.config.registration_shared_secret.encode(),
digestmod=sha1,
)
want_mac.update(user)
want_mac.update(b"\x00")
want_mac.update(password)
want_mac.update(b"\x00")
want_mac.update(b"admin" if admin else b"notadmin")
want_mac = want_mac.hexdigest()
if compare_digest(want_mac, got_mac):
handler = self.handlers.registration_handler
user_id, token = yield handler.register(
localpart=user.lower(),
password=password,
admin=bool(admin),
)
self._remove_session(session)
defer.returnValue({
"user_id": user_id,
"access_token": token,
"home_server": self.hs.hostname,
})
else:
raise SynapseError(
403, "HMAC incorrect",
)
class CreateUserRestServlet(ClientV1RestServlet):
"""Handles user creation via a server-to-server interface

View file

@ -209,10 +209,17 @@ class DeactivateAccountRestServlet(RestServlet):
yield self.auth_handler.validate_user_via_ui_auth(
requester, body, self.hs.get_ip_from_request(request),
)
yield self._deactivate_account_handler.deactivate_account(
result = yield self._deactivate_account_handler.deactivate_account(
requester.user.to_string(), erase,
)
defer.returnValue((200, {}))
if result:
id_server_unbind_result = "success"
else:
id_server_unbind_result = "no-support"
defer.returnValue((200, {
"id_server_unbind_result": id_server_unbind_result,
}))
class EmailThreepidRequestTokenRestServlet(RestServlet):
@ -364,7 +371,7 @@ class ThreepidDeleteRestServlet(RestServlet):
user_id = requester.user.to_string()
try:
yield self.auth_handler.delete_threepid(
ret = yield self.auth_handler.delete_threepid(
user_id, body['medium'], body['address']
)
except Exception:
@ -374,7 +381,14 @@ class ThreepidDeleteRestServlet(RestServlet):
logger.exception("Failed to remove threepid")
raise SynapseError(500, "Failed to remove threepid")
defer.returnValue((200, {}))
if ret:
id_server_unbind_result = "success"
else:
id_server_unbind_result = "no-support"
defer.returnValue((200, {
"id_server_unbind_result": id_server_unbind_result,
}))
class WhoamiRestServlet(RestServlet):

View file

@ -370,6 +370,7 @@ class SyncRestServlet(RestServlet):
ephemeral_events = room.ephemeral
result["ephemeral"] = {"events": ephemeral_events}
result["unread_notifications"] = room.unread_notifications
result["summary"] = room.summary
return result

Some files were not shown because too many files have changed in this diff Show more