Merge pull request #2281 from matrix-org/erikj/phone_home_stats

Fix phone home stats
This commit is contained in:
Erik Johnston 2017-06-15 12:46:23 +01:00 committed by GitHub
commit 052c5d19d5
5 changed files with 88 additions and 217 deletions

View file

@ -35,7 +35,7 @@ from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_d
from synapse.server import HomeServer from synapse.server import HomeServer
from twisted.internet import reactor, task, defer from twisted.internet import reactor, defer
from twisted.application import service from twisted.application import service
from twisted.web.resource import Resource, EncodingResourceWrapper from twisted.web.resource import Resource, EncodingResourceWrapper
from twisted.web.static import File from twisted.web.static import File
@ -53,7 +53,7 @@ from synapse.api.urls import (
from synapse.config.homeserver import HomeServerConfig from synapse.config.homeserver import HomeServerConfig
from synapse.crypto import context_factory from synapse.crypto import context_factory
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.metrics import register_memory_metrics, get_metrics_for from synapse.metrics import register_memory_metrics
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
from synapse.federation.transport.server import TransportLayerServer from synapse.federation.transport.server import TransportLayerServer
@ -398,7 +398,8 @@ def run(hs):
ThreadPool._worker = profile(ThreadPool._worker) ThreadPool._worker = profile(ThreadPool._worker)
reactor.run = profile(reactor.run) reactor.run = profile(reactor.run)
start_time = hs.get_clock().time() clock = hs.get_clock()
start_time = clock.time()
stats = {} stats = {}
@ -410,41 +411,23 @@ def run(hs):
if uptime < 0: if uptime < 0:
uptime = 0 uptime = 0
# If the stats directory is empty then this is the first time we've
# reported stats.
first_time = not stats
stats["homeserver"] = hs.config.server_name stats["homeserver"] = hs.config.server_name
stats["timestamp"] = now stats["timestamp"] = now
stats["uptime_seconds"] = uptime stats["uptime_seconds"] = uptime
stats["total_users"] = yield hs.get_datastore().count_all_users() stats["total_users"] = yield hs.get_datastore().count_all_users()
total_nonbridged_users = yield hs.get_datastore().count_nonbridged_users()
stats["total_nonbridged_users"] = total_nonbridged_users
room_count = yield hs.get_datastore().get_room_count() room_count = yield hs.get_datastore().get_room_count()
stats["total_room_count"] = room_count stats["total_room_count"] = room_count
stats["daily_active_users"] = yield hs.get_datastore().count_daily_users() stats["daily_active_users"] = yield hs.get_datastore().count_daily_users()
daily_messages = yield hs.get_datastore().count_daily_messages() stats["daily_active_rooms"] = yield hs.get_datastore().count_daily_active_rooms()
if daily_messages is not None: stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
stats["daily_messages"] = daily_messages
else:
stats.pop("daily_messages", None)
if first_time: daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages()
# Add callbacks to report the synapse stats as metrics whenever stats["daily_sent_messages"] = daily_sent_messages
# prometheus requests them, typically every 30s.
# As some of the stats are expensive to calculate we only update
# them when synapse phones home to matrix.org every 24 hours.
metrics = get_metrics_for("synapse.usage")
metrics.add_callback("timestamp", lambda: stats["timestamp"])
metrics.add_callback("uptime_seconds", lambda: stats["uptime_seconds"])
metrics.add_callback("total_users", lambda: stats["total_users"])
metrics.add_callback("total_room_count", lambda: stats["total_room_count"])
metrics.add_callback(
"daily_active_users", lambda: stats["daily_active_users"]
)
metrics.add_callback(
"daily_messages", lambda: stats.get("daily_messages", 0)
)
logger.info("Reporting stats to matrix.org: %s" % (stats,)) logger.info("Reporting stats to matrix.org: %s" % (stats,))
try: try:
@ -456,9 +439,12 @@ def run(hs):
logger.warn("Error reporting stats: %s", e) logger.warn("Error reporting stats: %s", e)
if hs.config.report_stats: if hs.config.report_stats:
phone_home_task = task.LoopingCall(phone_stats_home) logger.info("Scheduling stats reporting for 3 hour intervals")
logger.info("Scheduling stats reporting for 24 hour intervals") clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000)
phone_home_task.start(60 * 60 * 24, now=False)
# We wait 5 minutes to send the first set of stats as the server can
# be quite busy the first few minutes
clock.call_later(5 * 60, phone_stats_home)
def in_thread(): def in_thread():
# Uncomment to enable tracing of log context changes. # Uncomment to enable tracing of log context changes.

View file

