Merge remote-tracking branch 'origin/develop' into erikj/timings

This commit is contained in:
David Baker 2016-06-03 16:56:27 +01:00
commit 0e2ed5445b
6 changed files with 673 additions and 51 deletions

View file

@ -1,3 +1,89 @@
Changes in synapse v0.16.0-rc1 (2016-06-03)
===========================================
Version 0.15 was not released. See v0.15.0-rc1 below for additional changes.
Features:
* Add email notifications for missed messages (PR #759, #786, #799, #810, #815,
#821)
* Add a ``url_preview_ip_range_whitelist`` config param (PR #760)
* Add /report endpoint (PR #762)
* Add basic ignore user API (PR #763)
* Add an openidish mechanism for proving that you own a given user_id (PR #765)
* Allow clients to specify a server_name to avoid 'No known servers' (PR #794)
* Add secondary_directory_servers option to fetch room list from other servers
(PR #808, #813)
Changes:
* Report per request metrics for all of the things using request_handler (PR
#756)
* Correctly handle ``NULL`` password hashes from the database (PR #775)
* Allow receipts for events we haven't seen in the db (PR #784)
* Make synctl read a cache factor from config file (PR #785)
* Increment badge count per missed convo, not per msg (PR #793)
* Special case m.room.third_party_invite event auth to match invites (PR #814)
Bug fixes:
* Fix typo in event_auth servlet path (PR #757)
* Fix password reset (PR #758)
Performance improvements:
* Reduce database inserts when sending transactions (PR #767)
* Queue events by room for persistence (PR #768)
* Add cache to ``get_user_by_id`` (PR #772)
* Add and use ``get_domain_from_id`` (PR #773)
* Use tree cache for ``get_linearized_receipts_for_room`` (PR #779)
* Remove unused indices (PR #782)
* Add caches to ``bulk_get_push_rules*`` (PR #804)
* Cache ``get_event_reference_hashes`` (PR #806)
* Add ``get_users_with_read_receipts_in_room`` cache (PR #809)
* Use state to calculate ``get_users_in_room`` (PR #811)
* Load push rules in storage layer so that they get cached (PR #825)
* Make ``get_joined_hosts_for_room`` use get_users_in_room (PR #828)
* Poke notifier on next reactor tick (PR #829)
* Change CacheMetrics to be quicker (PR #830)
Changes in synapse v0.15.0-rc1 (2016-04-26)
===========================================
Features:
* Add login support for Javascript Web Tokens, thanks to Niklas Riekenbrauck
(PR #671,#687)
* Add URL previewing support (PR #688)
* Add login support for LDAP, thanks to Christoph Witzany (PR #701)
* Add GET endpoint for pushers (PR #716)
Changes:
* Never notify for member events (PR #667)
* Deduplicate identical ``/sync`` requests (PR #668)
* Require user to have left room to forget room (PR #673)
* Use DNS cache if within TTL (PR #677)
* Let users see their own leave events (PR #699)
* Deduplicate membership changes (PR #700)
* Increase performance of pusher code (PR #705)
* Respond with error status 504 if failed to talk to remote server (PR #731)
* Increase search performance on postgres (PR #745)
Bug fixes:
* Fix bug where disabling all notifications still resulted in push (PR #678)
* Fix bug where users couldn't reject remote invites if remote refused (PR #691)
* Fix bug where synapse attempted to backfill from itself (PR #693)
* Fix bug where profile information was not correctly added when joining remote
rooms (PR #703)
* Fix bug where register API required incorrect key name for AS registration
(PR #727)
Changes in synapse v0.14.0 (2016-03-30)
=======================================

View file

@ -16,4 +16,4 @@
""" This is a reference implementation of a Matrix home server.
"""
__version__ = "0.14.0"
__version__ = "0.16.0-rc1"

510
synapse/app/synchrotron.py Normal file
View file

@ -0,0 +1,510 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import synapse
from synapse.api.constants import EventTypes, PresenceState
from synapse.config._base import ConfigError
from synapse.config.database import DatabaseConfig
from synapse.config.logger import LoggingConfig
from synapse.config.appservice import AppServiceConfig
from synapse.events import FrozenEvent
from synapse.handlers.presence import PresenceHandler
from synapse.http.site import SynapseSite
from synapse.http.server import JsonResource
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
from synapse.rest.client.v2_alpha import sync
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.filtering import SlavedFilteringStore
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
from synapse.replication.slave.storage.presence import SlavedPresenceStore
from synapse.server import HomeServer
from synapse.storage.client_ips import ClientIpStore
from synapse.storage.engines import create_engine
from synapse.storage.presence import UserPresenceState
from synapse.storage.roommember import RoomMemberStore
from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.manhole import manhole
from synapse.util.rlimit import change_resource_limit
from synapse.util.stringutils import random_string
from synapse.util.versionstring import get_version_string
from twisted.internet import reactor, defer
from twisted.web.resource import Resource
from daemonize import Daemonize
import sys
import logging
import contextlib
import ujson as json
logger = logging.getLogger("synapse.app.synchrotron")
class SynchrotronConfig(DatabaseConfig, LoggingConfig, AppServiceConfig):
def read_config(self, config):
self.replication_url = config["replication_url"]
self.server_name = config["server_name"]
self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get(
"use_insecure_ssl_client_just_for_testing_do_not_use", False
)
self.user_agent_suffix = None
self.listeners = config["listeners"]
self.soft_file_limit = config.get("soft_file_limit")
self.daemonize = config.get("daemonize")
self.pid_file = self.abspath(config.get("pid_file"))
self.macaroon_secret_key = config["macaroon_secret_key"]
self.expire_access_token = config.get("expire_access_token", False)
def default_config(self, server_name, **kwargs):
pid_file = self.abspath("synchroton.pid")
return """\
# Slave configuration
# The replication listener on the synapse to talk to.
#replication_url: https://localhost:{replication_port}/_synapse/replication
server_name: "%(server_name)s"
listeners:
# Enable a /sync listener on the synchrontron
#- type: http
# port: {http_port}
# bind_address: ""
# Enable a ssh manhole listener on the synchrotron
# - type: manhole
# port: {manhole_port}
# bind_address: 127.0.0.1
# Enable a metric listener on the synchrotron
# - type: http
# port: {metrics_port}
# bind_address: 127.0.0.1
# resources:
# - names: ["metrics"]
# compress: False
report_stats: False
daemonize: False
pid_file: %(pid_file)s
""" % locals()
class SynchrotronSlavedStore(
SlavedPushRuleStore,
SlavedEventStore,
SlavedReceiptsStore,
SlavedAccountDataStore,
SlavedApplicationServiceStore,
SlavedRegistrationStore,
SlavedFilteringStore,
SlavedPresenceStore,
BaseSlavedStore,
ClientIpStore, # After BaseSlavedStre because the constructor is different
):
def get_presence_list_accepted(self, user_localpart):
return ()
# XXX: This is a bit broken because we don't persist forgotten rooms
# in a way that they can be streamed. This means that we don't have a
# way to invalidate the forgotten rooms cache correctly.
# For now we expire the cache every 10 minutes.
BROKEN_CACHE_EXPIRY_MS = 60 * 60 * 1000
who_forgot_in_room = (
RoomMemberStore.__dict__["who_forgot_in_room"]
)
UPDATE_SYNCING_USERS_MS = 10 * 1000
class SynchrotronPresence(object):
def __init__(self, hs):
self.http_client = hs.get_simple_http_client()
self.store = hs.get_datastore()
self.user_to_num_current_syncs = {}
self.syncing_users_url = hs.config.replication_url + "/syncing_users"
self.clock = hs.get_clock()
active_presence = self.store.take_presence_startup_info()
self.user_to_current_state = {
state.user_id: state
for state in active_presence
}
self.process_id = random_string(16)
logger.info("Presence process_id is %r", self.process_id)
self._sending_sync = False
self._need_to_send_sync = False
self.clock.looping_call(
self._send_syncing_users_regularly,
UPDATE_SYNCING_USERS_MS,
)
reactor.addSystemEventTrigger("before", "shutdown", self._on_shutdown)
def set_state(self, user, state):
# TODO Hows this supposed to work?
pass
get_states = PresenceHandler.get_states.__func__
current_state_for_users = PresenceHandler.current_state_for_users.__func__
@defer.inlineCallbacks
def user_syncing(self, user_id, affect_presence):
if affect_presence:
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
self.user_to_num_current_syncs[user_id] = curr_sync + 1
prev_states = yield self.current_state_for_users([user_id])
if prev_states[user_id].state == PresenceState.OFFLINE:
# TODO: Don't block the sync request on this HTTP hit.
yield self._send_syncing_users_now()
def _end():
if affect_presence:
self.user_to_num_current_syncs[user_id] -= 1
@contextlib.contextmanager
def _user_syncing():
try:
yield
finally:
_end()
defer.returnValue(_user_syncing())
@defer.inlineCallbacks
def _on_shutdown(self):
# When the synchrotron is shutdown tell the master to clear the in
# progress syncs for this process
self.user_to_num_current_syncs.clear()
yield self._send_syncing_users_now()
def _send_syncing_users_regularly(self):
# Only send an update if we aren't in the middle of sending one.
if not self._sending_sync:
preserve_fn(self._send_syncing_users_now)()
@defer.inlineCallbacks
def _send_syncing_users_now(self):
if self._sending_sync:
# We don't want to race with sending another update.
# Instead we wait for that update to finish and send another
# update afterwards.
self._need_to_send_sync = True
return
# Flag that we are sending an update.
self._sending_sync = True
yield self.http_client.post_json_get_json(self.syncing_users_url, {
"process_id": self.process_id,
"syncing_users": [
user_id for user_id, count in self.user_to_num_current_syncs.items()
if count > 0
],
})
# Unset the flag as we are no longer sending an update.
self._sending_sync = False
if self._need_to_send_sync:
# If something happened while we were sending the update then
# we might need to send another update.
# TODO: Check if the update that was sent matches the current state
# as we only need to send an update if they are different.
self._need_to_send_sync = False
yield self._send_syncing_users_now()
def process_replication(self, result):
stream = result.get("presence", {"rows": []})
for row in stream["rows"]:
(
position, user_id, state, last_active_ts,
last_federation_update_ts, last_user_sync_ts, status_msg,
currently_active
) = row
self.user_to_current_state[user_id] = UserPresenceState(
user_id, state, last_active_ts,
last_federation_update_ts, last_user_sync_ts, status_msg,
currently_active
)
class SynchrotronTyping(object):
def __init__(self, hs):
self._latest_room_serial = 0
self._room_serials = {}
self._room_typing = {}
def stream_positions(self):
return {"typing": self._latest_room_serial}
def process_replication(self, result):
stream = result.get("typing")
if stream:
self._latest_room_serial = int(stream["position"])
for row in stream["rows"]:
position, room_id, typing_json = row
typing = json.loads(typing_json)
self._room_serials[room_id] = position
self._room_typing[room_id] = typing
class SynchrotronApplicationService(object):
def notify_interested_services(self, event):
pass
class SynchrotronServer(HomeServer):
def get_db_conn(self, run_new_connection=True):
# Any param beginning with cp_ is a parameter for adbapi, and should
# not be passed to the database engine.
db_params = {
k: v for k, v in self.db_config.get("args", {}).items()
if not k.startswith("cp_")
}
db_conn = self.database_engine.module.connect(**db_params)
if run_new_connection:
self.database_engine.on_new_connection(db_conn)
return db_conn
def setup(self):
logger.info("Setting up.")
self.datastore = SynchrotronSlavedStore(self.get_db_conn(), self)
logger.info("Finished setting up.")
def _listen_http(self, listener_config):
port = listener_config["port"]
bind_address = listener_config.get("bind_address", "")
site_tag = listener_config.get("tag", port)
resources = {}
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(self)
elif name == "client":
resource = JsonResource(self, canonical_json=False)
sync.register_servlets(self, resource)
resources.update({
"/_matrix/client/r0": resource,
"/_matrix/client/unstable": resource,
"/_matrix/client/v2_alpha": resource,
})
root_resource = create_resource_tree(resources, Resource())
reactor.listenTCP(
port,
SynapseSite(
"synapse.access.http.%s" % (site_tag,),
site_tag,
listener_config,
root_resource,
),
interface=bind_address
)
logger.info("Synapse synchrotron now listening on port %d", port)
def start_listening(self):
for listener in self.config.listeners:
if listener["type"] == "http":
self._listen_http(listener)
elif listener["type"] == "manhole":
reactor.listenTCP(
listener["port"],
manhole(
username="matrix",
password="rabbithole",
globals={"hs": self},
),
interface=listener.get("bind_address", '127.0.0.1')
)
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
@defer.inlineCallbacks
def replicate(self):
http_client = self.get_simple_http_client()
store = self.get_datastore()
replication_url = self.config.replication_url
clock = self.get_clock()
notifier = self.get_notifier()
presence_handler = self.get_presence_handler()
typing_handler = self.get_typing_handler()
def expire_broken_caches():
store.who_forgot_in_room.invalidate_all()
def notify_from_stream(
result, stream_name, stream_key, room=None, user=None
):
stream = result.get(stream_name)
if stream:
position_index = stream["field_names"].index("position")
if room:
room_index = stream["field_names"].index(room)
if user:
user_index = stream["field_names"].index(user)
users = ()
rooms = ()
for row in stream["rows"]:
position = row[position_index]
if user:
users = (row[user_index],)
if room:
rooms = (row[room_index],)
notifier.on_new_event(
stream_key, position, users=users, rooms=rooms
)
def notify(result):
stream = result.get("events")
if stream:
max_position = stream["position"]
for row in stream["rows"]:
position = row[0]
internal = json.loads(row[1])
event_json = json.loads(row[2])
event = FrozenEvent(event_json, internal_metadata_dict=internal)
extra_users = ()
if event.type == EventTypes.Member:
extra_users = (event.state_key,)
notifier.on_new_room_event(
event, position, max_position, extra_users
)
notify_from_stream(
result, "push_rules", "push_rules_key", user="user_id"
)
notify_from_stream(
result, "user_account_data", "account_data_key", user="user_id"
)
notify_from_stream(
result, "room_account_data", "account_data_key", user="user_id"
)
notify_from_stream(
result, "tag_account_data", "account_data_key", user="user_id"
)
notify_from_stream(
result, "receipts", "receipt_key", room="room_id"
)
notify_from_stream(
result, "typing", "typing_key", room="room_id"
)
next_expire_broken_caches_ms = 0
while True:
try:
args = store.stream_positions()
args.update(typing_handler.stream_positions())
args["timeout"] = 30000
result = yield http_client.get_json(replication_url, args=args)
now_ms = clock.time_msec()
if now_ms > next_expire_broken_caches_ms:
expire_broken_caches()
next_expire_broken_caches_ms = (
now_ms + store.BROKEN_CACHE_EXPIRY_MS
)
yield store.process_replication(result)
typing_handler.process_replication(result)
presence_handler.process_replication(result)
notify(result)
except:
logger.exception("Error replicating from %r", replication_url)
sleep(5)
def build_presence_handler(self):
return SynchrotronPresence(self)
def build_typing_handler(self):
return SynchrotronTyping(self)
def setup(config_options):
try:
config = SynchrotronConfig.load_config(
"Synapse synchrotron", config_options
)
except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n")
sys.exit(1)
if not config:
sys.exit(0)
config.setup_logging()
database_engine = create_engine(config.database_config)
ss = SynchrotronServer(
config.server_name,
db_config=config.database_config,
config=config,
version_string=get_version_string("Synapse", synapse),
database_engine=database_engine,
application_service_handler=SynchrotronApplicationService(),
)
ss.setup()
ss.start_listening()
change_resource_limit(ss.config.soft_file_limit)
def start():
ss.get_datastore().start_profiling()
ss.replicate()
reactor.callWhenRunning(start)
return ss
if __name__ == '__main__':
with LoggingContext("main"):
ps = setup(sys.argv[1:])
if ps.config.daemonize:
def run():
with LoggingContext("run"):
change_resource_limit(ps.config.soft_file_limit)
reactor.run()
daemon = Daemonize(
app="synapse-pusher",
pid=ps.config.pid_file,
action=run,
auto_close_fds=False,
verbose=True,
logger=logger,
)
daemon.start()
else:
reactor.run()

View file

@ -186,7 +186,7 @@ class Mailer(object):
multipart_msg = MIMEMultipart('alternative')
multipart_msg['Subject'] = "[%s] %s" % (self.app_name, summary_text)
multipart_msg['From'] = self.hs.config.email_notif_from
multipart_msg['From'] = from_string
multipart_msg['To'] = email_address
multipart_msg['Date'] = email.utils.formatdate()
multipart_msg['Message-ID'] = email.utils.make_msgid()

View file

@ -17,7 +17,7 @@ from twisted.internet import defer
from .appservice import (
ApplicationServiceStore, ApplicationServiceTransactionStore
)
from ._base import Cache, LoggingTransaction
from ._base import LoggingTransaction
from .directory import DirectoryStore
from .events import EventsStore
from .presence import PresenceStore, UserPresenceState
@ -45,6 +45,7 @@ from .search import SearchStore
from .tags import TagsStore
from .account_data import AccountDataStore
from .openid import OpenIdStore
from .client_ips import ClientIpStore
from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator
@ -58,12 +59,6 @@ import logging
logger = logging.getLogger(__name__)
# Number of msec of granularity to store the user IP 'last seen' time. Smaller
# times give more inserts into the database even for readonly API hits
# 120 seconds == 2 minutes
LAST_SEEN_GRANULARITY = 120 * 1000
class DataStore(RoomMemberStore, RoomStore,
RegistrationStore, StreamStore, ProfileStore,
PresenceStore, TransactionStore,
@ -84,6 +79,7 @@ class DataStore(RoomMemberStore, RoomStore,
AccountDataStore,
EventPushActionsStore,
OpenIdStore,
ClientIpStore,
):
def __init__(self, db_conn, hs):
@ -91,11 +87,6 @@ class DataStore(RoomMemberStore, RoomStore,
self._clock = hs.get_clock()
self.database_engine = hs.database_engine
self.client_ip_last_seen = Cache(
name="client_ip_last_seen",
keylen=4,
)
self._stream_id_gen = StreamIdGenerator(
db_conn, "events", "stream_ordering",
extra_tables=[("local_invites", "stream_id")]
@ -216,39 +207,6 @@ class DataStore(RoomMemberStore, RoomStore,
return [UserPresenceState(**row) for row in rows]
@defer.inlineCallbacks
def insert_client_ip(self, user, access_token, ip, user_agent):
now = int(self._clock.time_msec())
key = (user.to_string(), access_token, ip)
try:
last_seen = self.client_ip_last_seen.get(key)
except KeyError:
last_seen = None
# Rate-limited inserts
if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
defer.returnValue(None)
self.client_ip_last_seen.prefill(key, now)
# It's safe not to lock here: a) no unique constraint,
# b) LAST_SEEN_GRANULARITY makes concurrent updates incredibly unlikely
yield self._simple_upsert(
"user_ips",
keyvalues={
"user_id": user.to_string(),
"access_token": access_token,
"ip": ip,
"user_agent": user_agent,
},
values={
"last_seen": now,
},
desc="insert_client_ip",
lock=False,
)
@defer.inlineCallbacks
def count_daily_users(self):
"""

View file

@ -0,0 +1,68 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import SQLBaseStore, Cache
from twisted.internet import defer
# Number of msec of granularity to store the user IP 'last seen' time. Smaller
# times give more inserts into the database even for readonly API hits
# 120 seconds == 2 minutes
LAST_SEEN_GRANULARITY = 120 * 1000
class ClientIpStore(SQLBaseStore):
def __init__(self, hs):
self.client_ip_last_seen = Cache(
name="client_ip_last_seen",
keylen=4,
)
super(ClientIpStore, self).__init__(hs)
@defer.inlineCallbacks
def insert_client_ip(self, user, access_token, ip, user_agent):
now = int(self._clock.time_msec())
key = (user.to_string(), access_token, ip)
try:
last_seen = self.client_ip_last_seen.get(key)
except KeyError:
last_seen = None
# Rate-limited inserts
if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
defer.returnValue(None)
self.client_ip_last_seen.prefill(key, now)
# It's safe not to lock here: a) no unique constraint,
# b) LAST_SEEN_GRANULARITY makes concurrent updates incredibly unlikely
yield self._simple_upsert(
"user_ips",
keyvalues={
"user_id": user.to_string(),
"access_token": access_token,
"ip": ip,
"user_agent": user_agent,
},
values={
"last_seen": now,
},
desc="insert_client_ip",
lock=False,
)