Merge branch 'develop' into babolivier/retention_doc

This commit is contained in:
Brendan Abolivier 2020-01-07 15:07:19 +00:00
commit 4be582d7c8
83 changed files with 794 additions and 497 deletions

View file

@ -1,3 +1,73 @@
Synapse 1.8.0rc1 (2020-01-07)
=============================
Features
--------
- Add v2 APIs for the `send_join` and `send_leave` federation endpoints (as described in [MSC1802](https://github.com/matrix-org/matrix-doc/pull/1802)). ([\#6349](https://github.com/matrix-org/synapse/issues/6349))
- Add a develop script to generate full SQL schemas. ([\#6394](https://github.com/matrix-org/synapse/issues/6394))
- Add custom SAML username mapping functinality through an external provider plugin. ([\#6411](https://github.com/matrix-org/synapse/issues/6411))
- Automatically delete empty groups/communities. ([\#6453](https://github.com/matrix-org/synapse/issues/6453))
- Add option `limit_profile_requests_to_users_who_share_rooms` to prevent requirement of a local user sharing a room with another user to query their profile information. ([\#6523](https://github.com/matrix-org/synapse/issues/6523))
- Add an `export_signing_key` script to extract the public part of signing keys when rotating them. ([\#6546](https://github.com/matrix-org/synapse/issues/6546))
- Add experimental config option to specify multiple databases. ([\#6580](https://github.com/matrix-org/synapse/issues/6580))
- Raise an error if someone tries to use the `log_file` config option. ([\#6626](https://github.com/matrix-org/synapse/issues/6626))
Bugfixes
--------
- Prevent redacted events from being returned during message search. ([\#6377](https://github.com/matrix-org/synapse/issues/6377), [\#6522](https://github.com/matrix-org/synapse/issues/6522))
- Prevent error on trying to search a upgraded room when the server is not in the predecessor room. ([\#6385](https://github.com/matrix-org/synapse/issues/6385))
- Improve performance of looking up cross-signing keys. ([\#6486](https://github.com/matrix-org/synapse/issues/6486))
- Fix race which occasionally caused deleted devices to reappear. ([\#6514](https://github.com/matrix-org/synapse/issues/6514))
- Fix missing row in `device_max_stream_id` that could cause unable to decrypt errors after server restart. ([\#6555](https://github.com/matrix-org/synapse/issues/6555))
- Fix a bug which meant that we did not send systemd notifications on startup if acme was enabled. ([\#6571](https://github.com/matrix-org/synapse/issues/6571))
- Fix exception when fetching the `matrix.org:ed25519:auto` key. ([\#6625](https://github.com/matrix-org/synapse/issues/6625))
- Fix bug where a moderator upgraded a room and became an admin in the new room. ([\#6633](https://github.com/matrix-org/synapse/issues/6633))
- Fix an error which was thrown by the `PresenceHandler` `_on_shutdown` handler. ([\#6640](https://github.com/matrix-org/synapse/issues/6640))
- Fix exceptions in the synchrotron worker log when events are rejected. ([\#6645](https://github.com/matrix-org/synapse/issues/6645))
- Ensure that upgraded rooms are removed from the directory. ([\#6648](https://github.com/matrix-org/synapse/issues/6648))
- Fix a bug causing Synapse not to fetch missing events when it believes it has every event in the room. ([\#6652](https://github.com/matrix-org/synapse/issues/6652))
Improved Documentation
----------------------
- Document the Room Shutdown Admin API. ([\#6541](https://github.com/matrix-org/synapse/issues/6541))
- Reword sections of [docs/federate.md](docs/federate.md) that explained delegation at time of Synapse 1.0 transition. ([\#6601](https://github.com/matrix-org/synapse/issues/6601))
- Added the section 'Configuration' in [docs/turn-howto.md](docs/turn-howto.md). ([\#6614](https://github.com/matrix-org/synapse/issues/6614))
Deprecations and Removals
-------------------------
- Remove redundant code from event authorisation implementation. ([\#6502](https://github.com/matrix-org/synapse/issues/6502))
- Remove unused, undocumented `/_matrix/content` API. ([\#6628](https://github.com/matrix-org/synapse/issues/6628))
Internal Changes
----------------
- Add *experimental* support for multiple physical databases and split out state storage to separate data store. ([\#6245](https://github.com/matrix-org/synapse/issues/6245), [\#6510](https://github.com/matrix-org/synapse/issues/6510), [\#6511](https://github.com/matrix-org/synapse/issues/6511), [\#6513](https://github.com/matrix-org/synapse/issues/6513), [\#6564](https://github.com/matrix-org/synapse/issues/6564), [\#6565](https://github.com/matrix-org/synapse/issues/6565))
- Port sections of code base to async/await. ([\#6496](https://github.com/matrix-org/synapse/issues/6496), [\#6504](https://github.com/matrix-org/synapse/issues/6504), [\#6505](https://github.com/matrix-org/synapse/issues/6505), [\#6517](https://github.com/matrix-org/synapse/issues/6517), [\#6559](https://github.com/matrix-org/synapse/issues/6559), [\#6647](https://github.com/matrix-org/synapse/issues/6647), [\#6653](https://github.com/matrix-org/synapse/issues/6653))
- Remove `SnapshotCache` in favour of `ResponseCache`. ([\#6506](https://github.com/matrix-org/synapse/issues/6506))
- Silence mypy errors for files outside those specified. ([\#6512](https://github.com/matrix-org/synapse/issues/6512))
- Clean up some logging when handling incoming events over federation. ([\#6515](https://github.com/matrix-org/synapse/issues/6515))
- Test more folders against mypy. ([\#6534](https://github.com/matrix-org/synapse/issues/6534))
- Update `mypy` to new version. ([\#6537](https://github.com/matrix-org/synapse/issues/6537))
- Adjust the sytest blacklist for worker mode. ([\#6538](https://github.com/matrix-org/synapse/issues/6538))
- Remove unused `get_pagination_rows` methods from `EventSource` classes. ([\#6557](https://github.com/matrix-org/synapse/issues/6557))
- Clean up logs from the push notifier at startup. ([\#6558](https://github.com/matrix-org/synapse/issues/6558))
- Improve diagnostics on database upgrade failure. ([\#6570](https://github.com/matrix-org/synapse/issues/6570))
- Reduce the reconnect time when worker replication fails, to make it easier to catch up. ([\#6617](https://github.com/matrix-org/synapse/issues/6617))
- Simplify http handling by removing redundant `SynapseRequestFactory`. ([\#6619](https://github.com/matrix-org/synapse/issues/6619))
- Add a workaround for synapse raising exceptions when fetching the notary's own key from the notary. ([\#6620](https://github.com/matrix-org/synapse/issues/6620))
- Automate generation of the sample log config. ([\#6627](https://github.com/matrix-org/synapse/issues/6627))
- Simplify event creation code by removing redundant queries on the `event_reference_hashes` table. ([\#6629](https://github.com/matrix-org/synapse/issues/6629))
- Fix errors when `frozen_dicts` are enabled. ([\#6642](https://github.com/matrix-org/synapse/issues/6642))
Synapse 1.7.3 (2019-12-31) Synapse 1.7.3 (2019-12-31)
========================== ==========================

View file

@ -1 +0,0 @@
Split out state storage into separate data store.

View file

@ -1 +0,0 @@
Implement v2 APIs for the `send_join` and `send_leave` federation endpoints (as described in [MSC1802](https://github.com/matrix-org/matrix-doc/pull/1802)).

View file

@ -1 +0,0 @@
Prevent redacted events from being returned during message search.

View file

@ -1 +0,0 @@
Prevent error on trying to search a upgraded room when the server is not in the predecessor room.

View file

@ -1 +0,0 @@
Add a develop script to generate full SQL schemas.

View file

@ -1 +0,0 @@
Allow custom SAML username mapping functinality through an external provider plugin.

View file

@ -1 +0,0 @@
Automatically delete empty groups/communities.

View file

@ -1 +0,0 @@
Improve performance of looking up cross-signing keys.

View file

@ -1 +0,0 @@
Port synapse.handlers.initial_sync to async/await.

View file

@ -1 +0,0 @@
Remove redundant code from event authorisation implementation.

View file

@ -1 +0,0 @@
Port handlers.account_data and handlers.account_validity to async/await.

View file

@ -1 +0,0 @@
Make `make_deferred_yieldable` to work with async/await.

View file

@ -1 +0,0 @@
Remove `SnapshotCache` in favour of `ResponseCache`.

View file

@ -1 +0,0 @@
Change phone home stats to not assume there is a single database and report information about the database used by the main data store.

View file

@ -1 +0,0 @@
Move database config from apps into HomeServer object.

View file

@ -1 +0,0 @@
Silence mypy errors for files outside those specified.

View file

@ -1 +0,0 @@
Remove all assumptions of there being a single phyiscal DB apart from the `synapse.config`.

View file

@ -1 +0,0 @@
Fix race which occasionally caused deleted devices to reappear.

View file

@ -1 +0,0 @@
Clean up some logging when handling incoming events over federation.

View file

@ -1 +0,0 @@
Port some of FederationHandler to async/await.

View file

@ -1 +0,0 @@
Prevent redacted events from being returned during message search.

View file

@ -1 +0,0 @@
Add option `limit_profile_requests_to_users_who_share_rooms` to prevent requirement of a local user sharing a room with another user to query their profile information.

View file

@ -1 +0,0 @@
Test more folders against mypy.

View file

@ -1 +0,0 @@
Update `mypy` to new version.

View file

@ -1 +0,0 @@
Adjust the sytest blacklist for worker mode.

View file

@ -1 +0,0 @@
Document the Room Shutdown Admin API.

View file

@ -1 +0,0 @@
Add an export_signing_key script to extract the public part of signing keys when rotating them.

View file

@ -1 +0,0 @@
Fix missing row in device_max_stream_id that could cause unable to decrypt errors after server restart.

View file

@ -1 +0,0 @@
Remove unused `get_pagination_rows` methods from `EventSource` classes.

View file

@ -1 +0,0 @@
Clean up logs from the push notifier at startup.

View file

@ -1 +0,0 @@
Port `synapse.handlers.admin` and `synapse.handlers.deactivate_account` to async/await.

View file

@ -1 +0,0 @@
Change `EventContext` to use the `Storage` class, in preparation for moving state database queries to a separate data store.

View file

@ -1 +0,0 @@
Add assertion that schema delta file names are unique.

View file

@ -1 +0,0 @@
Improve diagnostics on database upgrade failure.

View file

@ -1 +0,0 @@
Fix a bug which meant that we did not send systemd notifications on startup if acme was enabled.

View file

@ -1 +0,0 @@
Reword sections of federate.md that explained delegation at time of Synapse 1.0 transition.

View file

@ -1 +0,0 @@
Added the section 'Configuration' in /docs/turn-howto.md.

View file

@ -85,6 +85,9 @@ PYTHONPATH="$tmpdir" \
' > "${PACKAGE_BUILD_DIR}/etc/matrix-synapse/homeserver.yaml" ' > "${PACKAGE_BUILD_DIR}/etc/matrix-synapse/homeserver.yaml"
# build the log config file
"${TARGET_PYTHON}" -B "${VIRTUALENV_DIR}/bin/generate_log_config" \
--output-file="${PACKAGE_BUILD_DIR}/etc/matrix-synapse/log.yaml"
# add a dependency on the right version of python to substvars. # add a dependency on the right version of python to substvars.
PYPKG=`basename $SNAKE` PYPKG=`basename $SNAKE`

6
debian/changelog vendored
View file

@ -1,3 +1,9 @@
matrix-synapse-py3 (1.7.3ubuntu1) UNRELEASED; urgency=medium
* Automate generation of the default log configuration file.
-- Richard van der Hoff <richard@matrix.org> Fri, 03 Jan 2020 13:55:38 +0000
matrix-synapse-py3 (1.7.3) stable; urgency=medium matrix-synapse-py3 (1.7.3) stable; urgency=medium
* New synapse release 1.7.3. * New synapse release 1.7.3.

1
debian/install vendored
View file

@ -1,2 +1 @@
debian/log.yaml etc/matrix-synapse
debian/manage_debconf.pl /opt/venvs/matrix-synapse/lib/ debian/manage_debconf.pl /opt/venvs/matrix-synapse/lib/

36
debian/log.yaml vendored
View file

@ -1,36 +0,0 @@
version: 1
formatters:
precise:
format: '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s- %(message)s'
filters:
context:
(): synapse.logging.context.LoggingContextFilter
request: ""
handlers:
file:
class: logging.handlers.RotatingFileHandler
formatter: precise
filename: /var/log/matrix-synapse/homeserver.log
maxBytes: 104857600
backupCount: 10
filters: [context]
encoding: utf8
console:
class: logging.StreamHandler
formatter: precise
level: WARN
loggers:
synapse:
level: INFO
synapse.storage.SQL:
level: INFO
root:
level: INFO
handlers: [file, console]

View file

@ -692,10 +692,6 @@ media_store_path: "DATADIR/media_store"
# config: # config:
# directory: /mnt/some/other/directory # directory: /mnt/some/other/directory
# Directory where in-progress uploads are stored.
#
uploads_path: "DATADIR/uploads"
# The largest allowed upload size in bytes # The largest allowed upload size in bytes
# #
#max_upload_size: 10M #max_upload_size: 10M

View file

@ -1,4 +1,4 @@
# Example log config file for synapse. # Log configuration for Synapse.
# #
# This is a YAML file containing a standard Python logging configuration # This is a YAML file containing a standard Python logging configuration
# dictionary. See [1] for details on the valid settings. # dictionary. See [1] for details on the valid settings.
@ -20,7 +20,7 @@ handlers:
file: file:
class: logging.handlers.RotatingFileHandler class: logging.handlers.RotatingFileHandler
formatter: precise formatter: precise
filename: /home/rav/work/synapse/homeserver.log filename: /var/log/matrix-synapse/homeserver.log
maxBytes: 104857600 maxBytes: 104857600
backupCount: 10 backupCount: 10
filters: [context] filters: [context]

View file

@ -7,12 +7,22 @@ set -e
cd `dirname $0`/.. cd `dirname $0`/..
SAMPLE_CONFIG="docs/sample_config.yaml" SAMPLE_CONFIG="docs/sample_config.yaml"
SAMPLE_LOG_CONFIG="docs/sample_log_config.yaml"
check() {
diff -u "$SAMPLE_LOG_CONFIG" <(./scripts/generate_log_config) >/dev/null || return 1
}
if [ "$1" == "--check" ]; then if [ "$1" == "--check" ]; then
diff -u "$SAMPLE_CONFIG" <(./scripts/generate_config --header-file docs/.sample_config_header.yaml) >/dev/null || { diff -u "$SAMPLE_CONFIG" <(./scripts/generate_config --header-file docs/.sample_config_header.yaml) >/dev/null || {
echo -e "\e[1m\e[31m$SAMPLE_CONFIG is not up-to-date. Regenerate it with \`scripts-dev/generate_sample_config\`.\e[0m" >&2 echo -e "\e[1m\e[31m$SAMPLE_CONFIG is not up-to-date. Regenerate it with \`scripts-dev/generate_sample_config\`.\e[0m" >&2
exit 1 exit 1
} }
diff -u "$SAMPLE_LOG_CONFIG" <(./scripts/generate_log_config) >/dev/null || {
echo -e "\e[1m\e[31m$SAMPLE_LOG_CONFIG is not up-to-date. Regenerate it with \`scripts-dev/generate_sample_config\`.\e[0m" >&2
exit 1
}
else else
./scripts/generate_config --header-file docs/.sample_config_header.yaml -o "$SAMPLE_CONFIG" ./scripts/generate_config --header-file docs/.sample_config_header.yaml -o "$SAMPLE_CONFIG"
./scripts/generate_log_config -o "$SAMPLE_LOG_CONFIG"
fi fi

43
scripts/generate_log_config Executable file
View file

@ -0,0 +1,43 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import sys
from synapse.config.logger import DEFAULT_LOG_CONFIG
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"-o",
"--output-file",
type=argparse.FileType("w"),
default=sys.stdout,
help="File to write the configuration to. Default: stdout",
)
parser.add_argument(
"-f",
"--log-file",
type=str,
default="/var/log/matrix-synapse/homeserver.log",
help="name of the log file",
)
args = parser.parse_args()
args.output_file.write(DEFAULT_LOG_CONFIG.substitute(log_file=args.log_file))

View file

@ -166,6 +166,11 @@ class Store(
logger.exception("Failed to insert: %s", table) logger.exception("Failed to insert: %s", table)
raise raise
def set_room_is_public(self, room_id, is_public):
raise Exception(
"Attempt to set room_is_public during port_db: database not empty?"
)
class MockHomeserver: class MockHomeserver:
def __init__(self, config): def __init__(self, config):

View file

@ -36,7 +36,7 @@ try:
except ImportError: except ImportError:
pass pass
__version__ = "1.7.3" __version__ = "1.8.0rc1"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)): if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when # We import here so that we don't have to install a bunch of deps when

View file

@ -29,7 +29,6 @@ FEDERATION_V2_PREFIX = FEDERATION_PREFIX + "/v2"
FEDERATION_UNSTABLE_PREFIX = FEDERATION_PREFIX + "/unstable" FEDERATION_UNSTABLE_PREFIX = FEDERATION_PREFIX + "/unstable"
STATIC_PREFIX = "/_matrix/static" STATIC_PREFIX = "/_matrix/static"
WEB_CLIENT_PREFIX = "/_matrix/client" WEB_CLIENT_PREFIX = "/_matrix/client"
CONTENT_REPO_PREFIX = "/_matrix/content"
SERVER_KEY_V2_PREFIX = "/_matrix/key/v2" SERVER_KEY_V2_PREFIX = "/_matrix/key/v2"
MEDIA_PREFIX = "/_matrix/media/r0" MEDIA_PREFIX = "/_matrix/media/r0"
LEGACY_MEDIA_PREFIX = "/_matrix/media/v1" LEGACY_MEDIA_PREFIX = "/_matrix/media/v1"

View file

@ -39,7 +39,6 @@ import synapse
import synapse.config.logger import synapse.config.logger
from synapse import events from synapse import events
from synapse.api.urls import ( from synapse.api.urls import (
CONTENT_REPO_PREFIX,
FEDERATION_PREFIX, FEDERATION_PREFIX,
LEGACY_MEDIA_PREFIX, LEGACY_MEDIA_PREFIX,
MEDIA_PREFIX, MEDIA_PREFIX,
@ -65,7 +64,6 @@ from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
from synapse.rest import ClientRestResource from synapse.rest import ClientRestResource
from synapse.rest.admin import AdminRestResource from synapse.rest.admin import AdminRestResource
from synapse.rest.key.v2 import KeyApiV2Resource from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.rest.well_known import WellKnownResource from synapse.rest.well_known import WellKnownResource
from synapse.server import HomeServer from synapse.server import HomeServer
from synapse.storage import DataStore from synapse.storage import DataStore
@ -223,13 +221,7 @@ class SynapseHomeServer(HomeServer):
if self.get_config().enable_media_repo: if self.get_config().enable_media_repo:
media_repo = self.get_media_repository_resource() media_repo = self.get_media_repository_resource()
resources.update( resources.update(
{ {MEDIA_PREFIX: media_repo, LEGACY_MEDIA_PREFIX: media_repo}
MEDIA_PREFIX: media_repo,
LEGACY_MEDIA_PREFIX: media_repo,
CONTENT_REPO_PREFIX: ContentRepoResource(
self, self.config.uploads_path
),
}
) )
elif name == "media": elif name == "media":
raise ConfigError( raise ConfigError(
@ -318,7 +310,7 @@ def setup(config_options):
"Synapse Homeserver", config_options "Synapse Homeserver", config_options
) )
except ConfigError as e: except ConfigError as e:
sys.stderr.write("\n" + str(e) + "\n") sys.stderr.write("\nERROR: %s\n" % (e,))
sys.exit(1) sys.exit(1)
if not config: if not config:

View file

@ -21,7 +21,7 @@ from twisted.web.resource import NoResource
import synapse import synapse
from synapse import events from synapse import events
from synapse.api.urls import CONTENT_REPO_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX from synapse.api.urls import LEGACY_MEDIA_PREFIX, MEDIA_PREFIX
from synapse.app import _base from synapse.app import _base
from synapse.config._base import ConfigError from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig from synapse.config.homeserver import HomeServerConfig
@ -37,7 +37,6 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.admin import register_servlets_for_media_repo from synapse.rest.admin import register_servlets_for_media_repo
from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.server import HomeServer from synapse.server import HomeServer
from synapse.storage.data_stores.main.media_repository import MediaRepositoryStore from synapse.storage.data_stores.main.media_repository import MediaRepositoryStore
from synapse.util.httpresourcetree import create_resource_tree from synapse.util.httpresourcetree import create_resource_tree
@ -82,9 +81,6 @@ class MediaRepositoryServer(HomeServer):
{ {
MEDIA_PREFIX: media_repo, MEDIA_PREFIX: media_repo,
LEGACY_MEDIA_PREFIX: media_repo, LEGACY_MEDIA_PREFIX: media_repo,
CONTENT_REPO_PREFIX: ContentRepoResource(
self, self.config.uploads_path
),
"/_synapse/admin": admin_resource, "/_synapse/admin": admin_resource,
} }
) )

View file

@ -48,7 +48,7 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.tcp.client import ReplicationClientHandler from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.replication.tcp.streams.events import EventsStreamEventRow from synapse.replication.tcp.streams.events import EventsStreamEventRow, EventsStreamRow
from synapse.rest.client.v1 import events from synapse.rest.client.v1 import events
from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
from synapse.rest.client.v1.room import RoomInitialSyncRestServlet from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
@ -371,8 +371,7 @@ class SyncReplicationHandler(ReplicationClientHandler):
def get_currently_syncing_users(self): def get_currently_syncing_users(self):
return self.presence_handler.get_currently_syncing_users() return self.presence_handler.get_currently_syncing_users()
@defer.inlineCallbacks async def process_and_notify(self, stream_name, token, rows):
def process_and_notify(self, stream_name, token, rows):
try: try:
if stream_name == "events": if stream_name == "events":
# We shouldn't get multiple rows per token for events stream, so # We shouldn't get multiple rows per token for events stream, so
@ -380,7 +379,14 @@ class SyncReplicationHandler(ReplicationClientHandler):
for row in rows: for row in rows:
if row.type != EventsStreamEventRow.TypeId: if row.type != EventsStreamEventRow.TypeId:
continue continue
event = yield self.store.get_event(row.data.event_id) assert isinstance(row, EventsStreamRow)
event = await self.store.get_event(
row.data.event_id, allow_rejected=True
)
if event.rejected_reason:
continue
extra_users = () extra_users = ()
if event.type == EventTypes.Member: if event.type == EventTypes.Member:
extra_users = (event.state_key,) extra_users = (event.state_key,)
@ -412,11 +418,11 @@ class SyncReplicationHandler(ReplicationClientHandler):
elif stream_name == "device_lists": elif stream_name == "device_lists":
all_room_ids = set() all_room_ids = set()
for row in rows: for row in rows:
room_ids = yield self.store.get_rooms_for_user(row.user_id) room_ids = await self.store.get_rooms_for_user(row.user_id)
all_room_ids.update(room_ids) all_room_ids.update(room_ids)
self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids) self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids)
elif stream_name == "presence": elif stream_name == "presence":
yield self.presence_handler.process_replication_rows(token, rows) await self.presence_handler.process_replication_rows(token, rows)
elif stream_name == "receipts": elif stream_name == "receipts":
self.notifier.on_new_event( self.notifier.on_new_event(
"groups_key", token, users=[row.user_id for row in rows] "groups_key", token, users=[row.user_id for row in rows]

View file

@ -15,7 +15,6 @@
import logging import logging
import os import os
from textwrap import indent from textwrap import indent
from typing import List
import yaml import yaml
@ -30,16 +29,13 @@ class DatabaseConnectionConfig:
Args: Args:
name: A label for the database, used for logging. name: A label for the database, used for logging.
db_config: The config for a particular database, as per `database` db_config: The config for a particular database, as per `database`
section of main config. Has two fields: `name` for database section of main config. Has three fields: `name` for database
module name, and `args` for the args to give to the database module name, `args` for the args to give to the database
connector. connector, and optional `data_stores` that is a list of stores to
data_stores: The list of data stores that should be provisioned on the provision on this database (defaulting to all).
database. Defaults to all data stores.
""" """
def __init__( def __init__(self, name: str, db_config: dict):
self, name: str, db_config: dict, data_stores: List[str] = ["main", "state"]
):
if db_config["name"] not in ("sqlite3", "psycopg2"): if db_config["name"] not in ("sqlite3", "psycopg2"):
raise ConfigError("Unsupported database type %r" % (db_config["name"],)) raise ConfigError("Unsupported database type %r" % (db_config["name"],))
@ -48,6 +44,10 @@ class DatabaseConnectionConfig:
{"cp_min": 1, "cp_max": 1, "check_same_thread": False} {"cp_min": 1, "cp_max": 1, "check_same_thread": False}
) )
data_stores = db_config.get("data_stores")
if data_stores is None:
data_stores = ["main", "state"]
self.name = name self.name = name
self.config = db_config self.config = db_config
self.data_stores = data_stores self.data_stores = data_stores
@ -59,8 +59,37 @@ class DatabaseConfig(Config):
def read_config(self, config, **kwargs): def read_config(self, config, **kwargs):
self.event_cache_size = self.parse_size(config.get("event_cache_size", "10K")) self.event_cache_size = self.parse_size(config.get("event_cache_size", "10K"))
# We *experimentally* support specifying multiple databases via the
# `databases` key. This is a map from a label to database config in the
# same format as the `database` config option, plus an extra
# `data_stores` key to specify which data store goes where. For example:
#
# databases:
# master:
# name: psycopg2
# data_stores: ["main"]
# args: {}
# state:
# name: psycopg2
# data_stores: ["state"]
# args: {}
multi_database_config = config.get("databases")
database_config = config.get("database") database_config = config.get("database")
if multi_database_config and database_config:
raise ConfigError("Can't specify both 'database' and 'datbases' in config")
if multi_database_config:
if config.get("database_path"):
raise ConfigError("Can't specify 'database_path' with 'databases'")
self.databases = [
DatabaseConnectionConfig(name, db_conf)
for name, db_conf in multi_database_config.items()
]
else:
if database_config is None: if database_config is None:
database_config = {"name": "sqlite3", "args": {}} database_config = {"name": "sqlite3", "args": {}}

View file

@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import argparse
import logging import logging
import logging.config import logging.config
import os import os
@ -37,10 +37,17 @@ from synapse.logging._structured import (
from synapse.logging.context import LoggingContextFilter from synapse.logging.context import LoggingContextFilter
from synapse.util.versionstring import get_version_string from synapse.util.versionstring import get_version_string
from ._base import Config from ._base import Config, ConfigError
DEFAULT_LOG_CONFIG = Template( DEFAULT_LOG_CONFIG = Template(
""" """\
# Log configuration for Synapse.
#
# This is a YAML file containing a standard Python logging configuration
# dictionary. See [1] for details on the valid settings.
#
# [1]: https://docs.python.org/3.7/library/logging.config.html#configuration-dictionary-schema
version: 1 version: 1
formatters: formatters:
@ -81,11 +88,18 @@ disable_existing_loggers: false
""" """
) )
LOG_FILE_ERROR = """\
Support for the log_file configuration option and --log-file command-line option was
removed in Synapse 1.3.0. You should instead set up a separate log configuration file.
"""
class LoggingConfig(Config): class LoggingConfig(Config):
section = "logging" section = "logging"
def read_config(self, config, **kwargs): def read_config(self, config, **kwargs):
if config.get("log_file"):
raise ConfigError(LOG_FILE_ERROR)
self.log_config = self.abspath(config.get("log_config")) self.log_config = self.abspath(config.get("log_config"))
self.no_redirect_stdio = config.get("no_redirect_stdio", False) self.no_redirect_stdio = config.get("no_redirect_stdio", False)
@ -106,6 +120,8 @@ class LoggingConfig(Config):
def read_arguments(self, args): def read_arguments(self, args):
if args.no_redirect_stdio is not None: if args.no_redirect_stdio is not None:
self.no_redirect_stdio = args.no_redirect_stdio self.no_redirect_stdio = args.no_redirect_stdio
if args.log_file is not None:
raise ConfigError(LOG_FILE_ERROR)
@staticmethod @staticmethod
def add_arguments(parser): def add_arguments(parser):
@ -118,6 +134,10 @@ class LoggingConfig(Config):
help="Do not redirect stdout/stderr to the log", help="Do not redirect stdout/stderr to the log",
) )
logging_group.add_argument(
"-f", "--log-file", dest="log_file", help=argparse.SUPPRESS,
)
def generate_files(self, config, config_dir_path): def generate_files(self, config, config_dir_path):
log_config = config.get("log_config") log_config = config.get("log_config")
if log_config and not os.path.exists(log_config): if log_config and not os.path.exists(log_config):

View file

@ -156,7 +156,6 @@ class ContentRepositoryConfig(Config):
(provider_class, parsed_config, wrapper_config) (provider_class, parsed_config, wrapper_config)
) )
self.uploads_path = self.ensure_directory(config.get("uploads_path", "uploads"))
self.dynamic_thumbnails = config.get("dynamic_thumbnails", False) self.dynamic_thumbnails = config.get("dynamic_thumbnails", False)
self.thumbnail_requirements = parse_thumbnail_requirements( self.thumbnail_requirements = parse_thumbnail_requirements(
config.get("thumbnail_sizes", DEFAULT_THUMBNAIL_SIZES) config.get("thumbnail_sizes", DEFAULT_THUMBNAIL_SIZES)
@ -231,10 +230,6 @@ class ContentRepositoryConfig(Config):
# config: # config:
# directory: /mnt/some/other/directory # directory: /mnt/some/other/directory
# Directory where in-progress uploads are stored.
#
uploads_path: "%(uploads_path)s"
# The largest allowed upload size in bytes # The largest allowed upload size in bytes
# #
#max_upload_size: 10M #max_upload_size: 10M

View file

@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import collections.abc
import hashlib import hashlib
import logging import logging
@ -40,8 +40,11 @@ def check_event_content_hash(event, hash_algorithm=hashlib.sha256):
# some malformed events lack a 'hashes'. Protect against it being missing # some malformed events lack a 'hashes'. Protect against it being missing
# or a weird type by basically treating it the same as an unhashed event. # or a weird type by basically treating it the same as an unhashed event.
hashes = event.get("hashes") hashes = event.get("hashes")
if not isinstance(hashes, dict): # nb it might be a frozendict or a dict
raise SynapseError(400, "Malformed 'hashes'", Codes.UNAUTHORIZED) if not isinstance(hashes, collections.abc.Mapping):
raise SynapseError(
400, "Malformed 'hashes': %s" % (type(hashes),), Codes.UNAUTHORIZED
)
if name not in hashes: if name not in hashes:
raise SynapseError( raise SynapseError(

View file

@ -511,17 +511,18 @@ class BaseV2KeyFetcher(object):
server_name = response_json["server_name"] server_name = response_json["server_name"]
verified = False verified = False
for key_id in response_json["signatures"].get(server_name, {}): for key_id in response_json["signatures"].get(server_name, {}):
# each of the keys used for the signature must be present in the response
# json.
key = verify_keys.get(key_id) key = verify_keys.get(key_id)
if not key: if not key:
raise KeyLookupError( # the key may not be present in verify_keys if:
"Key response is signed by key id %s:%s but that key is not " # * we got the key from the notary server, and:
"present in the response" % (server_name, key_id) # * the key belongs to the notary server, and:
) # * the notary server is using a different key to sign notary
# responses.
continue
verify_signed_json(response_json, server_name, key.verify_key) verify_signed_json(response_json, server_name, key.verify_key)
verified = True verified = True
break
if not verified: if not verified:
raise KeyLookupError( raise KeyLookupError(

View file

@ -248,13 +248,13 @@ class FederationHandler(BaseHandler):
prevs = set(pdu.prev_event_ids()) prevs = set(pdu.prev_event_ids())
seen = await self.store.have_seen_events(prevs) seen = await self.store.have_seen_events(prevs)
if min_depth and pdu.depth < min_depth: if min_depth is not None and pdu.depth < min_depth:
# This is so that we don't notify the user about this # This is so that we don't notify the user about this
# message, to work around the fact that some events will # message, to work around the fact that some events will
# reference really really old events we really don't want to # reference really really old events we really don't want to
# send to the clients. # send to the clients.
pdu.internal_metadata.outlier = True pdu.internal_metadata.outlier = True
elif min_depth and pdu.depth > min_depth: elif min_depth is not None and pdu.depth > min_depth:
missing_prevs = prevs - seen missing_prevs = prevs - seen
if sent_to_us_directly and missing_prevs: if sent_to_us_directly and missing_prevs:
# If we're missing stuff, ensure we only fetch stuff one # If we're missing stuff, ensure we only fetch stuff one

View file

@ -48,7 +48,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.send_event import ReplicationSendEventRestServlet from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter from synapse.storage.state import StateFilter
from synapse.types import RoomAlias, UserID, create_requester from synapse.types import Collection, RoomAlias, UserID, create_requester
from synapse.util.async_helpers import Linearizer from synapse.util.async_helpers import Linearizer
from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.metrics import measure_func from synapse.util.metrics import measure_func
@ -422,7 +422,7 @@ class EventCreationHandler(object):
event_dict, event_dict,
token_id=None, token_id=None,
txn_id=None, txn_id=None,
prev_events_and_hashes=None, prev_event_ids: Optional[Collection[str]] = None,
require_consent=True, require_consent=True,
): ):
""" """
@ -439,10 +439,9 @@ class EventCreationHandler(object):
token_id (str) token_id (str)
txn_id (str) txn_id (str)
prev_events_and_hashes (list[(str, dict[str, str], int)]|None): prev_event_ids:
the forward extremities to use as the prev_events for the the forward extremities to use as the prev_events for the
new event. For each event, a tuple of (event_id, hashes, depth) new event.
where *hashes* is a map from algorithm to hash.
If None, they will be requested from the database. If None, they will be requested from the database.
@ -498,9 +497,7 @@ class EventCreationHandler(object):
builder.internal_metadata.txn_id = txn_id builder.internal_metadata.txn_id = txn_id
event, context = yield self.create_new_client_event( event, context = yield self.create_new_client_event(
builder=builder, builder=builder, requester=requester, prev_event_ids=prev_event_ids,
requester=requester,
prev_events_and_hashes=prev_events_and_hashes,
) )
# In an ideal world we wouldn't need the second part of this condition. However, # In an ideal world we wouldn't need the second part of this condition. However,
@ -714,7 +711,7 @@ class EventCreationHandler(object):
@measure_func("create_new_client_event") @measure_func("create_new_client_event")
@defer.inlineCallbacks @defer.inlineCallbacks
def create_new_client_event( def create_new_client_event(
self, builder, requester=None, prev_events_and_hashes=None self, builder, requester=None, prev_event_ids: Optional[Collection[str]] = None
): ):
"""Create a new event for a local client """Create a new event for a local client
@ -723,10 +720,9 @@ class EventCreationHandler(object):
requester (synapse.types.Requester|None): requester (synapse.types.Requester|None):
prev_events_and_hashes (list[(str, dict[str, str], int)]|None): prev_event_ids:
the forward extremities to use as the prev_events for the the forward extremities to use as the prev_events for the
new event. For each event, a tuple of (event_id, hashes, depth) new event.
where *hashes* is a map from algorithm to hash.
If None, they will be requested from the database. If None, they will be requested from the database.
@ -734,22 +730,15 @@ class EventCreationHandler(object):
Deferred[(synapse.events.EventBase, synapse.events.snapshot.EventContext)] Deferred[(synapse.events.EventBase, synapse.events.snapshot.EventContext)]
""" """
if prev_events_and_hashes is not None: if prev_event_ids is not None:
assert len(prev_events_and_hashes) <= 10, ( assert len(prev_event_ids) <= 10, (
"Attempting to create an event with %i prev_events" "Attempting to create an event with %i prev_events"
% (len(prev_events_and_hashes),) % (len(prev_event_ids),)
) )
else: else:
prev_events_and_hashes = yield self.store.get_prev_events_for_room( prev_event_ids = yield self.store.get_prev_events_for_room(builder.room_id)
builder.room_id
)
prev_events = [ event = yield builder.build(prev_event_ids=prev_event_ids)
(event_id, prev_hashes)
for event_id, prev_hashes, _ in prev_events_and_hashes
]
event = yield builder.build(prev_event_ids=[p for p, _ in prev_events])
context = yield self.state.compute_event_context(event) context = yield self.state.compute_event_context(event)
if requester: if requester:
context.app_service = requester.app_service context.app_service = requester.app_service
@ -1042,9 +1031,7 @@ class EventCreationHandler(object):
# For each room we need to find a joined member we can use to send # For each room we need to find a joined member we can use to send
# the dummy event with. # the dummy event with.
prev_events_and_hashes = yield self.store.get_prev_events_for_room(room_id) latest_event_ids = yield self.store.get_prev_events_for_room(room_id)
latest_event_ids = (event_id for (event_id, _, _) in prev_events_and_hashes)
members = yield self.state.get_current_users_in_room( members = yield self.state.get_current_users_in_room(
room_id, latest_event_ids=latest_event_ids room_id, latest_event_ids=latest_event_ids
@ -1063,7 +1050,7 @@ class EventCreationHandler(object):
"room_id": room_id, "room_id": room_id,
"sender": user_id, "sender": user_id,
}, },
prev_events_and_hashes=prev_events_and_hashes, prev_event_ids=latest_event_ids,
) )
event.internal_metadata.proactively_send = False event.internal_metadata.proactively_send = False

View file

@ -95,12 +95,7 @@ assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
class PresenceHandler(object): class PresenceHandler(object):
def __init__(self, hs): def __init__(self, hs: "synapse.server.HomeServer"):
"""
Args:
hs (synapse.server.HomeServer):
"""
self.hs = hs self.hs = hs
self.is_mine = hs.is_mine self.is_mine = hs.is_mine
self.is_mine_id = hs.is_mine_id self.is_mine_id = hs.is_mine_id
@ -230,7 +225,7 @@ class PresenceHandler(object):
is some spurious presence changes that will self-correct. is some spurious presence changes that will self-correct.
""" """
# If the DB pool has already terminated, don't try updating # If the DB pool has already terminated, don't try updating
if not self.store.database.is_running(): if not self.store.db.is_running():
return return
logger.info( logger.info(

View file

@ -16,6 +16,7 @@
# limitations under the License. # limitations under the License.
"""Contains functions for performing events on rooms.""" """Contains functions for performing events on rooms."""
import itertools import itertools
import logging import logging
import math import math
@ -271,7 +272,7 @@ class RoomCreationHandler(BaseHandler):
except AuthError as e: except AuthError as e:
logger.warning("Unable to update PLs in old room: %s", e) logger.warning("Unable to update PLs in old room: %s", e)
logger.info("Setting correct PLs in new room") logger.info("Setting correct PLs in new room to %s", old_room_pl_state.content)
yield self.event_creation_handler.create_and_send_nonmember_event( yield self.event_creation_handler.create_and_send_nonmember_event(
requester, requester,
{ {
@ -365,13 +366,18 @@ class RoomCreationHandler(BaseHandler):
needed_power_level = max(state_default, ban, max(event_power_levels.values())) needed_power_level = max(state_default, ban, max(event_power_levels.values()))
# Raise the requester's power level in the new room if necessary # Raise the requester's power level in the new room if necessary
current_power_level = power_levels["users"][requester.user.to_string()] current_power_level = power_levels["users"][user_id]
if current_power_level < needed_power_level: if current_power_level < needed_power_level:
# Assign this power level to the requester # make sure we copy the event content rather than overwriting it.
power_levels["users"][requester.user.to_string()] = needed_power_level # note that if frozen_dicts are enabled, `power_levels` will be a frozen
# dict so we can't just copy.deepcopy it.
# Set the power levels to the modified state new_power_levels = {k: v for k, v in power_levels.items() if k != "users"}
initial_state[(EventTypes.PowerLevels, "")] = power_levels new_power_levels["users"] = {
k: v for k, v in power_levels.get("users", {}).items() if k != user_id
}
new_power_levels["users"][user_id] = needed_power_level
initial_state[(EventTypes.PowerLevels, "")] = new_power_levels
yield self._send_events_for_new_room( yield self._send_events_for_new_room(
requester, requester,
@ -733,7 +739,7 @@ class RoomCreationHandler(BaseHandler):
initial_state, initial_state,
creation_content, creation_content,
room_alias=None, room_alias=None,
power_level_content_override=None, power_level_content_override=None, # Doesn't apply when initial state has power level state event content
creator_join_profile=None, creator_join_profile=None,
): ):
def create(etype, content, **kwargs): def create(etype, content, **kwargs):

View file

@ -25,7 +25,7 @@ from twisted.internet import defer
from synapse import types from synapse import types
from synapse.api.constants import EventTypes, Membership from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes, SynapseError from synapse.api.errors import AuthError, Codes, SynapseError
from synapse.types import RoomID, UserID from synapse.types import Collection, RoomID, UserID
from synapse.util.async_helpers import Linearizer from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room, user_left_room from synapse.util.distributor import user_joined_room, user_left_room
@ -149,7 +149,7 @@ class RoomMemberHandler(object):
target, target,
room_id, room_id,
membership, membership,
prev_events_and_hashes, prev_event_ids: Collection[str],
txn_id=None, txn_id=None,
ratelimit=True, ratelimit=True,
content=None, content=None,
@ -177,7 +177,7 @@ class RoomMemberHandler(object):
}, },
token_id=requester.access_token_id, token_id=requester.access_token_id,
txn_id=txn_id, txn_id=txn_id,
prev_events_and_hashes=prev_events_and_hashes, prev_event_ids=prev_event_ids,
require_consent=require_consent, require_consent=require_consent,
) )
@ -370,8 +370,7 @@ class RoomMemberHandler(object):
if block_invite: if block_invite:
raise SynapseError(403, "Invites have been disabled on this server") raise SynapseError(403, "Invites have been disabled on this server")
prev_events_and_hashes = yield self.store.get_prev_events_for_room(room_id) latest_event_ids = yield self.store.get_prev_events_for_room(room_id)
latest_event_ids = (event_id for (event_id, _, _) in prev_events_and_hashes)
current_state_ids = yield self.state_handler.get_current_state_ids( current_state_ids = yield self.state_handler.get_current_state_ids(
room_id, latest_event_ids=latest_event_ids room_id, latest_event_ids=latest_event_ids
@ -485,7 +484,7 @@ class RoomMemberHandler(object):
membership=effective_membership_state, membership=effective_membership_state,
txn_id=txn_id, txn_id=txn_id,
ratelimit=ratelimit, ratelimit=ratelimit,
prev_events_and_hashes=prev_events_and_hashes, prev_event_ids=latest_event_ids,
content=content, content=content,
require_consent=require_consent, require_consent=require_consent,
) )
@ -507,6 +506,8 @@ class RoomMemberHandler(object):
Returns: Returns:
Deferred Deferred
""" """
logger.info("Transferring room state from %s to %s", old_room_id, room_id)
# Find all local users that were in the old room and copy over each user's state # Find all local users that were in the old room and copy over each user's state
users = yield self.store.get_users_in_room(old_room_id) users = yield self.store.get_users_in_room(old_room_id)
yield self.copy_user_state_on_room_upgrade(old_room_id, room_id, users) yield self.copy_user_state_on_room_upgrade(old_room_id, room_id, users)

View file

@ -47,9 +47,9 @@ class SynapseRequest(Request):
logcontext(LoggingContext) : the log context for this request logcontext(LoggingContext) : the log context for this request
""" """
def __init__(self, site, channel, *args, **kw): def __init__(self, channel, *args, **kw):
Request.__init__(self, channel, *args, **kw) Request.__init__(self, channel, *args, **kw)
self.site = site self.site = channel.site
self._channel = channel # this is used by the tests self._channel = channel # this is used by the tests
self.authenticated_entity = None self.authenticated_entity = None
self.start_time = 0 self.start_time = 0
@ -331,18 +331,6 @@ class XForwardedForRequest(SynapseRequest):
) )
class SynapseRequestFactory(object):
def __init__(self, site, x_forwarded_for):
self.site = site
self.x_forwarded_for = x_forwarded_for
def __call__(self, *args, **kwargs):
if self.x_forwarded_for:
return XForwardedForRequest(self.site, *args, **kwargs)
else:
return SynapseRequest(self.site, *args, **kwargs)
class SynapseSite(Site): class SynapseSite(Site):
""" """
Subclass of a twisted http Site that does access logging with python's Subclass of a twisted http Site that does access logging with python's
@ -364,7 +352,7 @@ class SynapseSite(Site):
self.site_tag = site_tag self.site_tag = site_tag
proxied = config.get("x_forwarded", False) proxied = config.get("x_forwarded", False)
self.requestFactory = SynapseRequestFactory(self, proxied) self.requestFactory = XForwardedForRequest if proxied else SynapseRequest
self.access_logger = logging.getLogger(logger_name) self.access_logger = logging.getLogger(logger_name)
self.server_version_string = server_version_string.encode("ascii") self.server_version_string = server_version_string.encode("ascii")

View file

@ -46,7 +46,8 @@ class ReplicationClientFactory(ReconnectingClientFactory):
is required. is required.
""" """
maxDelay = 30 # Try at least once every N seconds initialDelay = 0.1
maxDelay = 1 # Try at least once every N seconds
def __init__(self, hs, client_name, handler: AbstractReplicationClientHandler): def __init__(self, hs, client_name, handler: AbstractReplicationClientHandler):
self.client_name = client_name self.client_name = client_name

View file

@ -15,6 +15,7 @@
import logging import logging
from canonicaljson import encode_canonical_json, json from canonicaljson import encode_canonical_json, json
from signedjson.key import encode_verify_key_base64
from signedjson.sign import sign_json from signedjson.sign import sign_json
from twisted.internet import defer from twisted.internet import defer
@ -216,10 +217,23 @@ class RemoteKey(DirectServeResource):
if cache_misses and query_remote_on_cache_miss: if cache_misses and query_remote_on_cache_miss:
yield self.fetcher.get_keys(cache_misses) yield self.fetcher.get_keys(cache_misses)
yield self.query_keys(request, query, query_remote_on_cache_miss=False) yield self.query_keys(request, query, query_remote_on_cache_miss=False)
else: return
signed_keys = [] signed_keys = []
for key_json in json_results: for key_json in json_results:
key_json = json.loads(key_json) key_json = json.loads(key_json)
# backwards-compatibility hack for #6596: if the requested key belongs
# to us, make sure that all of the signing keys appear in the
# "verify_keys" section.
if key_json["server_name"] == self.config.server_name:
verify_keys = key_json["verify_keys"]
for signing_key in self.config.key_server_signing_keys:
key_id = "%s:%s" % (signing_key.alg, signing_key.version)
verify_keys[key_id] = {
"key": encode_verify_key_base64(signing_key.verify_key)
}
for signing_key in self.config.key_server_signing_keys: for signing_key in self.config.key_server_signing_keys:
key_json = sign_json(key_json, self.config.server_name, signing_key) key_json = sign_json(key_json, self.config.server_name, signing_key)

View file

@ -1,103 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import logging
import os
import re
from canonicaljson import json
from twisted.protocols.basic import FileSender
from twisted.web import resource, server
from synapse.api.errors import Codes, cs_error
from synapse.http.server import finish_request, respond_with_json_bytes
logger = logging.getLogger(__name__)
class ContentRepoResource(resource.Resource):
"""Provides file uploading and downloading.
Uploads are POSTed to wherever this Resource is linked to. This resource
returns a "content token" which can be used to GET this content again. The
token is typically a path, but it may not be. Tokens can expire, be
one-time uses, etc.
In this case, the token is a path to the file and contains 3 interesting
sections:
- User ID base64d (for namespacing content to each user)
- random 24 char string
- Content type base64d (so we can return it when clients GET it)
"""
isLeaf = True
def __init__(self, hs, directory):
resource.Resource.__init__(self)
self.hs = hs
self.directory = directory
def render_GET(self, request):
# no auth here on purpose, to allow anyone to view, even across home
# servers.
# TODO: A little crude here, we could do this better.
filename = request.path.decode("ascii").split("/")[-1]
# be paranoid
filename = re.sub("[^0-9A-z.-_]", "", filename)
file_path = self.directory + "/" + filename
logger.debug("Searching for %s", file_path)
if os.path.isfile(file_path):
# filename has the content type
base64_contentype = filename.split(".")[1]
content_type = base64.urlsafe_b64decode(base64_contentype)
logger.info("Sending file %s", file_path)
f = open(file_path, "rb")
request.setHeader("Content-Type", content_type)
# cache for at least a day.
# XXX: we might want to turn this off for data we don't want to
# recommend caching as it's sensitive or private - or at least
# select private. don't bother setting Expires as all our matrix
# clients are smart enough to be happy with Cache-Control (right?)
request.setHeader(b"Cache-Control", b"public,max-age=86400,s-maxage=86400")
d = FileSender().beginFileTransfer(f, request)
# after the file has been sent, clean up and finish the request
def cbFinished(ignored):
f.close()
finish_request(request)
d.addCallback(cbFinished)
else:
respond_with_json_bytes(
request,
404,
json.dumps(cs_error("Not found", code=Codes.NOT_FOUND)),
send_cors=True,
)
return server.NOT_DONE_YET
def render_OPTIONS(self, request):
respond_with_json_bytes(request, 200, {}, send_cors=True)
return server.NOT_DONE_YET

View file

@ -16,6 +16,7 @@
# limitations under the License. # limitations under the License.
import logging import logging
import random import random
from abc import ABCMeta
from six import PY2 from six import PY2
from six.moves import builtins from six.moves import builtins
@ -30,7 +31,8 @@ from synapse.types import get_domain_from_id
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class SQLBaseStore(object): # some of our subclasses have abstract methods, so we use the ABCMeta metaclass.
class SQLBaseStore(metaclass=ABCMeta):
"""Base class for data stores that holds helper functions. """Base class for data stores that holds helper functions.
Note that multiple instances of this class will exist as there will be one Note that multiple instances of this class will exist as there will be one

View file

@ -14,6 +14,7 @@
# limitations under the License. # limitations under the License.
import logging import logging
from typing import Optional
from canonicaljson import json from canonicaljson import json
@ -97,15 +98,14 @@ class BackgroundUpdater(object):
def start_doing_background_updates(self): def start_doing_background_updates(self):
run_as_background_process("background_updates", self.run_background_updates) run_as_background_process("background_updates", self.run_background_updates)
@defer.inlineCallbacks async def run_background_updates(self, sleep=True):
def run_background_updates(self, sleep=True):
logger.info("Starting background schema updates") logger.info("Starting background schema updates")
while True: while True:
if sleep: if sleep:
yield self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0) await self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0)
try: try:
result = yield self.do_next_background_update( result = await self.do_next_background_update(
self.BACKGROUND_UPDATE_DURATION_MS self.BACKGROUND_UPDATE_DURATION_MS
) )
except Exception: except Exception:
@ -170,20 +170,21 @@ class BackgroundUpdater(object):
return not update_exists return not update_exists
@defer.inlineCallbacks async def do_next_background_update(
def do_next_background_update(self, desired_duration_ms): self, desired_duration_ms: float
) -> Optional[int]:
"""Does some amount of work on the next queued background update """Does some amount of work on the next queued background update
Returns once some amount of work is done.
Args: Args:
desired_duration_ms(float): How long we want to spend desired_duration_ms(float): How long we want to spend
updating. updating.
Returns: Returns:
A deferred that completes once some amount of work is done. None if there is no more work to do, otherwise an int
The deferred will have a value of None if there is currently
no more work to do.
""" """
if not self._background_update_queue: if not self._background_update_queue:
updates = yield self.db.simple_select_list( updates = await self.db.simple_select_list(
"background_updates", "background_updates",
keyvalues=None, keyvalues=None,
retcols=("update_name", "depends_on"), retcols=("update_name", "depends_on"),
@ -201,11 +202,12 @@ class BackgroundUpdater(object):
update_name = self._background_update_queue.pop(0) update_name = self._background_update_queue.pop(0)
self._background_update_queue.append(update_name) self._background_update_queue.append(update_name)
res = yield self._do_background_update(update_name, desired_duration_ms) res = await self._do_background_update(update_name, desired_duration_ms)
return res return res
@defer.inlineCallbacks async def _do_background_update(
def _do_background_update(self, update_name, desired_duration_ms): self, update_name: str, desired_duration_ms: float
) -> int:
logger.info("Starting update batch on background update '%s'", update_name) logger.info("Starting update batch on background update '%s'", update_name)
update_handler = self._background_update_handlers[update_name] update_handler = self._background_update_handlers[update_name]
@ -225,7 +227,7 @@ class BackgroundUpdater(object):
else: else:
batch_size = self.DEFAULT_BACKGROUND_BATCH_SIZE batch_size = self.DEFAULT_BACKGROUND_BATCH_SIZE
progress_json = yield self.db.simple_select_one_onecol( progress_json = await self.db.simple_select_one_onecol(
"background_updates", "background_updates",
keyvalues={"update_name": update_name}, keyvalues={"update_name": update_name},
retcol="progress_json", retcol="progress_json",
@ -234,7 +236,7 @@ class BackgroundUpdater(object):
progress = json.loads(progress_json) progress = json.loads(progress_json)
time_start = self._clock.time_msec() time_start = self._clock.time_msec()
items_updated = yield update_handler(progress, batch_size) items_updated = await update_handler(progress, batch_size)
time_stop = self._clock.time_msec() time_stop = self._clock.time_msec()
duration_ms = time_stop - time_start duration_ms = time_stop - time_start
@ -263,7 +265,9 @@ class BackgroundUpdater(object):
* A dict of the current progress * A dict of the current progress
* An integer count of the number of items to update in this batch. * An integer count of the number of items to update in this batch.
The handler should return a deferred integer count of items updated. The handler should return a deferred or coroutine which returns an integer count
of items updated.
The handler is responsible for updating the progress of the update. The handler is responsible for updating the progress of the update.
Args: Args:
@ -432,6 +436,21 @@ class BackgroundUpdater(object):
"background_updates", keyvalues={"update_name": update_name} "background_updates", keyvalues={"update_name": update_name}
) )
def _background_update_progress(self, update_name: str, progress: dict):
"""Update the progress of a background update
Args:
update_name: The name of the background update task
progress: The progress of the update.
"""
return self.db.runInteraction(
"background_update_progress",
self._background_update_progress_txn,
update_name,
progress,
)
def _background_update_progress_txn(self, txn, update_name, progress): def _background_update_progress_txn(self, txn, update_name, progress):
"""Update the progress of a background update """Update the progress of a background update

View file

@ -37,6 +37,8 @@ class DataStores(object):
# store. # store.
self.databases = [] self.databases = []
self.main = None
self.state = None
for database_config in hs.config.database.databases: for database_config in hs.config.database.databases:
db_name = database_config.name db_name = database_config.name
@ -54,10 +56,22 @@ class DataStores(object):
if "main" in database_config.data_stores: if "main" in database_config.data_stores:
logger.info("Starting 'main' data store") logger.info("Starting 'main' data store")
# Sanity check we don't try and configure the main store on
# multiple databases.
if self.main:
raise Exception("'main' data store already configured")
self.main = main_store_class(database, db_conn, hs) self.main = main_store_class(database, db_conn, hs)
if "state" in database_config.data_stores: if "state" in database_config.data_stores:
logger.info("Starting 'state' data store") logger.info("Starting 'state' data store")
# Sanity check we don't try and configure the state store on
# multiple databases.
if self.state:
raise Exception("'state' data store already configured")
self.state = StateGroupDataStore(database, db_conn, hs) self.state = StateGroupDataStore(database, db_conn, hs)
db_conn.commit() db_conn.commit()
@ -65,3 +79,10 @@ class DataStores(object):
self.databases.append(database) self.databases.append(database)
logger.info("Database %r prepared", db_name) logger.info("Database %r prepared", db_name)
# Sanity check that we have actually configured all the required stores.
if not self.main:
raise Exception("No 'main' data store configured")
if not self.state:
raise Exception("No 'main' data store configured")

View file

@ -14,13 +14,10 @@
# limitations under the License. # limitations under the License.
import itertools import itertools
import logging import logging
import random
from six.moves import range from six.moves import range
from six.moves.queue import Empty, PriorityQueue from six.moves.queue import Empty, PriorityQueue
from unpaddedbase64 import encode_base64
from twisted.internet import defer from twisted.internet import defer
from synapse.api.errors import StoreError from synapse.api.errors import StoreError
@ -148,8 +145,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
retcol="event_id", retcol="event_id",
) )
@defer.inlineCallbacks def get_prev_events_for_room(self, room_id: str):
def get_prev_events_for_room(self, room_id):
""" """
Gets a subset of the current forward extremities in the given room. Gets a subset of the current forward extremities in the given room.
@ -160,41 +156,30 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
room_id (str): room_id room_id (str): room_id
Returns: Returns:
Deferred[list[(str, dict[str, str], int)]] Deferred[List[str]]: the event ids of the forward extremites
for each event, a tuple of (event_id, hashes, depth)
where *hashes* is a map from algorithm to hash.
"""
res = yield self.get_latest_event_ids_and_hashes_in_room(room_id)
if len(res) > 10:
# Sort by reverse depth, so we point to the most recent.
res.sort(key=lambda a: -a[2])
# we use half of the limit for the actual most recent events, and
# the other half to randomly point to some of the older events, to
# make sure that we don't completely ignore the older events.
res = res[0:5] + random.sample(res[5:], 5)
return res
def get_latest_event_ids_and_hashes_in_room(self, room_id):
"""
Gets the current forward extremities in the given room
Args:
room_id (str): room_id
Returns:
Deferred[list[(str, dict[str, str], int)]]
for each event, a tuple of (event_id, hashes, depth)
where *hashes* is a map from algorithm to hash.
""" """
return self.db.runInteraction( return self.db.runInteraction(
"get_latest_event_ids_and_hashes_in_room", "get_prev_events_for_room", self._get_prev_events_for_room_txn, room_id
self._get_latest_event_ids_and_hashes_in_room,
room_id,
) )
def _get_prev_events_for_room_txn(self, txn, room_id: str):
# we just use the 10 newest events. Older events will become
# prev_events of future events.
sql = """
SELECT e.event_id FROM event_forward_extremities AS f
INNER JOIN events AS e USING (event_id)
WHERE f.room_id = ?
ORDER BY e.depth DESC
LIMIT 10
"""
txn.execute(sql, (room_id,))
return [row[0] for row in txn]
def get_rooms_with_many_extremities(self, min_count, limit, room_id_filter): def get_rooms_with_many_extremities(self, min_count, limit, room_id_filter):
"""Get the top rooms with at least N extremities. """Get the top rooms with at least N extremities.
@ -243,27 +228,6 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
desc="get_latest_event_ids_in_room", desc="get_latest_event_ids_in_room",
) )
def _get_latest_event_ids_and_hashes_in_room(self, txn, room_id):
sql = (
"SELECT e.event_id, e.depth FROM events as e "
"INNER JOIN event_forward_extremities as f "
"ON e.event_id = f.event_id "
"AND e.room_id = f.room_id "
"WHERE f.room_id = ?"
)
txn.execute(sql, (room_id,))
results = []
for event_id, depth in txn.fetchall():
hashes = self._get_event_reference_hashes_txn(txn, event_id)
prev_hashes = {
k: encode_base64(v) for k, v in hashes.items() if k == "sha256"
}
results.append((event_id, prev_hashes, depth))
return results
def get_min_depth(self, room_id): def get_min_depth(self, room_id):
""" For hte given room, get the minimum depth we have seen for it. """ For hte given room, get the minimum depth we have seen for it.
""" """
@ -506,7 +470,7 @@ class EventFederationStore(EventFederationWorkerStore):
def _update_min_depth_for_room_txn(self, txn, room_id, depth): def _update_min_depth_for_room_txn(self, txn, room_id, depth):
min_depth = self._get_min_depth_interaction(txn, room_id) min_depth = self._get_min_depth_interaction(txn, room_id)
if min_depth and depth >= min_depth: if min_depth is not None and depth >= min_depth:
return return
self.db.simple_upsert_txn( self.db.simple_upsert_txn(

View file

@ -137,7 +137,7 @@ class EventsWorkerStore(SQLBaseStore):
@defer.inlineCallbacks @defer.inlineCallbacks
def get_event( def get_event(
self, self,
event_id: List[str], event_id: str,
redact_behaviour: EventRedactBehaviour = EventRedactBehaviour.REDACT, redact_behaviour: EventRedactBehaviour = EventRedactBehaviour.REDACT,
get_prev_content: bool = False, get_prev_content: bool = False,
allow_rejected: bool = False, allow_rejected: bool = False,
@ -148,15 +148,22 @@ class EventsWorkerStore(SQLBaseStore):
Args: Args:
event_id: The event_id of the event to fetch event_id: The event_id of the event to fetch
redact_behaviour: Determine what to do with a redacted event. Possible values: redact_behaviour: Determine what to do with a redacted event. Possible values:
* AS_IS - Return the full event body with no redacted content * AS_IS - Return the full event body with no redacted content
* REDACT - Return the event but with a redacted body * REDACT - Return the event but with a redacted body
* DISALLOW - Do not return redacted events * DISALLOW - Do not return redacted events (behave as per allow_none
if the event is redacted)
get_prev_content: If True and event is a state event, get_prev_content: If True and event is a state event,
include the previous states content in the unsigned field. include the previous states content in the unsigned field.
allow_rejected: If True return rejected events.
allow_rejected: If True, return rejected events. Otherwise,
behave as per allow_none.
allow_none: If True, return None if no event found, if allow_none: If True, return None if no event found, if
False throw a NotFoundError False throw a NotFoundError
check_room_id: if not None, check the room of the found event. check_room_id: if not None, check the room of the found event.
If there is a mismatch, behave as per allow_none. If there is a mismatch, behave as per allow_none.
@ -196,14 +203,18 @@ class EventsWorkerStore(SQLBaseStore):
Args: Args:
event_ids: The event_ids of the events to fetch event_ids: The event_ids of the events to fetch
redact_behaviour: Determine what to do with a redacted event. Possible redact_behaviour: Determine what to do with a redacted event. Possible
values: values:
* AS_IS - Return the full event body with no redacted content * AS_IS - Return the full event body with no redacted content
* REDACT - Return the event but with a redacted body * REDACT - Return the event but with a redacted body
* DISALLOW - Do not return redacted events * DISALLOW - Do not return redacted events (omit them from the response)
get_prev_content: If True and event is a state event, get_prev_content: If True and event is a state event,
include the previous states content in the unsigned field. include the previous states content in the unsigned field.
allow_rejected: If True return rejected events.
allow_rejected: If True, return rejected events. Otherwise,
omits rejeted events from the response.
Returns: Returns:
Deferred : Dict from event_id to event. Deferred : Dict from event_id to event.
@ -228,15 +239,21 @@ class EventsWorkerStore(SQLBaseStore):
"""Get events from the database and return in a list in the same order """Get events from the database and return in a list in the same order
as given by `event_ids` arg. as given by `event_ids` arg.
Unknown events will be omitted from the response.
Args: Args:
event_ids: The event_ids of the events to fetch event_ids: The event_ids of the events to fetch
redact_behaviour: Determine what to do with a redacted event. Possible values: redact_behaviour: Determine what to do with a redacted event. Possible values:
* AS_IS - Return the full event body with no redacted content * AS_IS - Return the full event body with no redacted content
* REDACT - Return the event but with a redacted body * REDACT - Return the event but with a redacted body
* DISALLOW - Do not return redacted events * DISALLOW - Do not return redacted events (omit them from the response)
get_prev_content: If True and event is a state event, get_prev_content: If True and event is a state event,
include the previous states content in the unsigned field. include the previous states content in the unsigned field.
allow_rejected: If True, return rejected events.
allow_rejected: If True, return rejected events. Otherwise,
omits rejected events from the response.
Returns: Returns:
Deferred[list[EventBase]]: List of events fetched from the database. The Deferred[list[EventBase]]: List of events fetched from the database. The
@ -369,9 +386,14 @@ class EventsWorkerStore(SQLBaseStore):
If events are pulled from the database, they will be cached for future lookups. If events are pulled from the database, they will be cached for future lookups.
Unknown events are omitted from the response.
Args: Args:
event_ids (Iterable[str]): The event_ids of the events to fetch event_ids (Iterable[str]): The event_ids of the events to fetch
allow_rejected (bool): Whether to include rejected events
allow_rejected (bool): Whether to include rejected events. If False,
rejected events are omitted from the response.
Returns: Returns:
Deferred[Dict[str, _EventCacheEntry]]: Deferred[Dict[str, _EventCacheEntry]]:
@ -506,9 +528,13 @@ class EventsWorkerStore(SQLBaseStore):
Returned events will be added to the cache for future lookups. Returned events will be added to the cache for future lookups.
Unknown events are omitted from the response.
Args: Args:
event_ids (Iterable[str]): The event_ids of the events to fetch event_ids (Iterable[str]): The event_ids of the events to fetch
allow_rejected (bool): Whether to include rejected events
allow_rejected (bool): Whether to include rejected events. If False,
rejected events are omitted from the response.
Returns: Returns:
Deferred[Dict[str, _EventCacheEntry]]: Deferred[Dict[str, _EventCacheEntry]]:

View file

@ -17,6 +17,7 @@
import collections import collections
import logging import logging
import re import re
from abc import abstractmethod
from typing import Optional, Tuple from typing import Optional, Tuple
from six import integer_types from six import integer_types
@ -367,6 +368,8 @@ class RoomWorkerStore(SQLBaseStore):
class RoomBackgroundUpdateStore(SQLBaseStore): class RoomBackgroundUpdateStore(SQLBaseStore):
REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory"
def __init__(self, database: Database, db_conn, hs): def __init__(self, database: Database, db_conn, hs):
super(RoomBackgroundUpdateStore, self).__init__(database, db_conn, hs) super(RoomBackgroundUpdateStore, self).__init__(database, db_conn, hs)
@ -376,6 +379,11 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
"insert_room_retention", self._background_insert_retention, "insert_room_retention", self._background_insert_retention,
) )
self.db.updates.register_background_update_handler(
self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE,
self._remove_tombstoned_rooms_from_directory,
)
@defer.inlineCallbacks @defer.inlineCallbacks
def _background_insert_retention(self, progress, batch_size): def _background_insert_retention(self, progress, batch_size):
"""Retrieves a list of all rooms within a range and inserts an entry for each of """Retrieves a list of all rooms within a range and inserts an entry for each of
@ -444,6 +452,62 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
defer.returnValue(batch_size) defer.returnValue(batch_size)
async def _remove_tombstoned_rooms_from_directory(
self, progress, batch_size
) -> int:
"""Removes any rooms with tombstone events from the room directory
Nowadays this is handled by the room upgrade handler, but we may have some
that got left behind
"""
last_room = progress.get("room_id", "")
def _get_rooms(txn):
txn.execute(
"""
SELECT room_id
FROM rooms r
INNER JOIN current_state_events cse USING (room_id)
WHERE room_id > ? AND r.is_public
AND cse.type = '%s' AND cse.state_key = ''
ORDER BY room_id ASC
LIMIT ?;
"""
% EventTypes.Tombstone,
(last_room, batch_size),
)
return [row[0] for row in txn]
rooms = await self.db.runInteraction(
"get_tombstoned_directory_rooms", _get_rooms
)
if not rooms:
await self.db.updates._end_background_update(
self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE
)
return 0
for room_id in rooms:
logger.info("Removing tombstoned room %s from the directory", room_id)
await self.set_room_is_public(room_id, False)
await self.db.updates._background_update_progress(
self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE, {"room_id": rooms[-1]}
)
return len(rooms)
@abstractmethod
def set_room_is_public(self, room_id, is_public):
# this will need to be implemented if a background update is performed with
# existing (tombstoned, public) rooms in the database.
#
# It's overridden by RoomStore for the synapse master.
raise NotImplementedError()
class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore): class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
def __init__(self, database: Database, db_conn, hs): def __init__(self, database: Database, db_conn, hs):

View file

@ -0,0 +1,18 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Now that #6232 is a thing, we can remove old rooms from the directory.
INSERT INTO background_updates (update_name, progress_json) VALUES
('remove_tombstoned_rooms_from_directory', '{}');

View file

@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import collections.abc
import logging import logging
from collections import namedtuple from collections import namedtuple
from typing import Iterable, Tuple from typing import Iterable, Tuple
@ -107,7 +107,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
predecessor = create_event.content.get("predecessor", None) predecessor = create_event.content.get("predecessor", None)
# Ensure the key is a dictionary # Ensure the key is a dictionary
if not isinstance(predecessor, dict): if not isinstance(predecessor, collections.abc.Mapping):
return None return None
return predecessor return predecessor

View file

@ -15,6 +15,7 @@
# limitations under the License. # limitations under the License.
import re import re
import string import string
import sys
from collections import namedtuple from collections import namedtuple
import attr import attr
@ -23,6 +24,17 @@ from unpaddedbase64 import decode_base64
from synapse.api.errors import SynapseError from synapse.api.errors import SynapseError
# define a version of typing.Collection that works on python 3.5
if sys.version_info[:3] >= (3, 6, 0):
from typing import Collection
else:
from typing import Sized, Iterable, Container, TypeVar
T_co = TypeVar("T_co", covariant=True)
class Collection(Iterable[T_co], Container[T_co], Sized):
__slots__ = ()
class Requester( class Requester(
namedtuple( namedtuple(

View file

@ -19,6 +19,7 @@ from mock import Mock
import canonicaljson import canonicaljson
import signedjson.key import signedjson.key
import signedjson.sign import signedjson.sign
from nacl.signing import SigningKey
from signedjson.key import encode_verify_key_base64, get_verify_key from signedjson.key import encode_verify_key_base64, get_verify_key
from twisted.internet import defer from twisted.internet import defer
@ -412,6 +413,49 @@ class PerspectivesKeyFetcherTestCase(unittest.HomeserverTestCase):
handlers=None, http_client=self.http_client, config=config handlers=None, http_client=self.http_client, config=config
) )
def build_perspectives_response(
self, server_name: str, signing_key: SigningKey, valid_until_ts: int,
) -> dict:
"""
Build a valid perspectives server response to a request for the given key
"""
verify_key = signedjson.key.get_verify_key(signing_key)
verifykey_id = "%s:%s" % (verify_key.alg, verify_key.version)
response = {
"server_name": server_name,
"old_verify_keys": {},
"valid_until_ts": valid_until_ts,
"verify_keys": {
verifykey_id: {
"key": signedjson.key.encode_verify_key_base64(verify_key)
}
},
}
# the response must be signed by both the origin server and the perspectives
# server.
signedjson.sign.sign_json(response, server_name, signing_key)
self.mock_perspective_server.sign_response(response)
return response
def expect_outgoing_key_query(
self, expected_server_name: str, expected_key_id: str, response: dict
) -> None:
"""
Tell the mock http client to expect a perspectives-server key query
"""
def post_json(destination, path, data, **kwargs):
self.assertEqual(destination, self.mock_perspective_server.server_name)
self.assertEqual(path, "/_matrix/key/v2/query")
# check that the request is for the expected key
q = data["server_keys"]
self.assertEqual(list(q[expected_server_name].keys()), [expected_key_id])
return {"server_keys": [response]}
self.http_client.post_json.side_effect = post_json
def test_get_keys_from_perspectives(self): def test_get_keys_from_perspectives(self):
# arbitrarily advance the clock a bit # arbitrarily advance the clock a bit
self.reactor.advance(100) self.reactor.advance(100)
@ -424,33 +468,61 @@ class PerspectivesKeyFetcherTestCase(unittest.HomeserverTestCase):
testverifykey_id = "ed25519:ver1" testverifykey_id = "ed25519:ver1"
VALID_UNTIL_TS = 200 * 1000 VALID_UNTIL_TS = 200 * 1000
# valid response response = self.build_perspectives_response(
response = { SERVER_NAME, testkey, VALID_UNTIL_TS,
"server_name": SERVER_NAME, )
"old_verify_keys": {},
"valid_until_ts": VALID_UNTIL_TS,
"verify_keys": {
testverifykey_id: {
"key": signedjson.key.encode_verify_key_base64(testverifykey)
}
},
}
# the response must be signed by both the origin server and the perspectives self.expect_outgoing_key_query(SERVER_NAME, "key1", response)
# server.
signedjson.sign.sign_json(response, SERVER_NAME, testkey)
self.mock_perspective_server.sign_response(response)
def post_json(destination, path, data, **kwargs): keys_to_fetch = {SERVER_NAME: {"key1": 0}}
self.assertEqual(destination, self.mock_perspective_server.server_name) keys = self.get_success(fetcher.get_keys(keys_to_fetch))
self.assertEqual(path, "/_matrix/key/v2/query") self.assertIn(SERVER_NAME, keys)
k = keys[SERVER_NAME][testverifykey_id]
self.assertEqual(k.valid_until_ts, VALID_UNTIL_TS)
self.assertEqual(k.verify_key, testverifykey)
self.assertEqual(k.verify_key.alg, "ed25519")
self.assertEqual(k.verify_key.version, "ver1")
# check that the request is for the expected key # check that the perspectives store is correctly updated
q = data["server_keys"] lookup_triplet = (SERVER_NAME, testverifykey_id, None)
self.assertEqual(list(q[SERVER_NAME].keys()), ["key1"]) key_json = self.get_success(
return {"server_keys": [response]} self.hs.get_datastore().get_server_keys_json([lookup_triplet])
)
res = key_json[lookup_triplet]
self.assertEqual(len(res), 1)
res = res[0]
self.assertEqual(res["key_id"], testverifykey_id)
self.assertEqual(res["from_server"], self.mock_perspective_server.server_name)
self.assertEqual(res["ts_added_ms"], self.reactor.seconds() * 1000)
self.assertEqual(res["ts_valid_until_ms"], VALID_UNTIL_TS)
self.http_client.post_json.side_effect = post_json self.assertEqual(
bytes(res["key_json"]), canonicaljson.encode_canonical_json(response)
)
def test_get_perspectives_own_key(self):
"""Check that we can get the perspectives server's own keys
This is slightly complicated by the fact that the perspectives server may
use different keys for signing notary responses.
"""
# arbitrarily advance the clock a bit
self.reactor.advance(100)
fetcher = PerspectivesKeyFetcher(self.hs)
SERVER_NAME = self.mock_perspective_server.server_name
testkey = signedjson.key.generate_signing_key("ver1")
testverifykey = signedjson.key.get_verify_key(testkey)
testverifykey_id = "ed25519:ver1"
VALID_UNTIL_TS = 200 * 1000
response = self.build_perspectives_response(
SERVER_NAME, testkey, VALID_UNTIL_TS
)
self.expect_outgoing_key_query(SERVER_NAME, "key1", response)
keys_to_fetch = {SERVER_NAME: {"key1": 0}} keys_to_fetch = {SERVER_NAME: {"key1": 0}}
keys = self.get_success(fetcher.get_keys(keys_to_fetch)) keys = self.get_success(fetcher.get_keys(keys_to_fetch))
@ -490,35 +562,14 @@ class PerspectivesKeyFetcherTestCase(unittest.HomeserverTestCase):
VALID_UNTIL_TS = 200 * 1000 VALID_UNTIL_TS = 200 * 1000
def build_response(): def build_response():
# valid response return self.build_perspectives_response(
response = { SERVER_NAME, testkey, VALID_UNTIL_TS
"server_name": SERVER_NAME, )
"old_verify_keys": {},
"valid_until_ts": VALID_UNTIL_TS,
"verify_keys": {
testverifykey_id: {
"key": signedjson.key.encode_verify_key_base64(testverifykey)
}
},
}
# the response must be signed by both the origin server and the perspectives
# server.
signedjson.sign.sign_json(response, SERVER_NAME, testkey)
self.mock_perspective_server.sign_response(response)
return response
def get_key_from_perspectives(response): def get_key_from_perspectives(response):
fetcher = PerspectivesKeyFetcher(self.hs) fetcher = PerspectivesKeyFetcher(self.hs)
keys_to_fetch = {SERVER_NAME: {"key1": 0}} keys_to_fetch = {SERVER_NAME: {"key1": 0}}
self.expect_outgoing_key_query(SERVER_NAME, "key1", response)
def post_json(destination, path, data, **kwargs):
self.assertEqual(destination, self.mock_perspective_server.server_name)
self.assertEqual(path, "/_matrix/key/v2/query")
return {"server_keys": [response]}
self.http_client.post_json.side_effect = post_json
return self.get_success(fetcher.get_keys(keys_to_fetch)) return self.get_success(fetcher.get_keys(keys_to_fetch))
# start with a valid response so we can check we are testing the right thing # start with a valid response so we can check we are testing the right thing

View file

@ -0,0 +1,130 @@
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import urllib.parse
from io import BytesIO
from mock import Mock
import signedjson.key
from nacl.signing import SigningKey
from signedjson.sign import sign_json
from twisted.web.resource import NoResource
from synapse.http.site import SynapseRequest
from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.util.httpresourcetree import create_resource_tree
from tests import unittest
from tests.server import FakeChannel, wait_until_result
class RemoteKeyResourceTestCase(unittest.HomeserverTestCase):
def make_homeserver(self, reactor, clock):
self.http_client = Mock()
return self.setup_test_homeserver(http_client=self.http_client)
def create_test_json_resource(self):
return create_resource_tree(
{"/_matrix/key/v2": KeyApiV2Resource(self.hs)}, root_resource=NoResource()
)
def expect_outgoing_key_request(
self, server_name: str, signing_key: SigningKey
) -> None:
"""
Tell the mock http client to expect an outgoing GET request for the given key
"""
def get_json(destination, path, ignore_backoff=False, **kwargs):
self.assertTrue(ignore_backoff)
self.assertEqual(destination, server_name)
key_id = "%s:%s" % (signing_key.alg, signing_key.version)
self.assertEqual(
path, "/_matrix/key/v2/server/%s" % (urllib.parse.quote(key_id),)
)
response = {
"server_name": server_name,
"old_verify_keys": {},
"valid_until_ts": 200 * 1000,
"verify_keys": {
key_id: {
"key": signedjson.key.encode_verify_key_base64(
signing_key.verify_key
)
}
},
}
sign_json(response, server_name, signing_key)
return response
self.http_client.get_json.side_effect = get_json
def make_notary_request(self, server_name: str, key_id: str) -> dict:
"""Send a GET request to the test server requesting the given key.
Checks that the response is a 200 and returns the decoded json body.
"""
channel = FakeChannel(self.site, self.reactor)
req = SynapseRequest(channel)
req.content = BytesIO(b"")
req.requestReceived(
b"GET",
b"/_matrix/key/v2/query/%s/%s"
% (server_name.encode("utf-8"), key_id.encode("utf-8")),
b"1.1",
)
wait_until_result(self.reactor, req)
self.assertEqual(channel.code, 200)
resp = channel.json_body
return resp
def test_get_key(self):
"""Fetch a remote key"""
SERVER_NAME = "remote.server"
testkey = signedjson.key.generate_signing_key("ver1")
self.expect_outgoing_key_request(SERVER_NAME, testkey)
resp = self.make_notary_request(SERVER_NAME, "ed25519:ver1")
keys = resp["server_keys"]
self.assertEqual(len(keys), 1)
self.assertIn("ed25519:ver1", keys[0]["verify_keys"])
self.assertEqual(len(keys[0]["verify_keys"]), 1)
# it should be signed by both the origin server and the notary
self.assertIn(SERVER_NAME, keys[0]["signatures"])
self.assertIn(self.hs.hostname, keys[0]["signatures"])
def test_get_own_key(self):
"""Fetch our own key"""
testkey = signedjson.key.generate_signing_key("ver1")
self.expect_outgoing_key_request(self.hs.hostname, testkey)
resp = self.make_notary_request(self.hs.hostname, "ed25519:ver1")
keys = resp["server_keys"]
self.assertEqual(len(keys), 1)
# it should be signed by both itself, and the notary signing key
sigs = keys[0]["signatures"]
self.assertEqual(len(sigs), 1)
self.assertIn(self.hs.hostname, sigs)
oursigs = sigs[self.hs.hostname]
self.assertEqual(len(oursigs), 2)
# and both keys should be present in the verify_keys section
self.assertIn("ed25519:ver1", keys[0]["verify_keys"])
self.assertIn("ed25519:a_lPym", keys[0]["verify_keys"])

View file

@ -20,6 +20,7 @@ from twisted.python.failure import Failure
from twisted.test.proto_helpers import AccumulatingProtocol, MemoryReactorClock from twisted.test.proto_helpers import AccumulatingProtocol, MemoryReactorClock
from twisted.web.http import unquote from twisted.web.http import unquote
from twisted.web.http_headers import Headers from twisted.web.http_headers import Headers
from twisted.web.server import Site
from synapse.http.site import SynapseRequest from synapse.http.site import SynapseRequest
from synapse.util import Clock from synapse.util import Clock
@ -42,6 +43,7 @@ class FakeChannel(object):
wire). wire).
""" """
site = attr.ib(type=Site)
_reactor = attr.ib() _reactor = attr.ib()
result = attr.ib(default=attr.Factory(dict)) result = attr.ib(default=attr.Factory(dict))
_producer = None _producer = None
@ -176,9 +178,9 @@ def make_request(
content = content.encode("utf8") content = content.encode("utf8")
site = FakeSite() site = FakeSite()
channel = FakeChannel(reactor) channel = FakeChannel(site, reactor)
req = request(site, channel) req = request(channel)
req.process = lambda: b"" req.process = lambda: b""
req.content = BytesIO(content) req.content = BytesIO(content)
req.postpath = list(map(unquote, path[1:].split(b"/"))) req.postpath = list(map(unquote, path[1:].split(b"/")))

View file

@ -2,44 +2,37 @@ from mock import Mock
from twisted.internet import defer from twisted.internet import defer
from synapse.storage.background_updates import BackgroundUpdater
from tests import unittest from tests import unittest
from tests.utils import setup_test_homeserver
class BackgroundUpdateTestCase(unittest.TestCase): class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
@defer.inlineCallbacks def prepare(self, reactor, clock, homeserver):
def setUp(self): self.updates = self.hs.get_datastore().db.updates # type: BackgroundUpdater
hs = yield setup_test_homeserver(self.addCleanup) # the base test class should have run the real bg updates for us
self.store = hs.get_datastore() self.assertTrue(self.updates.has_completed_background_updates())
self.clock = hs.get_clock()
self.update_handler = Mock() self.update_handler = Mock()
self.updates.register_background_update_handler(
yield self.store.db.updates.register_background_update_handler(
"test_update", self.update_handler "test_update", self.update_handler
) )
# run the real background updates, to get them out the way
# (perhaps we should run them as part of the test HS setup, since we
# run all of the other schema setup stuff there?)
while True:
res = yield self.store.db.updates.do_next_background_update(1000)
if res is None:
break
@defer.inlineCallbacks
def test_do_background_update(self): def test_do_background_update(self):
desired_count = 1000 # the time we claim each update takes
duration_ms = 42 duration_ms = 42
# the target runtime for each bg update
target_background_update_duration_ms = 50000
# first step: make a bit of progress # first step: make a bit of progress
@defer.inlineCallbacks @defer.inlineCallbacks
def update(progress, count): def update(progress, count):
self.clock.advance_time_msec(count * duration_ms) yield self.clock.sleep((count * duration_ms) / 1000)
progress = {"my_key": progress["my_key"] + 1} progress = {"my_key": progress["my_key"] + 1}
yield self.store.db.runInteraction( yield self.hs.get_datastore().db.runInteraction(
"update_progress", "update_progress",
self.store.db.updates._background_update_progress_txn, self.updates._background_update_progress_txn,
"test_update", "test_update",
progress, progress,
) )
@ -47,37 +40,46 @@ class BackgroundUpdateTestCase(unittest.TestCase):
self.update_handler.side_effect = update self.update_handler.side_effect = update
yield self.store.db.updates.start_background_update( self.get_success(
"test_update", {"my_key": 1} self.updates.start_background_update("test_update", {"my_key": 1})
) )
self.update_handler.reset_mock() self.update_handler.reset_mock()
result = yield self.store.db.updates.do_next_background_update( res = self.get_success(
duration_ms * desired_count self.updates.do_next_background_update(
target_background_update_duration_ms
),
by=0.1,
) )
self.assertIsNotNone(result) self.assertIsNotNone(res)
# on the first call, we should get run with the default background update size
self.update_handler.assert_called_once_with( self.update_handler.assert_called_once_with(
{"my_key": 1}, self.store.db.updates.DEFAULT_BACKGROUND_BATCH_SIZE {"my_key": 1}, self.updates.DEFAULT_BACKGROUND_BATCH_SIZE
) )
# second step: complete the update # second step: complete the update
# we should now get run with a much bigger number of items to update
@defer.inlineCallbacks @defer.inlineCallbacks
def update(progress, count): def update(progress, count):
yield self.store.db.updates._end_background_update("test_update") self.assertEqual(progress, {"my_key": 2})
self.assertAlmostEqual(
count, target_background_update_duration_ms / duration_ms, places=0,
)
yield self.updates._end_background_update("test_update")
return count return count
self.update_handler.side_effect = update self.update_handler.side_effect = update
self.update_handler.reset_mock() self.update_handler.reset_mock()
result = yield self.store.db.updates.do_next_background_update( result = self.get_success(
duration_ms * desired_count self.updates.do_next_background_update(target_background_update_duration_ms)
) )
self.assertIsNotNone(result) self.assertIsNotNone(result)
self.update_handler.assert_called_once_with({"my_key": 2}, desired_count) self.update_handler.assert_called_once()
# third step: we don't expect to be called any more # third step: we don't expect to be called any more
self.update_handler.reset_mock() self.update_handler.reset_mock()
result = yield self.store.db.updates.do_next_background_update( result = self.get_success(
duration_ms * desired_count self.updates.do_next_background_update(target_background_update_duration_ms)
) )
self.assertIsNone(result) self.assertIsNone(result)
self.assertFalse(self.update_handler.called) self.assertFalse(self.update_handler.called)

View file

@ -60,21 +60,14 @@ class EventFederationWorkerStoreTestCase(tests.unittest.TestCase):
(event_id, bytearray(b"ffff")), (event_id, bytearray(b"ffff")),
) )
for i in range(0, 11): for i in range(0, 20):
yield self.store.db.runInteraction("insert", insert_event, i) yield self.store.db.runInteraction("insert", insert_event, i)
# this should get the last five and five others # this should get the last ten
r = yield self.store.get_prev_events_for_room(room_id) r = yield self.store.get_prev_events_for_room(room_id)
self.assertEqual(10, len(r)) self.assertEqual(10, len(r))
for i in range(0, 5): for i in range(0, 10):
el = r[i] self.assertEqual("$event_%i:local" % (19 - i), r[i])
depth = el[2]
self.assertEqual(10 - i, depth)
for i in range(5, 5):
el = r[i]
depth = el[2]
self.assertLessEqual(5, depth)
@defer.inlineCallbacks @defer.inlineCallbacks
def test_get_rooms_with_many_extremities(self): def test_get_rooms_with_many_extremities(self):

View file

@ -36,7 +36,7 @@ from synapse.config.homeserver import HomeServerConfig
from synapse.config.ratelimiting import FederationRateLimitConfig from synapse.config.ratelimiting import FederationRateLimitConfig
from synapse.federation.transport import server as federation_server from synapse.federation.transport import server as federation_server
from synapse.http.server import JsonResource from synapse.http.server import JsonResource
from synapse.http.site import SynapseRequest from synapse.http.site import SynapseRequest, SynapseSite
from synapse.logging.context import LoggingContext from synapse.logging.context import LoggingContext
from synapse.server import HomeServer from synapse.server import HomeServer
from synapse.types import Requester, UserID, create_requester from synapse.types import Requester, UserID, create_requester
@ -210,6 +210,15 @@ class HomeserverTestCase(TestCase):
# Register the resources # Register the resources
self.resource = self.create_test_json_resource() self.resource = self.create_test_json_resource()
# create a site to wrap the resource.
self.site = SynapseSite(
logger_name="synapse.access.http.fake",
site_tag="test",
config={},
resource=self.resource,
server_version_string="1",
)
from tests.rest.client.v1.utils import RestHelper from tests.rest.client.v1.utils import RestHelper
self.helper = RestHelper(self.hs, self.resource, getattr(self, "user_id", None)) self.helper = RestHelper(self.hs, self.resource, getattr(self, "user_id", None))
@ -522,10 +531,6 @@ class HomeserverTestCase(TestCase):
secrets = self.hs.get_secrets() secrets = self.hs.get_secrets()
requester = Requester(user, None, False, None, None) requester = Requester(user, None, False, None, None)
prev_events_and_hashes = None
if prev_event_ids:
prev_events_and_hashes = [[p, {}, 0] for p in prev_event_ids]
event, context = self.get_success( event, context = self.get_success(
event_creator.create_event( event_creator.create_event(
requester, requester,
@ -535,7 +540,7 @@ class HomeserverTestCase(TestCase):
"sender": user.to_string(), "sender": user.to_string(),
"content": {"body": secrets.token_hex(), "msgtype": "m.text"}, "content": {"body": secrets.token_hex(), "msgtype": "m.text"},
}, },
prev_events_and_hashes=prev_events_and_hashes, prev_event_ids=prev_event_ids,
) )
) )

View file

@ -182,7 +182,6 @@ commands = mypy \
synapse/logging/ \ synapse/logging/ \
synapse/module_api \ synapse/module_api \
synapse/rest/consent \ synapse/rest/consent \
synapse/rest/media/v0 \
synapse/rest/saml2 \ synapse/rest/saml2 \
synapse/spam_checker_api \ synapse/spam_checker_api \
synapse/storage/engines \ synapse/storage/engines \