Merge branch 'develop' of github.com:matrix-org/synapse into erikj/timings

This commit is contained in:
Erik Johnston 2016-06-06 11:51:27 +01:00
commit 6882231f78
8 changed files with 139 additions and 254 deletions

View file

@ -39,7 +39,7 @@ from synapse.replication.slave.storage.presence import SlavedPresenceStore
from synapse.server import HomeServer
from synapse.storage.client_ips import ClientIpStore
from synapse.storage.engines import create_engine
from synapse.storage.presence import UserPresenceState
from synapse.storage.presence import PresenceStore, UserPresenceState
from synapse.storage.roommember import RoomMemberStore
from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree
@ -122,11 +122,8 @@ class SynchrotronSlavedStore(
SlavedFilteringStore,
SlavedPresenceStore,
BaseSlavedStore,
ClientIpStore, # After BaseSlavedStre because the constructor is different
ClientIpStore, # After BaseSlavedStore because the constructor is different
):
def get_presence_list_accepted(self, user_localpart):
return ()
# XXX: This is a bit broken because we don't persist forgotten rooms
# in a way that they can be streamed. This means that we don't have a
# way to invalidate the forgotten rooms cache correctly.
@ -136,6 +133,13 @@ class SynchrotronSlavedStore(
RoomMemberStore.__dict__["who_forgot_in_room"]
)
# XXX: This is a bit broken because we don't persist the accepted list in a
# way that can be replicated. This means that we don't have a way to
# invalidate the cache correctly.
get_presence_list_accepted = PresenceStore.__dict__[
"get_presence_list_accepted"
]
UPDATE_SYNCING_USERS_MS = 10 * 1000
@ -357,6 +361,7 @@ class SynchrotronServer(HomeServer):
def expire_broken_caches():
store.who_forgot_in_room.invalidate_all()
store.get_presence_list_accepted.invalidate_all()
def notify_from_stream(
result, stream_name, stream_key, room=None, user=None

View file

@ -131,15 +131,10 @@ class SlavedEventStore(BaseSlavedStore):
_get_events_from_cache = DataStore._get_events_from_cache.__func__
_invalidate_get_event_cache = DataStore._invalidate_get_event_cache.__func__
_parse_events_txn = DataStore._parse_events_txn.__func__
_get_events_txn = DataStore._get_events_txn.__func__
_get_event_txn = DataStore._get_event_txn.__func__
_enqueue_events = DataStore._enqueue_events.__func__
_do_fetch = DataStore._do_fetch.__func__
_fetch_events_txn = DataStore._fetch_events_txn.__func__
_fetch_event_rows = DataStore._fetch_event_rows.__func__
_get_event_from_row = DataStore._get_event_from_row.__func__
_get_event_from_row_txn = DataStore._get_event_from_row_txn.__func__
_get_rooms_for_user_where_membership_is_txn = (
DataStore._get_rooms_for_user_where_membership_is_txn.__func__
)

View file

@ -298,6 +298,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
dict(txn_id=txn_id, as_id=service.id)
)
@defer.inlineCallbacks
def get_oldest_unsent_txn(self, service):
"""Get the oldest transaction which has not been sent for this
service.
@ -308,12 +309,23 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
A Deferred which resolves to an AppServiceTransaction or
None.
"""
return self.runInteraction(
entry = yield self.runInteraction(
"get_oldest_unsent_appservice_txn",
self._get_oldest_unsent_txn,
service
)
if not entry:
defer.returnValue(None)
event_ids = json.loads(entry["event_ids"])
events = yield self.get_events(event_ids)
defer.returnValue(AppServiceTransaction(
service=service, id=entry["txn_id"], events=events
))
def _get_oldest_unsent_txn(self, txn, service):
# Monotonically increasing txn ids, so just select the smallest
# one in the txns table (we delete them when they are sent)
@ -328,12 +340,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
entry = rows[0]
event_ids = json.loads(entry["event_ids"])
events = self._get_events_txn(txn, event_ids)
return AppServiceTransaction(
service=service, id=entry["txn_id"], events=events
)
return entry
def _get_last_txn(self, txn, service_id):
txn.execute(

View file

@ -139,6 +139,9 @@ class _EventPeristenceQueue(object):
pass
_EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))
class EventsStore(SQLBaseStore):
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
@ -738,100 +741,65 @@ class EventsStore(SQLBaseStore):
event_id_list = event_ids
event_ids = set(event_ids)
event_map = self._get_events_from_cache(
event_entry_map = self._get_events_from_cache(
event_ids,
check_redacted=check_redacted,
get_prev_content=get_prev_content,
allow_rejected=allow_rejected,
)
missing_events_ids = [e for e in event_ids if e not in event_map]
missing_events_ids = [e for e in event_ids if e not in event_entry_map]
if missing_events_ids:
missing_events = yield self._enqueue_events(
missing_events_ids,
check_redacted=check_redacted,
get_prev_content=get_prev_content,
allow_rejected=allow_rejected,
)
event_map.update(missing_events)
event_entry_map.update(missing_events)
defer.returnValue([
event_map[e_id] for e_id in event_id_list
if e_id in event_map and event_map[e_id]
])
events = []
for event_id in event_id_list:
entry = event_entry_map.get(event_id, None)
if not entry:
continue
def _get_events_txn(self, txn, event_ids, check_redacted=True,
get_prev_content=False, allow_rejected=False):
if not event_ids:
return []
if allow_rejected or not entry.event.rejected_reason:
if check_redacted and entry.redacted_event:
event = entry.redacted_event
else:
event = entry.event
event_map = self._get_events_from_cache(
event_ids,
check_redacted=check_redacted,
get_prev_content=get_prev_content,
allow_rejected=allow_rejected,
)
events.append(event)
missing_events_ids = [e for e in event_ids if e not in event_map]
if get_prev_content:
if "replaces_state" in event.unsigned:
prev = yield self.get_event(
event.unsigned["replaces_state"],
get_prev_content=False,
allow_none=True,
)
if prev:
event.unsigned = dict(event.unsigned)
event.unsigned["prev_content"] = prev.content
event.unsigned["prev_sender"] = prev.sender
if not missing_events_ids:
return [
event_map[e_id] for e_id in event_ids
if e_id in event_map and event_map[e_id]
]
missing_events = self._fetch_events_txn(
txn,
missing_events_ids,
check_redacted=check_redacted,
get_prev_content=get_prev_content,
allow_rejected=allow_rejected,
)
event_map.update(missing_events)
return [
event_map[e_id] for e_id in event_ids
if e_id in event_map and event_map[e_id]
]
defer.returnValue(events)
def _invalidate_get_event_cache(self, event_id):
for check_redacted in (False, True):
for get_prev_content in (False, True):
self._get_event_cache.invalidate(
(event_id, check_redacted, get_prev_content)
)
self._get_event_cache.invalidate((event_id,))
def _get_event_txn(self, txn, event_id, check_redacted=True,
get_prev_content=False, allow_rejected=False):
events = self._get_events_txn(
txn, [event_id],
check_redacted=check_redacted,
get_prev_content=get_prev_content,
allow_rejected=allow_rejected,
)
return events[0] if events else None
def _get_events_from_cache(self, events, check_redacted, get_prev_content,
allow_rejected):
def _get_events_from_cache(self, events, allow_rejected):
event_map = {}
for event_id in events:
try:
ret = self._get_event_cache.get(
(event_id, check_redacted, get_prev_content,)
)
ret = self._get_event_cache.get((event_id,), None)
if not ret:
continue
if allow_rejected or not ret.rejected_reason:
event_map[event_id] = ret
else:
event_map[event_id] = None
except KeyError:
pass
if allow_rejected or not ret.event.rejected_reason:
event_map[event_id] = ret
else:
event_map[event_id] = None
return event_map
@ -902,8 +870,7 @@ class EventsStore(SQLBaseStore):
reactor.callFromThread(fire, event_list)
@defer.inlineCallbacks
def _enqueue_events(self, events, check_redacted=True,
get_prev_content=False, allow_rejected=False):
def _enqueue_events(self, events, check_redacted=True, allow_rejected=False):
"""Fetches events from the database using the _event_fetch_list. This
allows batch and bulk fetching of events - it allows us to fetch events
without having to create a new transaction for each request for events.
@ -941,8 +908,6 @@ class EventsStore(SQLBaseStore):
[
preserve_fn(self._get_event_from_row)(
row["internal_metadata"], row["json"], row["redacts"],
check_redacted=check_redacted,
get_prev_content=get_prev_content,
rejected_reason=row["rejects"],
)
for row in rows
@ -951,7 +916,7 @@ class EventsStore(SQLBaseStore):
)
defer.returnValue({
e.event_id: e
e.event.event_id: e
for e in res if e
})
@ -981,37 +946,8 @@ class EventsStore(SQLBaseStore):
return rows
def _fetch_events_txn(self, txn, events, check_redacted=True,
get_prev_content=False, allow_rejected=False):
if not events:
return {}
rows = self._fetch_event_rows(
txn, events,
)
if not allow_rejected:
rows[:] = [r for r in rows if not r["rejects"]]
res = [
self._get_event_from_row_txn(
txn,
row["internal_metadata"], row["json"], row["redacts"],
check_redacted=check_redacted,
get_prev_content=get_prev_content,
rejected_reason=row["rejects"],
)
for row in rows
]
return {
r.event_id: r
for r in res
}
@defer.inlineCallbacks
def _get_event_from_row(self, internal_metadata, js, redacted,
check_redacted=True, get_prev_content=False,
rejected_reason=None):
d = json.loads(js)
internal_metadata = json.loads(internal_metadata)
@ -1021,26 +957,27 @@ class EventsStore(SQLBaseStore):
table="rejections",
keyvalues={"event_id": rejected_reason},
retcol="reason",
desc="_get_event_from_row",
desc="_get_event_from_row_rejected_reason",
)
ev = FrozenEvent(
original_ev = FrozenEvent(
d,
internal_metadata_dict=internal_metadata,
rejected_reason=rejected_reason,
)
if check_redacted and redacted:
ev = prune_event(ev)
redacted_event = None
if redacted:
redacted_event = prune_event(original_ev)
redaction_id = yield self._simple_select_one_onecol(
table="redactions",
keyvalues={"redacts": ev.event_id},
keyvalues={"redacts": redacted_event.event_id},
retcol="event_id",
desc="_get_event_from_row",
desc="_get_event_from_row_redactions",
)
ev.unsigned["redacted_by"] = redaction_id
redacted_event.unsigned["redacted_by"] = redaction_id
# Get the redaction event.
because = yield self.get_event(
@ -1052,86 +989,16 @@ class EventsStore(SQLBaseStore):
if because:
# It's fine to add the event directly, since get_pdu_json
# will serialise this field correctly
ev.unsigned["redacted_because"] = because
redacted_event.unsigned["redacted_because"] = because
if get_prev_content and "replaces_state" in ev.unsigned:
prev = yield self.get_event(
ev.unsigned["replaces_state"],
get_prev_content=False,
allow_none=True,
)
if prev:
ev.unsigned["prev_content"] = prev.content
ev.unsigned["prev_sender"] = prev.sender
self._get_event_cache.prefill(
(ev.event_id, check_redacted, get_prev_content), ev
cache_entry = _EventCacheEntry(
event=original_ev,
redacted_event=redacted_event,
)
defer.returnValue(ev)
self._get_event_cache.prefill((original_ev.event_id,), cache_entry)
def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted,
check_redacted=True, get_prev_content=False,
rejected_reason=None):
d = json.loads(js)
internal_metadata = json.loads(internal_metadata)
if rejected_reason:
rejected_reason = self._simple_select_one_onecol_txn(
txn,
table="rejections",
keyvalues={"event_id": rejected_reason},
retcol="reason",
)
ev = FrozenEvent(
d,
internal_metadata_dict=internal_metadata,
rejected_reason=rejected_reason,
)
if check_redacted and redacted:
ev = prune_event(ev)
redaction_id = self._simple_select_one_onecol_txn(
txn,
table="redactions",
keyvalues={"redacts": ev.event_id},
retcol="event_id",
)
ev.unsigned["redacted_by"] = redaction_id
# Get the redaction event.
because = self._get_event_txn(
txn,
redaction_id,
check_redacted=False
)
if because:
ev.unsigned["redacted_because"] = because
if get_prev_content and "replaces_state" in ev.unsigned:
prev = self._get_event_txn(
txn,
ev.unsigned["replaces_state"],
get_prev_content=False,
)
if prev:
ev.unsigned["prev_content"] = prev.content
ev.unsigned["prev_sender"] = prev.sender
self._get_event_cache.prefill(
(ev.event_id, check_redacted, get_prev_content), ev
)
return ev
def _parse_events_txn(self, txn, rows):
event_ids = [r["event_id"] for r in rows]
return self._get_events_txn(txn, event_ids)
defer.returnValue(cache_entry)
@defer.inlineCallbacks
def count_daily_messages(self):

View file

@ -194,32 +194,44 @@ class RoomStore(SQLBaseStore):
@cachedInlineCallbacks()
def get_room_name_and_aliases(self, room_id):
def f(txn):
def get_room_name(txn):
sql = (
"SELECT event_id FROM current_state_events "
"WHERE room_id = ? "
"SELECT name FROM room_names"
" INNER JOIN current_state_events USING (room_id, event_id)"
" WHERE room_id = ?"
" LIMIT 1"
)
sql += " AND ((type = 'm.room.name' AND state_key = '')"
sql += " OR type = 'm.room.aliases')"
txn.execute(sql, (room_id,))
results = self.cursor_to_dict(txn)
rows = txn.fetchall()
if rows:
return rows[0][0]
else:
return None
return self._parse_events_txn(txn, results)
return [row[0] for row in txn.fetchall()]
events = yield self.runInteraction("get_room_name_and_aliases", f)
def get_room_aliases(txn):
sql = (
"SELECT content FROM current_state_events"
" INNER JOIN events USING (room_id, event_id)"
" WHERE room_id = ?"
)
txn.execute(sql, (room_id,))
return [row[0] for row in txn.fetchall()]
name = yield self.runInteraction("get_room_name", get_room_name)
alias_contents = yield self.runInteraction("get_room_aliases", get_room_aliases)
name = None
aliases = []
for e in events:
if e.type == 'm.room.name':
if 'name' in e.content:
name = e.content['name']
elif e.type == 'm.room.aliases':
if 'aliases' in e.content:
aliases.extend(e.content['aliases'])
for c in alias_contents:
try:
content = json.loads(c)
except:
continue
aliases.extend(content.get('aliases', []))
defer.returnValue((name, aliases))

View file

@ -21,6 +21,7 @@ from synapse.storage.engines import PostgresEngine, Sqlite3Engine
import logging
import re
import ujson as json
logger = logging.getLogger(__name__)
@ -52,7 +53,7 @@ class SearchStore(BackgroundUpdateStore):
def reindex_search_txn(txn):
sql = (
"SELECT stream_ordering, event_id FROM events"
"SELECT stream_ordering, event_id, room_id, type, content FROM events"
" WHERE ? <= stream_ordering AND stream_ordering < ?"
" AND (%s)"
" ORDER BY stream_ordering DESC"
@ -61,28 +62,30 @@ class SearchStore(BackgroundUpdateStore):
txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
rows = txn.fetchall()
rows = self.cursor_to_dict(txn)
if not rows:
return 0
min_stream_id = rows[-1][0]
event_ids = [row[1] for row in rows]
events = self._get_events_txn(txn, event_ids)
min_stream_id = rows[-1]["stream_ordering"]
event_search_rows = []
for event in events:
for row in rows:
try:
event_id = event.event_id
room_id = event.room_id
content = event.content
if event.type == "m.room.message":
event_id = row["event_id"]
room_id = row["room_id"]
etype = row["type"]
try:
content = json.loads(row["content"])
except:
continue
if etype == "m.room.message":
key = "content.body"
value = content["body"]
elif event.type == "m.room.topic":
elif etype == "m.room.topic":
key = "content.topic"
value = content["topic"]
elif event.type == "m.room.name":
elif etype == "m.room.name":
key = "content.name"
value = content["name"]
except (KeyError, AttributeError):

View file

@ -132,29 +132,25 @@ class StreamStore(SQLBaseStore):
return True
return False
ret = self._get_events_txn(
txn,
# apply the filter on the room id list
[
r["event_id"] for r in rows
if app_service_interested(r)
],
get_prev_content=True
)
return [r for r in rows if app_service_interested(r)]
self._set_before_and_after(ret, rows)
rows = yield self.runInteraction("get_appservice_room_stream", f)
if rows:
key = "s%d" % max(r["stream_ordering"] for r in rows)
else:
# Assume we didn't get anything because there was nothing to
# get.
key = to_key
ret = yield self._get_events(
[r["event_id"] for r in rows],
get_prev_content=True
)
return ret, key
self._set_before_and_after(ret, rows, topo_order=from_id is None)
results = yield self.runInteraction("get_appservice_room_stream", f)
defer.returnValue(results)
if rows:
key = "s%d" % max(r["stream_ordering"] for r in rows)
else:
# Assume we didn't get anything because there was nothing to
# get.
key = to_key
defer.returnValue((ret, key))
@defer.inlineCallbacks
def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0,

View file

@ -357,7 +357,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
other_events = [Mock(event_id="e5"), Mock(event_id="e6")]
# we aren't testing store._base stuff here, so mock this out
self.store._get_events_txn = Mock(return_value=events)
self.store.get_events = Mock(return_value=events)
yield self._insert_txn(self.as_list[1]["id"], 9, other_events)
yield self._insert_txn(service.id, 10, events)