@ -246,7 +246,7 @@ class DataStore(RoomMemberStore, RoomStore,
cur.close() cur.close()
self.find_stream_orderings_looping_call = self._clock.looping_call( self.find_stream_orderings_looping_call = self._clock.looping_call(
self._find_stream_orderings_for_times, 60 * 60 * 1000 self._find_stream_orderings_for_times, 10 * 60 * 1000
) )
self._stream_order_on_start = self.get_room_max_stream_ordering() self._stream_order_on_start = self.get_room_max_stream_ordering()
@ -287,17 +287,19 @@ class DataStore(RoomMemberStore, RoomStore,
Counts the number of users who used this homeserver in the last 24 hours. Counts the number of users who used this homeserver in the last 24 hours.
""" """
def _count_users(txn): def _count_users(txn):
txn.execute( yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24),
"SELECT COUNT(DISTINCT user_id) AS users"
" FROM user_ips" sql = """
" WHERE last_seen > ?", SELECT COALESCE(count(*), 0) FROM (
# This is close enough to a day for our purposes. SELECT user_id FROM user_ips
(int(self._clock.time_msec()) - (1000 * 60 * 60 * 24),) WHERE last_seen > ?
) GROUP BY user_id
rows = self.cursor_to_dict(txn) ) u
if rows: """
return rows[0]["users"]
return 0 txn.execute(sql, (yesterday,))
count, = txn.fetchone()
return count
ret = yield self.runInteraction("count_users", _count_users) ret = yield self.runInteraction("count_users", _count_users)
defer.returnValue(ret) defer.returnValue(ret)

View file

@ -38,7 +38,6 @@ from functools import wraps
import synapse.metrics import synapse.metrics
import logging import logging
import math
import ujson as json import ujson as json
# these are only included to make the type annotations work # these are only included to make the type annotations work
@ -1625,68 +1624,54 @@ class EventsStore(SQLBaseStore):
call to this function, it will return None. call to this function, it will return None.
""" """
def _count_messages(txn): def _count_messages(txn):
now = self.hs.get_clock().time() sql = """
SELECT COALESCE(COUNT(*), 0) FROM events
txn.execute( WHERE type = 'm.room.message'
"SELECT reported_stream_token, reported_time FROM stats_reporting" AND stream_ordering > ?
) """
last_reported = self.cursor_to_dict(txn) txn.execute(sql, (self.stream_ordering_day_ago,))
count, = txn.fetchone()
txn.execute( return count
"SELECT stream_ordering"
" FROM events"
" ORDER BY stream_ordering DESC"
" LIMIT 1"
)
now_reporting = self.cursor_to_dict(txn)
if not now_reporting:
logger.info("Calculating daily messages skipped; no now_reporting")
return None
now_reporting = now_reporting[0]["stream_ordering"]
txn.execute("DELETE FROM stats_reporting")
txn.execute(
"INSERT INTO stats_reporting"
" (reported_stream_token, reported_time)"
" VALUES (?, ?)",
(now_reporting, now,)
)
if not last_reported:
logger.info("Calculating daily messages skipped; no last_reported")
return None
# Close enough to correct for our purposes.
yesterday = (now - 24 * 60 * 60)
since_yesterday_seconds = yesterday - last_reported[0]["reported_time"]
any_since_yesterday = math.fabs(since_yesterday_seconds) > 60 * 60
if any_since_yesterday:
logger.info(
"Calculating daily messages skipped; since_yesterday_seconds: %d" %
(since_yesterday_seconds,)
)
return None
txn.execute(
"SELECT COUNT(*) as messages"
" FROM events NATURAL JOIN event_json"
" WHERE json like '%m.room.message%'"
" AND stream_ordering > ?"
" AND stream_ordering <= ?",
(
last_reported[0]["reported_stream_token"],
now_reporting,
)
)
rows = self.cursor_to_dict(txn)
if not rows:
logger.info("Calculating daily messages skipped; messages count missing")
return None
return rows[0]["messages"]
ret = yield self.runInteraction("count_messages", _count_messages) ret = yield self.runInteraction("count_messages", _count_messages)
defer.returnValue(ret) defer.returnValue(ret)
@defer.inlineCallbacks
def count_daily_sent_messages(self):
def _count_messages(txn):
# This is good enough as if you have silly characters in your own
# hostname then thats your own fault.
like_clause = "%:" + self.hs.hostname
sql = """
SELECT COALESCE(COUNT(*), 0) FROM events
WHERE type = 'm.room.message'
AND sender LIKE ?
AND stream_ordering > ?
"""
txn.execute(sql, (like_clause, self.stream_ordering_day_ago,))
count, = txn.fetchone()
return count
ret = yield self.runInteraction("count_daily_sent_messages", _count_messages)
defer.returnValue(ret)
@defer.inlineCallbacks
def count_daily_active_rooms(self):
def _count(txn):
sql = """
SELECT COALESCE(COUNT(DISTINCT room_id), 0) FROM events
WHERE type = 'm.room.message'
AND stream_ordering > ?
"""
txn.execute(sql, (self.stream_ordering_day_ago,))
count, = txn.fetchone()
return count
ret = yield self.runInteraction("count_daily_active_rooms", _count)
defer.returnValue(ret)
@defer.inlineCallbacks @defer.inlineCallbacks
def _background_reindex_fields_sender(self, progress, batch_size): def _background_reindex_fields_sender(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"] target_min_stream_id = progress["target_min_stream_id_inclusive"]

View file

@ -437,6 +437,19 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
ret = yield self.runInteraction("count_users", _count_users) ret = yield self.runInteraction("count_users", _count_users)
defer.returnValue(ret) defer.returnValue(ret)
@defer.inlineCallbacks
def count_nonbridged_users(self):
def _count_users(txn):
txn.execute("""
SELECT COALESCE(COUNT(*), 0) FROM users
WHERE appservice_id IS NULL
""")
count, = txn.fetchone()
return count
ret = yield self.runInteraction("count_users", _count_users)
defer.returnValue(ret)
@defer.inlineCallbacks @defer.inlineCallbacks
def find_next_generated_user_id_localpart(self): def find_next_generated_user_id_localpart(self):
""" """

View file

@ -1,115 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mock import Mock
from synapse.types import RoomID, UserID
from tests import unittest
from twisted.internet import defer
from tests.storage.event_injector import EventInjector
from tests.utils import setup_test_homeserver
class EventsStoreTestCase(unittest.TestCase):
@defer.inlineCallbacks
def setUp(self):
self.hs = yield setup_test_homeserver(
resource_for_federation=Mock(),
http_client=None,
)
self.store = self.hs.get_datastore()
self.db_pool = self.hs.get_db_pool()
self.message_handler = self.hs.get_handlers().message_handler
self.event_injector = EventInjector(self.hs)
@defer.inlineCallbacks
def test_count_daily_messages(self):
yield self.db_pool.runQuery("DELETE FROM stats_reporting")
self.hs.clock.now = 100
# Never reported before, and nothing which could be reported
count = yield self.store.count_daily_messages()
self.assertIsNone(count)
count = yield self.db_pool.runQuery("SELECT COUNT(*) FROM stats_reporting")
self.assertEqual([(0,)], count)
# Create something to report
room = RoomID.from_string("!abc123:test")
user = UserID.from_string("@raccoonlover:test")
yield self.event_injector.create_room(room, user)
self.base_event = yield self._get_last_stream_token()
yield self.event_injector.inject_message(room, user, "Raccoons are really cute")
# Never reported before, something could be reported, but isn't because
# it isn't old enough.
count = yield self.store.count_daily_messages()
self.assertIsNone(count)
yield self._assert_stats_reporting(1, self.hs.clock.now)
# Already reported yesterday, two new events from today.
yield self.event_injector.inject_message(room, user, "Yeah they are!")
yield self.event_injector.inject_message(room, user, "Incredibly!")
self.hs.clock.now += 60 * 60 * 24
count = yield self.store.count_daily_messages()
self.assertEqual(2, count) # 2 since yesterday
yield self._assert_stats_reporting(3, self.hs.clock.now) # 3 ever
# Last reported too recently.
yield self.event_injector.inject_message(room, user, "Who could disagree?")
self.hs.clock.now += 60 * 60 * 22
count = yield self.store.count_daily_messages()
self.assertIsNone(count)
yield self._assert_stats_reporting(4, self.hs.clock.now)
# Last reported too long ago
yield self.event_injector.inject_message(room, user, "No one.")
self.hs.clock.now += 60 * 60 * 26
count = yield self.store.count_daily_messages()
self.assertIsNone(count)
yield self._assert_stats_reporting(5, self.hs.clock.now)
# And now let's actually report something
yield self.event_injector.inject_message(room, user, "Indeed.")
yield self.event_injector.inject_message(room, user, "Indeed.")
yield self.event_injector.inject_message(room, user, "Indeed.")
# A little over 24 hours is fine :)
self.hs.clock.now += (60 * 60 * 24) + 50
count = yield self.store.count_daily_messages()
self.assertEqual(3, count)
yield self._assert_stats_reporting(8, self.hs.clock.now)
@defer.inlineCallbacks
def _get_last_stream_token(self):
rows = yield self.db_pool.runQuery(
"SELECT stream_ordering"
" FROM events"
" ORDER BY stream_ordering DESC"
" LIMIT 1"
)
if not rows:
defer.returnValue(0)
else:
defer.returnValue(rows[0][0])
@defer.inlineCallbacks
def _assert_stats_reporting(self, messages, time):
rows = yield self.db_pool.runQuery(
"SELECT reported_stream_token, reported_time FROM stats_reporting"
)
self.assertEqual([(self.base_event + messages, time,)], rows)