diff --git a/synapse/rest/client/v2_alpha/__init__.py b/synapse/rest/client/v2_alpha/__init__.py index 5831ff0e62..a108132346 100644 --- a/synapse/rest/client/v2_alpha/__init__.py +++ b/synapse/rest/client/v2_alpha/__init__.py @@ -22,6 +22,7 @@ from . import ( receipts, keys, tokenrefresh, + tags, ) from synapse.http.server import JsonResource @@ -44,3 +45,4 @@ class ClientV2AlphaRestResource(JsonResource): receipts.register_servlets(hs, client_resource) keys.register_servlets(hs, client_resource) tokenrefresh.register_servlets(hs, client_resource) + tags.register_servlets(hs, client_resource) diff --git a/synapse/rest/client/v2_alpha/tags.py b/synapse/rest/client/v2_alpha/tags.py new file mode 100644 index 0000000000..c4a670c5a7 --- /dev/null +++ b/synapse/rest/client/v2_alpha/tags.py @@ -0,0 +1,89 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import client_v2_pattern + +from synapse.http.servlet import RestServlet + +from twisted.internet import defer + +import logging + +logger = logging.getLogger(__name__) + + +class TagListServlet(RestServlet): + """ + GET /user/{user_id}/rooms/{room_id}/tags HTTP/1.1 + """ + PATTERN = client_v2_pattern( + "/user/(?P[^/]*)/rooms/(?P[^/]*)/tags" + ) + + def __init__(self, hs): + super(TagListServlet, self).__init__() + self.auth = hs.get_auth() + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def on_GET(self, request, user_id, room_id): + auth_user, _ = yield self.auth.get_user_by_req(request) + if user_id != auth_user.to_string(): + raise AuthError(403, "Cannot get tags for other users.") + + tags = yield self.store.get_tags_for_room(user_id, room_id) + + defer.returnValue((200, {"tags": tags})) + + +class TagServlet(RestServlet): + """ + PUT /user/{user_id}/rooms/{room_id}/tags/{tag} HTTP/1.1 + DELETE /user/{user_id}/rooms/{room_id}/tags/{tag} HTTP/1.1 + """ + PATTERN = client_v2_pattern( + "/user/(?P[^/]*)/rooms/(?P[^/]*)/tags/(?P[^/]*)" + ) + def __init__(self, hs): + super(TagServlet, self).__init__() + self.auth = hs.get_auth() + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def on_PUT(self, request, user_id, room_id, tag): + auth_user, _ = yield self.auth.get_user_by_req(request) + if user_id != auth_user.to_string(): + raise AuthError(403, "Cannot add tags for other users.") + + yield self.store.add_tag_to_room(user_id, room_id, tag) + + # TODO: poke the notifier. + defer.returnValue((200, {})) + + @defer.inlineCallbacks + def on_DELETE(self, request, user_id, room_id, tag): + auth_user, _ = yield self.auth.get_user_by_req(request) + if user_id != auth_user.to_string(): + raise AuthError(403, "Cannot add tags for other users.") + + yield self.store.remove_tag_from_room(user_id, room_id, tag) + + # TODO: poke the notifier. + defer.returnValue((200, {})) + + +def register_servlets(hs, http_server): + TagListServlet(hs).register(http_server) + TagServlet(hs).register(http_server) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index a1bd9c4ce9..e7443f2838 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -41,6 +41,7 @@ from .end_to_end_keys import EndToEndKeyStore from .receipts import ReceiptsStore from .search import SearchStore +from .tags import TagsStore import logging @@ -71,6 +72,7 @@ class DataStore(RoomMemberStore, RoomStore, ReceiptsStore, EndToEndKeyStore, SearchStore, + TagsStore, ): def __init__(self, hs): diff --git a/synapse/storage/schema/delta/25/tags.sql b/synapse/storage/schema/delta/25/tags.sql new file mode 100644 index 0000000000..168766dcf3 --- /dev/null +++ b/synapse/storage/schema/delta/25/tags.sql @@ -0,0 +1,37 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +CREATE TABLE IF NOT EXISTS room_tags( + user_id TEXT NOT NULL, + room_id TEXT NOT NULL, + tag TEXT NOT NULL, -- The name of the tag. + CONSTRAINT room_tag_uniqueness UNIQUE (user_id, room_id, tag) +); + +CREATE TABLE IF NOT EXISTS room_tags_revisions ( + user_id TEXT NOT NULL, + room_id TEXT NOT NULL, + stream_id BIGINT NOT NULL, -- The current version of the room tags. + CONSTRAINT room_tag_revisions_uniqueness UNIQUE (user_id, room_id) +); + +CREATE TABLE IF NOT EXISTS private_user_data_max_stream_id( + Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row. + stream_id BIGINT NOT NULL, + CHECK (Lock='X') +); + +INSERT INTO private_user_data_max_stream_id (stream_id) VALUES (0); diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py new file mode 100644 index 0000000000..507e60596f --- /dev/null +++ b/synapse/storage/tags.py @@ -0,0 +1,190 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import SQLBaseStore +from synapse.util.caches.descriptors import cached +from twisted.internet import defer +from .util.id_generators import StreamIdGenerator + +import logging + +logger = logging.getLogger(__name__) + + +class TagsStore(SQLBaseStore): + def __init__(self, hs): + super(TagsStore, self).__init__(hs) + + self._private_user_data_id_gen = StreamIdGenerator( + "private_user_data_max_stream_id", "stream_id" + ) + + @cached() + def get_tags_for_user(self, user_id): + """Get all the tags for a user. + + + Args: + user_id(str): The user to get the tags for. + Returns: + A deferred dict mapping from room_id strings to lists of tag + strings. + """ + + deferred = self._simple_select_list( + "room_tags", {"user_id": user_id}, ["room_id", "tag"] + ) + + @deferred.addCallback + def tags_by_room(rows): + tags_by_room = {} + for row in rows: + tags_by_room.setdefault(row["room_id"], []).append(row["tag"]) + return tags_by_room + + return deferred + + @defer.inlineCallbacks + def get_updated_tags(self, user_id, stream_id): + """Get all the tags for the rooms where the tags have changed since the + given version + + Args: + user_id(str): The user to get the tags for. + stream_id(int): The earliest update to get for the user. + Returns: + A deferred dict mapping from room_id strings to lists of tag + strings for all the rooms that changed since the stream_id token. + """ + def get_updated_tags_txn(txn): + sql = ( + "SELECT room_id from room_tags_revisions" + " WHERE user_id = ? AND stream_id > ?" + ) + txn.execute(sql, (user_id, stream_id)) + room_ids = [row[0] for row in txn.fetchall()] + return room_ids + + room_ids = yield self.runInteraction( + "get_updated_tags", get_updated_tags_txn + ) + + results = {} + if room_ids: + tags_by_room = yield self.get_tags_for_user(self, user_id) + for room_id in rooms_ids: + results[room_id] = tags_by_room[room_id] + + defer.returnValue(results) + + def get_tags_for_room(self, user_id, room_id): + """Get all the tags for the given room + Args: + user_id(str): The user to get tags for + room_id(str): The room to get tags for + Returns: + A deferred list of string tags. + """ + return self._simple_select_onecol( + table="room_tags", + keyvalues={"user_id": user_id, "room_id": room_id}, + retcol="tag", + desc="get_tags_for_room", + ) + + @defer.inlineCallbacks + def add_tag_to_room(self, user_id, room_id, tag): + """Add a tag to a room for a user. + Returns: + A deferred that completes once the tag has been added. + """ + def add_tag_txn(txn, next_id): + sql = ( + "INSERT INTO room_tags (user_id, room_id, tag)" + " VALUES (?, ?, ?)" + ) + try: + txn.execute(sql, (user_id, room_id, tag)) + except database_engine.module.IntegrityError as e: + # Return early if the row is already in the table + # and we don't need to bump the revision number of the + # private_user_data. + return + self._update_revision_txn(txn, user_id, room_id, next_id) + + with (yield self._private_user_data_id_gen.get_next(self)) as next_id: + yield self.runInteraction("add_tag", add_tag_txn, next_id) + + self.get_tags_for_user.invalidate((user_id,)) + + @defer.inlineCallbacks + def remove_tag_from_room(self, user_id, room_id, tag): + """Remove a tag from a room for a user. + Returns: + A deffered that completes once the tag has been removed + """ + def remove_tag_txn(txn, next_id): + sql = ( + "DELETE FROM room_tags " + " WHERE user_id = ? AND room_id = ? AND tag = ?" + ) + txn.execute(sql, (user_id, room_id, tag)) + self._update_revision_txn(txn, user_id, room_id, next_id) + + with (yield self._private_user_data_id_gen.get_next(self)) as next_id: + yield self.runInteraction("remove_tag", remove_tag_txn, next_id) + + self.get_tags_for_user.invalidate((user_id,)) + + def _update_revision_txn(self, txn, user_id, room_id, next_id): + """Update the latest revision of the tags for the given user and room. + + Args: + txn: The database cursor + user_id(str): The ID of the user. + room_id(str): The ID of the room. + next_id(int): The the revision to advance to. + """ + + update_max_id_sql = ( + "UPDATE private_user_data_max_stream_id" + " SET stream_id = ?" + " WHERE stream_id < ?" + ) + txn.execute(update_max_id_sql, (next_id, next_id)) + + update_sql = ( + "UPDATE room_tags_revisions" + " SET stream_id = ?" + " WHERE user_id = ?" + " AND room_id = ?" + ) + txn.execute(update_sql, (next_id, user_id, room_id)) + + if txn.rowcount == 0: + insert_sql = ( + "INSERT INTO room_tags_revisions (user_id, room_id, stream_id)" + " VALUES (?, ?, ?)" + ) + try: + txn.execute(insert_sql, (user_id, room_id, next_id)) + except database_engine.module.IntegrityError as e: + # Ignore insertion errors. It doesn't matter if the row wasn't + # inserted because if two updates happend concurrently the one + # with the higher stream_id will not be reported to a client + # unless the previous update has completed. It doesn't matter + # which stream_id ends up in the table, as long as it is higher + # than the id that the client has. + pass