diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 3c27428c08..4fcef45e93 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -46,50 +46,83 @@ class DataStore(RoomDataStore, RoomMemberStore, MessageStore, RoomStore, self.event_factory = hs.get_event_factory() self.hs = hs + @defer.inlineCallbacks def persist_event(self, event): - if event.type == MessageEvent.TYPE: - return self.store_message( - user_id=event.user_id, - room_id=event.room_id, - msg_id=event.msg_id, - content=json.dumps(event.content) - ) - elif event.type == RoomMemberEvent.TYPE: - return self.store_room_member( - user_id=event.target_user_id, - sender=event.user_id, - room_id=event.room_id, - content=event.content, - membership=event.content["membership"] - ) + if event.type == RoomMemberEvent.TYPE: + yield self._store_room_member(event) elif event.type == FeedbackEvent.TYPE: - return self.store_feedback( - room_id=event.room_id, - msg_id=event.msg_id, - msg_sender_id=event.msg_sender_id, - fb_sender_id=event.user_id, - fb_type=event.feedback_type, - content=json.dumps(event.content) - ) - elif event.type == RoomTopicEvent.TYPE: - return self.store_room_data( - room_id=event.room_id, - etype=event.type, - state_key=event.state_key, - content=json.dumps(event.content) - ) + yield self._store_feedback(event) elif event.type == RoomConfigEvent.TYPE: - if "visibility" in event.content: - visibility = event.content["visibility"] - return self.store_room_config( - room_id=event.room_id, - visibility=visibility - ) + yield self._store_room_config(event) + self._store_event(event) + + @defer.inlineCallbacks + def get_event(self, event_id): + events_dict = yield self._simple_select_one( + "events", + {"event_id": event_id}, + [ + "event_id", + "type", + "sender", + "room_id", + "content", + "unrecognized_keys" + ], + ) + + event = self._parse_event_from_row(events_dict) + defer.returnValue(event) + + @defer.inlineCallbacks + def _store_event(self, event): + vals = { + "event_id": event.event_id, + "event_type", event.type, + "sender": event.user_id, + "room_id": event.room_id, + "content": event.content, + } + + unrec = {k: v for k, v in event.get_full_dict() if k not in vals.keys()} + val["unrecognized_keys"] = unrec + + yield self._simple_insert("events", vals) + + if hasattr(event, "state_key"): + vals = { + "event_id": event.event_id, + "room_id": event.room_id, + "event_type": event.event_type, + "state_key": event.state_key, + } + + if hasattr(event, "prev_state"): + vals["prev_state"] = event.prev_state + + yield self._simple_insert("state_events", vals) + + # TODO (erikj): We also need to update the current state table? + + @defer.inlineCallbacks + def get_current_state(room_id, event_type=None, state_key="") + sql = ( + "SELECT e.* FROM events as e" + "INNER JOIN current_state as c ON e.event_id = c.event_id " + "INNER JOIN state_events as s ON e.event_id = s.event_id " + "WHERE c.room_id = ? " + ) + + if event_type: + sql += " s.type = ? AND s.state_key = ? " + args = (room_id, event_type, state_key) else: - raise NotImplementedError( - "Don't know how to persist type=%s" % event.type - ) + args = (room_id, ) + + results = yield self._execute_query(sql, *args) + + defer.returnValue( def schema_path(schema): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 65f691ead4..03537b7e3b 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging from twisted.internet import defer @@ -20,6 +19,8 @@ from twisted.internet import defer from synapse.api.errors import StoreError import collections +import json + logger = logging.getLogger(__name__) @@ -28,6 +29,7 @@ class SQLBaseStore(object): def __init__(self, hs): self._db_pool = hs.get_db_pool() + self.event_factory = hs.get_event_factory() def cursor_to_dict(self, cursor): """Converts a SQL cursor into an list of dicts. @@ -63,6 +65,9 @@ class SQLBaseStore(object): return decoder(cursor) return self._db_pool.runInteraction(interaction) + def _execut_query(self, query, *args): + return self._execute(self.cursor_to_dict, *args) + # "Simple" SQL API methods that operate on a single table with no JOINs, # no complex WHERE clauses, just a dict of values for columns. @@ -279,6 +284,16 @@ class SQLBaseStore(object): return self._db_pool.runInteraction(func) + def _parse_event_from_row(self, row_dict): + d = copy.deepcopy({k: v for k, v in row.items() if v}) + d.update(json.loads(row["unrecognized_keys"])) + del d["unrecognized_keys"] + + return self.event_factory.create_event( + etype=d["type"], + **d + ) + class Table(object): """ A base class used to store information about a particular table. diff --git a/synapse/storage/feedback.py b/synapse/storage/feedback.py index 9bd562c762..fc93f92e1d 100644 --- a/synapse/storage/feedback.py +++ b/synapse/storage/feedback.py @@ -22,54 +22,27 @@ import json class FeedbackStore(SQLBaseStore): - def store_feedback(self, room_id, msg_id, msg_sender_id, - fb_sender_id, fb_type, content): - return self._simple_insert(FeedbackTable.table_name, dict( - room_id=room_id, - msg_id=msg_id, - msg_sender_id=msg_sender_id, - fb_sender_id=fb_sender_id, - fb_type=fb_type, - content=content, - )) + def _store_feedback(self, event): + return self._simple_insert("feedback", { + "event_id": event.event_id, + "feedback_type": event.feedback_type, + "room_id": event.room_id, + "target_event_id": event.target_event, + }) - def get_feedback(self, room_id=None, msg_id=None, msg_sender_id=None, - fb_sender_id=None, fb_type=None): - query = FeedbackTable.select_statement( - "msg_sender_id = ? AND room_id = ? AND msg_id = ? " + - "AND fb_sender_id = ? AND feedback_type = ? " + - "ORDER BY id DESC LIMIT 1") - return self._execute( - FeedbackTable.decode_single_result, - query, msg_sender_id, room_id, msg_id, fb_sender_id, fb_type, + @defer.inlineCallback + def get_feedback_for_event(self, event_id): + sql = ( + "SELECT events.* FROM events INNER JOIN feedback " + "ON events.event_id = feedback.event_id " + "WHERE feedback.target_event_id = ? " ) - def get_max_feedback_id(self): - return self._simple_max_id(FeedbackTable.table_name) + rows = yield self._execute_query(sql, event_id) - -class FeedbackTable(Table): - table_name = "feedback" - - fields = [ - "id", - "content", - "feedback_type", - "fb_sender_id", - "msg_id", - "room_id", - "msg_sender_id" - ] - - class EntryType(collections.namedtuple("FeedbackEntry", fields)): - - def as_event(self, event_factory): - return event_factory.create_event( - etype=FeedbackEvent.TYPE, - room_id=self.room_id, - msg_id=self.msg_id, - msg_sender_id=self.msg_sender_id, - user_id=self.fb_sender_id, - feedback_type=self.feedback_type, - content=json.loads(self.content), - ) + defer.returnValue( + [ + self._parse_event_from_row(r) + for r in rows + ] + ) diff --git a/synapse/storage/message.py b/synapse/storage/message.py deleted file mode 100644 index 7bb69c1384..0000000000 --- a/synapse/storage/message.py +++ /dev/null @@ -1,81 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014 matrix.org -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ._base import SQLBaseStore, Table -from synapse.api.events.room import MessageEvent - -import collections -import json - - -class MessageStore(SQLBaseStore): - - def get_message(self, user_id, room_id, msg_id): - """Get a message from the store. - - Args: - user_id (str): The ID of the user who sent the message. - room_id (str): The room the message was sent in. - msg_id (str): The unique ID for this user/room combo. - """ - query = MessagesTable.select_statement( - "user_id = ? AND room_id = ? AND msg_id = ? " + - "ORDER BY id DESC LIMIT 1") - return self._execute( - MessagesTable.decode_single_result, - query, user_id, room_id, msg_id, - ) - - def store_message(self, user_id, room_id, msg_id, content): - """Store a message in the store. - - Args: - user_id (str): The ID of the user who sent the message. - room_id (str): The room the message was sent in. - msg_id (str): The unique ID for this user/room combo. - content (str): The content of the message (JSON) - """ - return self._simple_insert(MessagesTable.table_name, dict( - user_id=user_id, - room_id=room_id, - msg_id=msg_id, - content=content, - )) - - def get_max_message_id(self): - return self._simple_max_id(MessagesTable.table_name) - - -class MessagesTable(Table): - table_name = "messages" - - fields = [ - "id", - "user_id", - "room_id", - "msg_id", - "content" - ] - - class EntryType(collections.namedtuple("MessageEntry", fields)): - - def as_event(self, event_factory): - return event_factory.create_event( - etype=MessageEvent.TYPE, - room_id=self.room_id, - user_id=self.user_id, - msg_id=self.msg_id, - content=json.loads(self.content), - ) diff --git a/synapse/storage/roomdata.py b/synapse/storage/roomdata.py deleted file mode 100644 index cc04d1ba14..0000000000 --- a/synapse/storage/roomdata.py +++ /dev/null @@ -1,85 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014 matrix.org -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ._base import SQLBaseStore, Table - -import collections -import json - - -class RoomDataStore(SQLBaseStore): - - """Provides various CRUD operations for Room Events. """ - - def get_room_data(self, room_id, etype, state_key=""): - """Retrieve the data stored under this type and state_key. - - Args: - room_id (str) - etype (str) - state_key (str) - Returns: - namedtuple: Or None if nothing exists at this path. - """ - query = RoomDataTable.select_statement( - "room_id = ? AND type = ? AND state_key = ? " - "ORDER BY id DESC LIMIT 1" - ) - return self._execute( - RoomDataTable.decode_single_result, - query, room_id, etype, state_key, - ) - - def store_room_data(self, room_id, etype, state_key="", content=None): - """Stores room specific data. - - Args: - room_id (str) - etype (str) - state_key (str) - data (str)- The data to store for this path in JSON. - Returns: - The store ID for this data. - """ - return self._simple_insert(RoomDataTable.table_name, dict( - etype=etype, - state_key=state_key, - room_id=room_id, - content=content, - )) - - def get_max_room_data_id(self): - return self._simple_max_id(RoomDataTable.table_name) - - -class RoomDataTable(Table): - table_name = "room_data" - - fields = [ - "id", - "room_id", - "type", - "state_key", - "content" - ] - - class EntryType(collections.namedtuple("RoomDataEntry", fields)): - - def as_event(self, event_factory): - return event_factory.create_event( - etype=self.type, - room_id=self.room_id, - content=json.loads(self.content), - ) diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql index e09ff6b64b..ad9770244d 100644 --- a/synapse/storage/schema/im.sql +++ b/synapse/storage/schema/im.sql @@ -16,8 +16,8 @@ CREATE TABLE IF NOT EXISTS events( ordering INTEGER PRIMARY KEY AUTOINCREMENT, event_id TEXT NOT NULL, - event_type TEXT NOT NULL, - sender TEXT, + type TEXT NOT NULL, +-- sender TEXT, room_id TEXT, content TEXT, unrecognized_keys TEXT @@ -26,7 +26,7 @@ CREATE TABLE IF NOT EXISTS events( CREATE TABLE IF NOT EXISTS state_events( event_id TEXT NOT NULL, room_id TEXT NOT NULL, - event_type TEXT NOT NULL, + type TEXT NOT NULL, state_key TEXT NOT NULL, prev_state TEXT ); @@ -47,9 +47,8 @@ CREATE TABLE IF NOT EXISTS room_memberships( CREATE TABLE IF NOT EXISTS feedback( event_id TEXT NOT NULL, feedback_type TEXT, - fb_sender_id TEXT, - room_id TEXT, - content TEXT + target_event_id TEXT,sudo + room_id TEXT ); CREATE TABLE IF NOT EXISTS rooms(