From f1dcaf3296c880044a4ccb28e093bc2c71941f7f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 5 Nov 2015 17:20:28 +0000 Subject: [PATCH 01/44] Bump changelog and version number --- CHANGES.rst | 17 +++++++++++++++++ synapse/__init__.py | 2 +- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/CHANGES.rst b/CHANGES.rst index da118b7ceb..169d800ff1 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,3 +1,20 @@ +Changes in synapse v0.11.0-rc1 (XXXX-XX-XX) +=========================================== + +* Add Search API (PR #307, #324, #327, #336, ) +* Add 'archived' state to v2 /sync API (PR #316) +* Add ability to reject invites (PR #317) +* Add config option to disable password login (PR #322) +* Add the login fallback API (PR #330) +* Add room context API (PR #334) +* Add room tagging support (PR #335) +* Add anonymous access support (PR #344, #345, #348) +* Update v2 /sync API to match spec (PR #305, #316, #321, #332, #337, #341) +* Change retry schedule for remote application services (PR #320) +* Change retry schedule for remote servers (PR #340) +* Fix bug where we hosted static content in the incorrect place (PR #329) +* Fix bug where we didn't increment retry interval for remote servers (PR #343) + Changes in synapse v0.10.1-rc1 (2015-10-15) =========================================== diff --git a/synapse/__init__.py b/synapse/__init__.py index e9ce0412ed..f68a15bb85 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.10.1-rc1" +__version__ = "0.11.0-rc1" From 3a02a13e38cbd8cef1266261ab24d80738235507 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 5 Nov 2015 17:26:56 +0000 Subject: [PATCH 02/44] Add PR --- CHANGES.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.rst b/CHANGES.rst index 169d800ff1..9fd207dc64 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,7 +1,7 @@ Changes in synapse v0.11.0-rc1 (XXXX-XX-XX) =========================================== -* Add Search API (PR #307, #324, #327, #336, ) +* Add Search API (PR #307, #324, #327, #336, #350) * Add 'archived' state to v2 /sync API (PR #316) * Add ability to reject invites (PR #317) * Add config option to disable password login (PR #322) From 545a7b291a95b891622d9fcf72299ae6006ca526 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 5 Nov 2015 18:06:18 +0000 Subject: [PATCH 03/44] Remove anonymous access, since its not ready yet --- CHANGES.rst | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index 9fd207dc64..5a63ae6568 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -8,9 +8,8 @@ Changes in synapse v0.11.0-rc1 (XXXX-XX-XX) * Add the login fallback API (PR #330) * Add room context API (PR #334) * Add room tagging support (PR #335) -* Add anonymous access support (PR #344, #345, #348) * Update v2 /sync API to match spec (PR #305, #316, #321, #332, #337, #341) -* Change retry schedule for remote application services (PR #320) +* Change retry schedule for application services (PR #320) * Change retry schedule for remote servers (PR #340) * Fix bug where we hosted static content in the incorrect place (PR #329) * Fix bug where we didn't increment retry interval for remote servers (PR #343) From 0da4b11efba8f1dd6fad1c6227555010ad9f09d0 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Fri, 6 Nov 2015 11:00:06 +0100 Subject: [PATCH 04/44] remove references to matrix.org/beta --- README.rst | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/README.rst b/README.rst index 6c8431aa86..a8f0c62158 100644 --- a/README.rst +++ b/README.rst @@ -20,8 +20,8 @@ The overall architecture is:: https://somewhere.org/_matrix https://elsewhere.net/_matrix ``#matrix:matrix.org`` is the official support room for Matrix, and can be -accessed by the web client at http://matrix.org/beta or via an IRC bridge at -irc://irc.freenode.net/matrix. +accessed by any client from https://matrix.org/blog/try-matrix-now or via IRC +bridge at irc://irc.freenode.net/matrix. Synapse is currently in rapid development, but as of version 0.5 we believe it is sufficiently stable to be run as an internet-facing service for real usage! @@ -77,14 +77,14 @@ Meanwhile, iOS and Android SDKs and clients are available from: - https://github.com/matrix-org/matrix-android-sdk We'd like to invite you to join #matrix:matrix.org (via -https://matrix.org/beta), run a homeserver, take a look at the Matrix spec at -https://matrix.org/docs/spec and API docs at https://matrix.org/docs/api, -experiment with the APIs and the demo clients, and report any bugs via -https://matrix.org/jira. +https://matrix.org/blog/try-matrix-now), run a homeserver, take a look at the +Matrix spec at https://matrix.org/docs/spec and API docs at +https://matrix.org/docs/api, experiment with the APIs and the demo clients, and +report any bugs via https://matrix.org/jira. Thanks for using Matrix! -[1] End-to-end encryption is currently in development +[1] End-to-end encryption is currently in development - see https://matrix.org/git/olm Synapse Installation ==================== From f2c4ee41b91f1828d516df871adcfbaed46d5407 Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Fri, 6 Nov 2015 14:27:49 +0000 Subject: [PATCH 05/44] Remove accidentally added ID column --- synapse/storage/schema/delta/25/history_visibility.sql | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/storage/schema/delta/25/history_visibility.sql b/synapse/storage/schema/delta/25/history_visibility.sql index 9f387ed69f..532cb05151 100644 --- a/synapse/storage/schema/delta/25/history_visibility.sql +++ b/synapse/storage/schema/delta/25/history_visibility.sql @@ -18,7 +18,6 @@ * so that we can join on them in SELECT statements. */ CREATE TABLE IF NOT EXISTS history_visibility( - id INTEGER PRIMARY KEY, event_id TEXT NOT NULL, room_id TEXT NOT NULL, history_visibility TEXT NOT NULL, From 767c20a869933204b6e545c6f1495cd7cd298a87 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Fri, 6 Nov 2015 20:49:57 +0100 Subject: [PATCH 06/44] add a key existence check to tags_by_room to avoid /events 500'ing when testing against vector --- synapse/storage/tags.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py index 641ea250f0..73babd53d9 100644 --- a/synapse/storage/tags.py +++ b/synapse/storage/tags.py @@ -95,7 +95,8 @@ class TagsStore(SQLBaseStore): if room_ids: tags_by_room = yield self.get_tags_for_user(user_id) for room_id in room_ids: - results[room_id] = tags_by_room[room_id] + if room_id in tags_by_room: + results[room_id] = tags_by_room[room_id] defer.returnValue(results) From dd40fb68e455b9a736f1871a19119eb0391cd0d5 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Sun, 8 Nov 2015 16:04:37 +0000 Subject: [PATCH 07/44] fix comedy important missing comma breaking recent-ordered FTS on sqlite --- synapse/storage/search.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 3cea2011fa..91a96d2a83 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -154,7 +154,7 @@ class SearchStore(SQLBaseStore): ) elif isinstance(self.database_engine, Sqlite3Engine): sql = ( - "SELECT rank(matchinfo(event_search)) as rank, room_id, event_id" + "SELECT rank(matchinfo(event_search)) as rank, room_id, event_id," " topological_ordering, stream_ordering" " FROM event_search" " NATURAL JOIN events" From c4135d85e116cecbc119c1243911a0b60a6452d7 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 9 Nov 2015 14:52:18 +0000 Subject: [PATCH 08/44] SYN-513: Include updates for rooms that have had all their tags deleted --- synapse/handlers/sync.py | 2 +- synapse/storage/tags.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 5294d96466..ff766e3af5 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -272,7 +272,7 @@ class SyncHandler(BaseHandler): def private_user_data_for_room(self, room_id, tags_by_room): private_user_data = [] tags = tags_by_room.get(room_id) - if tags: + if tags is not None: private_user_data.append({ "type": "m.tag", "content": {"tags": tags}, diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py index 641ea250f0..bf695b7800 100644 --- a/synapse/storage/tags.py +++ b/synapse/storage/tags.py @@ -95,7 +95,7 @@ class TagsStore(SQLBaseStore): if room_ids: tags_by_room = yield self.get_tags_for_user(user_id) for room_id in room_ids: - results[room_id] = tags_by_room[room_id] + results[room_id] = tags_by_room.get(room_id, {}) defer.returnValue(results) From 9107ed23b73b76347a63a2a2eea4e41f30f02062 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 5 Nov 2015 16:56:40 +0000 Subject: [PATCH 09/44] Add a couple of unit tests for room//messages ... merely because I was trying to figure out how it worked, and couldn't. --- synapse/rest/client/v1/room.py | 2 +- tests/rest/client/v1/test_rooms.py | 56 ++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 6e0d93766b..f7012067f7 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -319,7 +319,7 @@ class RoomMemberListRestServlet(ClientV1RestServlet): })) -# TODO: Needs unit testing +# TODO: Needs better unit testing class RoomMessageListRestServlet(ClientV1RestServlet): PATTERN = client_path_pattern("/rooms/(?P[^/]*)/messages$") diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py index b43563fa4b..7749378064 100644 --- a/tests/rest/client/v1/test_rooms.py +++ b/tests/rest/client/v1/test_rooms.py @@ -994,3 +994,59 @@ class RoomInitialSyncTestCase(RestTestCase): } self.assertTrue(self.user_id in presence_by_user) self.assertEquals("m.presence", presence_by_user[self.user_id]["type"]) + + +class RoomMessageListTestCase(RestTestCase): + """ Tests /rooms/$room_id/messages REST events. """ + user_id = "@sid1:red" + + @defer.inlineCallbacks + def setUp(self): + self.mock_resource = MockHttpResource(prefix=PATH_PREFIX) + self.auth_user_id = self.user_id + + hs = yield setup_test_homeserver( + "red", + http_client=None, + replication_layer=Mock(), + ratelimiter=NonCallableMock(spec_set=["send_message"]), + ) + self.ratelimiter = hs.get_ratelimiter() + self.ratelimiter.send_message.return_value = (True, 0) + + hs.get_handlers().federation_handler = Mock() + + def _get_user_by_access_token(token=None, allow_guest=False): + return { + "user": UserID.from_string(self.auth_user_id), + "token_id": 1, + "is_guest": False, + } + hs.get_v1auth()._get_user_by_access_token = _get_user_by_access_token + + def _insert_client_ip(*args, **kwargs): + return defer.succeed(None) + hs.get_datastore().insert_client_ip = _insert_client_ip + + synapse.rest.client.v1.room.register_servlets(hs, self.mock_resource) + + self.room_id = yield self.create_room_as(self.user_id) + + @defer.inlineCallbacks + def test_topo_token_is_accepted(self): + token = "t1-0_0_0_0_0" + (code, response) = yield self.mock_resource.trigger_get( + "/rooms/%s/messages?access_token=x&from=%s" % + (self.room_id, token)) + self.assertEquals(200, code) + self.assertTrue("start" in response) + self.assertEquals(token, response['start']) + self.assertTrue("chunk" in response) + self.assertTrue("end" in response) + + @defer.inlineCallbacks + def test_stream_token_is_rejected(self): + (code, response) = yield self.mock_resource.trigger_get( + "/rooms/%s/messages?access_token=x&from=s0_0_0_0" % + self.room_id) + self.assertEquals(400, code) From c6a01f2ed0da9f0db169307e7d1649021f0c2123 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 9 Nov 2015 14:37:28 +0000 Subject: [PATCH 10/44] Add storage module for tracking background updates. The progress for each background update is stored as a JSON blob in the database. Each background update is broken up into separate batches. The batch size is automatically tuned to try avoid blocking single threaded databases for too long. --- synapse/storage/background_updates.py | 210 ++++++++++++++++++ .../schema/delta/25/00background_updates.sql | 21 ++ 2 files changed, 231 insertions(+) create mode 100644 synapse/storage/background_updates.py create mode 100644 synapse/storage/schema/delta/25/00background_updates.sql diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py new file mode 100644 index 0000000000..32a233c213 --- /dev/null +++ b/synapse/storage/background_updates.py @@ -0,0 +1,210 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import SQLBaseStore + +from twisted.internet import defer + +import ujson as json +import logging + +logger = logging.getLogger(__name__) + + +class BackgroundUpdatePerformance(object): + """Tracks the how long a background update is taking to update its items""" + + def __init__(self, name): + self.name = name + self.total_item_count = 0 + self.total_duration_ms = 0 + self.avg_item_count = 0 + self.avg_duration_ms = 0 + + def update(self, item_count, duration_ms): + """Update the stats after doing an update""" + self.total_item_count += item_count + self.total_duration_ms += duration_ms + + # Exponential moving averages for the number of items updated and + # the duration. + self.avg_item_count += 0.1 * (item_count - self.avg_item_count) + self.avg_duration_ms += 0.1 * (duration_ms - self.avg_duration_ms) + + def average_duration_ms_per_item(self): + """An estimate of how long it takes to do a single update. + Returns: + A duration in ms as a float + """ + if self.total_item_count == 0: + return None + else: + # Use the exponential moving average so that we can adapt to + # changes in how long the update process takes. + return float(self.avg_duration_ms) / float(self.avg_item_count) + + +class BackgroundUpdateStore(SQLBaseStore): + """ Background updates are updates to the database that run in the + background. Each update processes a batch of data at once. We attempt to + limit the impact of each update by monitoring how long each batch takes to + process and autotuning the batch size. + """ + + MINIMUM_BACKGROUND_BATCH_SIZE = 100 + DEFAULT_BACKGROUND_BATCH_SIZE = 100 + + def __init__(self, hs): + super(BackgroundUpdateStore, self).__init__(hs) + self._background_update_performance = {} + self._background_update_queue = [] + self._background_update_handlers = {} + + @defer.inlineCallbacks + def do_background_update(self, desired_duration_ms): + """Does some amount of work on a background update + Args: + desired_duration_ms(float): How long we want to spend + updating. + Returns: + A deferred that completes once some amount of work is done. + The deferred will have a value of None if there is currently + no more work to do. + """ + if not self._background_update_queue: + updates = yield self._simple_select_list( + "background_updates", + keyvalues=None, + retcols=("update_name",), + ) + for update in updates: + self._background_update_queue.append(update['update_name']) + + if not self._background_update_queue: + defer.returnValue(None) + + update_name = self._background_update_queue.pop(0) + self._background_update_queue.append(update_name) + + update_handler = self._background_update_handlers[update_name] + + performance = self._background_update_performance.get(update_name) + + if performance is None: + performance = BackgroundUpdatePerformance(update_name) + self._background_update_performance[update_name] = performance + + duration_ms_per_item = performance.average_duration_ms_per_item() + + if duration_ms_per_item is not None: + batch_size = int(desired_duration_ms / duration_ms_per_item) + # Clamp the batch size so that we always make progress + batch_size = max(batch_size, self.MINIMUM_BACKGROUND_BATCH_SIZE) + else: + batch_size = self.DEFAULT_BACKGROUND_BATCH_SIZE + + progress_json = yield self._simple_select_one_onecol( + "background_updates", + keyvalues={"update_name": update_name}, + retcol="progress_json" + ) + + progress = json.loads(progress_json) + + time_start = self._clock.time_msec() + items_updated = yield update_handler(progress, batch_size) + time_stop = self._clock.time_msec() + + duration_ms = time_stop - time_start + + logger.info( + "Updating %. Updated %r items in %rms", + update_name, items_updated, duration_ms + ) + + performance.update(items_updated, duration_ms) + + defer.returnValue(len(self._background_update_performance)) + + def register_background_update_handler(self, update_name, update_handler): + """Register a handler for doing a background update. + + The handler should take two arguments: + + * A dict of the current progress + * An integer count of the number of items to update in this batch. + + The handler should return a deferred integer count of items updated. + The hander is responsible for updating the progress of the update. + + Args: + update_name(str): The name of the update that this code handles. + update_handler(function): The function that does the update. + """ + self._background_update_handlers[update_name] = update_handler + + def start_background_update(self, update_name, progress): + """Starts a background update running. + + Args: + update_name: The update to set running. + progress: The initial state of the progress of the update. + + Returns: + A deferred that completes once the task has been added to the + queue. + """ + # Clear the background update queue so that we will pick up the new + # task on the next iteration of do_background_update. + self._background_update_queue = [] + progress_json = json.dumps(progress) + + return self._simple_insert( + "background_updates", + {"update_name": update_name, "progress_json": progress_json} + ) + + def _end_background_update(self, update_name): + """Removes a completed background update task from the queue. + + Args: + update_name(str): The name of the completed task to remove + Returns: + A deferred that completes once the task is removed. + """ + self._background_update_queue = [ + name for name in self._background_update_queue if name != update_name + ] + return self._simple_delete_one( + "background_updates", keyvalues={"update_name": update_name} + ) + + def _background_update_progress_txn(self, txn, update_name, progress): + """Update the progress of a background update + + Args: + txn(cursor): The transaction. + update_name(str): The name of the background update task + progress(dict): The progress of the update. + """ + + progress_json = json.dumps(progress) + + self._simple_update_one_txn( + txn, + "background_updates", + keyvalues={"update_name": update_name}, + updatevalues={"progress_json": progress_json}, + ) diff --git a/synapse/storage/schema/delta/25/00background_updates.sql b/synapse/storage/schema/delta/25/00background_updates.sql new file mode 100644 index 0000000000..41a9b59b1b --- /dev/null +++ b/synapse/storage/schema/delta/25/00background_updates.sql @@ -0,0 +1,21 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +CREATE TABLE IF NOT EXISTS background_updates( + update_name TEXT NOT NULL, -- The name of the background update. + progress_json TEXT NOT NULL, -- The current progress of the update as JSON. + CONSTRAINT background_updates_uniqueness UNIQUE (update_name) +); From 0d63dc3ec983f9ae21eeedd4276b1ac767e9cc1b Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Mon, 9 Nov 2015 17:26:43 +0000 Subject: [PATCH 11/44] Get display name from identity server, not client --- synapse/handlers/room.py | 8 +++----- synapse/rest/client/v1/room.py | 3 +-- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 8cce8d0e99..834972a580 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -582,7 +582,6 @@ class RoomMemberHandler(BaseHandler): medium, address, id_server, - display_name, token_id, txn_id ): @@ -609,7 +608,6 @@ class RoomMemberHandler(BaseHandler): else: yield self._make_and_store_3pid_invite( id_server, - display_name, medium, address, room_id, @@ -673,7 +671,6 @@ class RoomMemberHandler(BaseHandler): def _make_and_store_3pid_invite( self, id_server, - display_name, medium, address, room_id, @@ -681,7 +678,7 @@ class RoomMemberHandler(BaseHandler): token_id, txn_id ): - token, public_key, key_validity_url = ( + token, public_key, key_validity_url, display_name = ( yield self._ask_id_server_for_third_party_invite( id_server, medium, @@ -725,10 +722,11 @@ class RoomMemberHandler(BaseHandler): # TODO: Check for success token = data["token"] public_key = data["public_key"] + display_name = data["display_name"] key_validity_url = "%s%s/_matrix/identity/api/v1/pubkey/isvalid" % ( id_server_scheme, id_server, ) - defer.returnValue((token, public_key, key_validity_url)) + defer.returnValue((token, public_key, key_validity_url, display_name)) class RoomListHandler(BaseHandler): diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 6e0d93766b..90ded26ff8 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -459,7 +459,6 @@ class RoomMembershipRestServlet(ClientV1RestServlet): content["medium"], content["address"], content["id_server"], - content["display_name"], token_id, txn_id ) @@ -494,7 +493,7 @@ class RoomMembershipRestServlet(ClientV1RestServlet): defer.returnValue((200, {})) def _has_3pid_invite_keys(self, content): - for key in {"id_server", "medium", "address", "display_name"}: + for key in {"id_server", "medium", "address"}: if key not in content: return False return True From c7db2068c8406c11771c96dd5273aa1bdb72d04d Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 9 Nov 2015 18:09:46 +0000 Subject: [PATCH 12/44] Don't fiddle with results returned by event sources Overwriting hashes returned by other methods is poor form. Fixes: SYN-516 --- synapse/handlers/sync.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index ff766e3af5..492c1c17d5 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -311,8 +311,13 @@ class SyncHandler(BaseHandler): ephemeral_by_room = {} for event in typing: - room_id = event.pop("room_id") - ephemeral_by_room.setdefault(room_id, []).append(event) + # we want to exclude the room_id from the event, but modifying the + # result returned by the event source is poor form (it might cache + # the object) + room_id = event["room_id"] + event_copy = {k: v for (k, v) in event.iteritems() + if k != "room_id"} + ephemeral_by_room.setdefault(room_id, []).append(event_copy) receipt_key = since_token.receipt_key if since_token else "0" @@ -328,8 +333,11 @@ class SyncHandler(BaseHandler): now_token = now_token.copy_and_replace("receipt_key", receipt_key) for event in receipts: - room_id = event.pop("room_id") - ephemeral_by_room.setdefault(room_id, []).append(event) + room_id = event["room_id"] + # exclude room id, as above + event_copy = {k: v for (k, v) in event.iteritems() + if k != "room_id"} + ephemeral_by_room.setdefault(room_id, []).append(event_copy) defer.returnValue((now_token, ephemeral_by_room)) From 2ede7aa8a1da20b735797cc9230da7bbeb55efc3 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 9 Nov 2015 19:29:32 +0000 Subject: [PATCH 13/44] Add background update task for reindexing event search --- synapse/storage/search.py | 98 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 96 insertions(+), 2 deletions(-) diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 3cea2011fa..f7c269865d 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -15,7 +15,7 @@ from twisted.internet import defer -from _base import SQLBaseStore +from .background_updates import BackgroundUpdateStore from synapse.api.errors import SynapseError from synapse.storage.engines import PostgresEngine, Sqlite3Engine @@ -25,7 +25,101 @@ import logging logger = logging.getLogger(__name__) -class SearchStore(SQLBaseStore): +class SearchStore(BackgroundUpdateStore): + + EVENT_SEARCH_UPDATE_NAME = "event_search" + + + @defer.inlineCallbacks + def _background_reindex_search(self, progress, batch_size): + target_min_stream_id = progress["target_min_stream_id"] + max_stream_id = progress["max_stream_id"] + rows_inserted = progress.get("rows_inserted", 0) + + INSERT_CLUMP_SIZE = 1000 + TYPES = ["m.room.name", "m.room.message", "m.room.topic"] + + def reindex_search_txn(txn): + sql = ( + "SELECT stream_id, event_id FROM events" + " WHERE ? <= stream_ordering AND stream_ordering < ?" + " AND (%s)" + " ORDER BY stream_ordering DESC" + " LIMIT ?" + ) % (" OR ".join("type = '%s'" % TYPES),) + + txn.execute(sql, target_min_stream_id, max_stream_id, batch_size) + + rows = txn.fetch_all() + if not rows: + return None + + min_stream_id = rows[-1][0] + event_ids = [row[1] for row in rows] + + events = self._get_events_txn(txn, event_ids) + + event_search_rows = [] + for event in events: + try: + event_id = event.event_id + room_id = event.room_id + content = event.content + if event.type == "m.room.message": + key = "content.body" + value = content["body"] + elif event.type == "m.room.topic": + key = "content.topic" + value = content["topic"] + elif event.type == "m.room.name": + key = "content.name" + value = content["name"] + except Exception: + # If the event is missing a necessary field then + # skip over it. + continue + + event_search_rows.append((event_id, room_id, key, value)) + + if isinstance(self.database_engine, PostgresEngine): + sql = ( + "INSERT INTO event_search (event_id, room_id, key, vector)" + " VALUES (?,?,?,to_tsvector('english', ?))" + ) + elif isinstance(self.database_engine, Sqlite3Engine): + sql = ( + "INSERT INTO event_search (event_id, room_id, key, value)" + " VALUES (?,?,?,?)" + ) + else: + # This should be unreachable. + raise Exception("Unrecognized database engine") + + for index in range(0, len(event_search_rows), INSERT_CLUMP_SIZE): + clump = event_search_rows[index : index + INSERT_CLUMP_SIZE) + txn.execute_many(sql, clump) + + progress = { + "target_max_stream_id": target_min_stream_id, + "max_stream_id": min_stream_id, + "rows_inserted": rows_inserted + len(event_search_rows) + } + + self._background_update_progress_txn( + txn, self.EVENT_SEARCH_UPDATE_NAME, progress + ) + + return len(event_search_rows) + + result = yield self.runInteration( + self.EVENT_SEARCH_UPDATE_NAME, reindex_search_txn + ) + + if result is None: + yield _end_background_update(self.EVENT_SEARCH_UPDATE_NAME) + + defer.returnValue(result) + @defer.inlineCallbacks def search_msgs(self, room_ids, search_term, keys): """Performs a full text search over events with given keys. From a412b9a465cb6a64f44e3656d8645977f43c275f Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 10 Nov 2015 15:50:58 +0000 Subject: [PATCH 14/44] Run the background updates when starting synapse. --- synapse/app/homeserver.py | 1 + synapse/storage/background_updates.py | 57 +++++++++++++++++++++++---- synapse/storage/search.py | 11 ++++-- synapse/util/__init__.py | 8 ++++ 4 files changed, 67 insertions(+), 10 deletions(-) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index a77535a4ee..cd7a52ec07 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -439,6 +439,7 @@ def setup(config_options): hs.get_pusherpool().start() hs.get_state_handler().start_caching() hs.get_datastore().start_profiling() + hs.get_datastore().start_doing_background_updates() hs.get_replication_layer().start_get_pdu_cache() return hs diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 32a233c213..b6cdc6ec68 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -43,7 +43,7 @@ class BackgroundUpdatePerformance(object): self.avg_item_count += 0.1 * (item_count - self.avg_item_count) self.avg_duration_ms += 0.1 * (duration_ms - self.avg_duration_ms) - def average_duration_ms_per_item(self): + def average_items_per_ms(self): """An estimate of how long it takes to do a single update. Returns: A duration in ms as a float @@ -53,7 +53,17 @@ class BackgroundUpdatePerformance(object): else: # Use the exponential moving average so that we can adapt to # changes in how long the update process takes. - return float(self.avg_duration_ms) / float(self.avg_item_count) + return float(self.avg_item_count) / float(self.avg_duration_ms) + + def total_items_per_ms(self): + """An estimate of how long it takes to do a single update. + Returns: + A duration in ms as a float + """ + if self.total_item_count == 0: + return None + else: + return float(self.total_item_count) / float(self.total_duration_ms) class BackgroundUpdateStore(SQLBaseStore): @@ -65,12 +75,41 @@ class BackgroundUpdateStore(SQLBaseStore): MINIMUM_BACKGROUND_BATCH_SIZE = 100 DEFAULT_BACKGROUND_BATCH_SIZE = 100 + BACKGROUND_UPDATE_INTERVAL_MS = 1000 + BACKGROUND_UPDATE_DURATION_MS = 100 def __init__(self, hs): super(BackgroundUpdateStore, self).__init__(hs) self._background_update_performance = {} self._background_update_queue = [] self._background_update_handlers = {} + self._background_update_timer = None + + @defer.inlineCallbacks + def start_doing_background_updates(self): + while True: + if self._background_update_timer is not None: + return + + sleep = defer.Deferred() + self._background_update_timer = self._clock.call_later( + self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback + ) + try: + yield sleep + finally: + self._background_update_timer = None + + result = yield self.do_background_update( + self.BACKGROUND_UPDATE_DURATION_MS + ) + + if result is None: + logger.info( + "No more background updates to do." + " Unscheduling background update task." + ) + return @defer.inlineCallbacks def do_background_update(self, desired_duration_ms): @@ -106,10 +145,10 @@ class BackgroundUpdateStore(SQLBaseStore): performance = BackgroundUpdatePerformance(update_name) self._background_update_performance[update_name] = performance - duration_ms_per_item = performance.average_duration_ms_per_item() + items_per_ms = performance.average_items_per_ms() - if duration_ms_per_item is not None: - batch_size = int(desired_duration_ms / duration_ms_per_item) + if items_per_ms is not None: + batch_size = int(desired_duration_ms * items_per_ms) # Clamp the batch size so that we always make progress batch_size = max(batch_size, self.MINIMUM_BACKGROUND_BATCH_SIZE) else: @@ -130,8 +169,12 @@ class BackgroundUpdateStore(SQLBaseStore): duration_ms = time_stop - time_start logger.info( - "Updating %. Updated %r items in %rms", - update_name, items_updated, duration_ms + "Updating %. Updated %r items in %rms." + " (total_rate=%r/ms, current_rate=%r/ms, total_updated=%r)", + update_name, items_updated, duration_ms, + performance.total_items_per_ms(), + performance.average_items_per_ms(), + performance.total_item_count, ) performance.update(items_updated, duration_ms) diff --git a/synapse/storage/search.py b/synapse/storage/search.py index f7c269865d..d170c546b5 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -29,6 +29,11 @@ class SearchStore(BackgroundUpdateStore): EVENT_SEARCH_UPDATE_NAME = "event_search" + def __init__(self, hs): + super(SearchStore, self).__init__(hs) + self.register_background_update_handler( + self.EVENT_SEARCH_UPDATE_NAME, self._background_reindex_search + ) @defer.inlineCallbacks def _background_reindex_search(self, progress, batch_size): @@ -74,7 +79,7 @@ class SearchStore(BackgroundUpdateStore): elif event.type == "m.room.name": key = "content.name" value = content["name"] - except Exception: + except (KeyError, AttributeError): # If the event is missing a necessary field then # skip over it. continue @@ -96,7 +101,7 @@ class SearchStore(BackgroundUpdateStore): raise Exception("Unrecognized database engine") for index in range(0, len(event_search_rows), INSERT_CLUMP_SIZE): - clump = event_search_rows[index : index + INSERT_CLUMP_SIZE) + clump = event_search_rows[index:index + INSERT_CLUMP_SIZE] txn.execute_many(sql, clump) progress = { @@ -116,7 +121,7 @@ class SearchStore(BackgroundUpdateStore): ) if result is None: - yield _end_background_update(self.EVENT_SEARCH_UPDATE_NAME) + yield self._end_background_update(self.EVENT_SEARCH_UPDATE_NAME) defer.returnValue(result) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 1d123ccefc..d69c7cb991 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -53,6 +53,14 @@ class Clock(object): loop.stop() def call_later(self, delay, callback, *args, **kwargs): + """Call something later + + Args: + delay(float): How long to wait in seconds. + callback(function): Function to call + *args: Postional arguments to pass to function. + **kwargs: Key arguments to pass to function. + """ current_context = LoggingContext.current_context() def wrapped_callback(*args, **kwargs): From 36c58b18a32f05a2f025bc916c14b9e2f78f439b Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 10 Nov 2015 15:51:40 +0000 Subject: [PATCH 15/44] Test for background updates --- tests/storage/test_background_update.py | 76 +++++++++++++++++++++++++ tests/utils.py | 3 + 2 files changed, 79 insertions(+) create mode 100644 tests/storage/test_background_update.py diff --git a/tests/storage/test_background_update.py b/tests/storage/test_background_update.py new file mode 100644 index 0000000000..29289fa9b4 --- /dev/null +++ b/tests/storage/test_background_update.py @@ -0,0 +1,76 @@ +from tests import unittest +from twisted.internet import defer + +from synapse.api.constants import EventTypes +from synapse.types import UserID, RoomID, RoomAlias + +from tests.utils import setup_test_homeserver + +from mock import Mock + +class BackgroundUpdateTestCase(unittest.TestCase): + + @defer.inlineCallbacks + def setUp(self): + hs = yield setup_test_homeserver() + self.store = hs.get_datastore() + self.clock = hs.get_clock() + + self.update_handler = Mock() + + yield self.store.register_background_update_handler( + "test_update", self.update_handler + ) + + @defer.inlineCallbacks + def test_do_background_update(self): + desired_count = 1000; + duration_ms = 42; + + @defer.inlineCallbacks + def update(progress, count): + self.clock.advance_time_msec(count * duration_ms) + progress = {"my_key": progress["my_key"] + 1} + yield self.store.runInteraction( + "update_progress", + self.store._background_update_progress_txn, + "test_update", + progress, + ) + defer.returnValue(count) + + self.update_handler.side_effect = update + + yield self.store.start_background_update("test_update", {"my_key": 1}) + + self.update_handler.reset_mock() + result = yield self.store.do_background_update( + duration_ms * desired_count + ) + self.assertIsNotNone(result) + self.update_handler.assert_called_once_with( + {"my_key": 1}, self.store.DEFAULT_BACKGROUND_BATCH_SIZE + ) + + @defer.inlineCallbacks + def update(progress, count): + yield self.store._end_background_update("test_update") + defer.returnValue(count) + + self.update_handler.side_effect = update + + self.update_handler.reset_mock() + result = yield self.store.do_background_update( + duration_ms * desired_count + ) + self.assertIsNotNone(result) + self.update_handler.assert_called_once_with( + {"my_key": 2}, desired_count + ) + + self.update_handler.reset_mock() + result = yield self.store.do_background_update( + duration_ms * desired_count + ) + self.assertIsNone(result) + self.assertFalse(self.update_handler.called) diff --git a/tests/utils.py b/tests/utils.py index ca2c33cf8e..91040c2efd 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -243,6 +243,9 @@ class MockClock(object): else: self.timers.append(t) + def advance_time_msec(self, ms): + self.advance_time(ms / 1000.) + class SQLiteMemoryDbPool(ConnectionPool, object): def __init__(self): From 90b503216c40974dc4925a9bbb673779a039143c Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 10 Nov 2015 16:20:13 +0000 Subject: [PATCH 16/44] Use a background task to update databases to use the full text search --- synapse/storage/schema/delta/25/fts.py | 94 ++++++-------------------- synapse/storage/search.py | 8 +-- 2 files changed, 25 insertions(+), 77 deletions(-) diff --git a/synapse/storage/schema/delta/25/fts.py b/synapse/storage/schema/delta/25/fts.py index b7cd0ce3b8..e408d36da0 100644 --- a/synapse/storage/schema/delta/25/fts.py +++ b/synapse/storage/schema/delta/25/fts.py @@ -22,7 +22,7 @@ import ujson logger = logging.getLogger(__name__) -POSTGRES_SQL = """ +POSTGRES_TABLE = """ CREATE TABLE IF NOT EXISTS event_search ( event_id TEXT, room_id TEXT, @@ -31,22 +31,6 @@ CREATE TABLE IF NOT EXISTS event_search ( vector tsvector ); -INSERT INTO event_search SELECT - event_id, room_id, json::json->>'sender', 'content.body', - to_tsvector('english', json::json->'content'->>'body') - FROM events NATURAL JOIN event_json WHERE type = 'm.room.message'; - -INSERT INTO event_search SELECT - event_id, room_id, json::json->>'sender', 'content.name', - to_tsvector('english', json::json->'content'->>'name') - FROM events NATURAL JOIN event_json WHERE type = 'm.room.name'; - -INSERT INTO event_search SELECT - event_id, room_id, json::json->>'sender', 'content.topic', - to_tsvector('english', json::json->'content'->>'topic') - FROM events NATURAL JOIN event_json WHERE type = 'm.room.topic'; - - CREATE INDEX event_search_fts_idx ON event_search USING gin(vector); CREATE INDEX event_search_ev_idx ON event_search(event_id); CREATE INDEX event_search_ev_ridx ON event_search(room_id); @@ -61,67 +45,31 @@ SQLITE_TABLE = ( def run_upgrade(cur, database_engine, *args, **kwargs): if isinstance(database_engine, PostgresEngine): - run_postgres_upgrade(cur) + for statement in get_statements(POSTGRES_TABLE.splitlines()): + cur.execute(statement) return if isinstance(database_engine, Sqlite3Engine): - run_sqlite_upgrade(cur) + cur.execute(SQLITE_TABLE) return + cur.execute("SELECT MIN(stream_ordering) FROM events") + rows = cur.fetchall() + min_stream_id = rows[0][0] -def run_postgres_upgrade(cur): - for statement in get_statements(POSTGRES_SQL.splitlines()): - cur.execute(statement) + cur.execute("SELECT MAX(stream_ordering) FROM events") + rows = cur.fetchall() + max_stream_id = rows[0][0] + if min_stream_id is not None and max_stream_id is not None: + progress = { + "target_min_stream_id_inclusive": min_stream_id, + "max_stream_id_exclusive": max_stream_id + 1, + "rows_inserted": 0, + } + progress_json = ujson.dumps(progress) -def run_sqlite_upgrade(cur): - cur.execute(SQLITE_TABLE) - - rowid = -1 - while True: - cur.execute( - "SELECT rowid, json FROM event_json" - " WHERE rowid > ?" - " ORDER BY rowid ASC LIMIT 100", - (rowid,) - ) - - res = cur.fetchall() - - if not res: - break - - events = [ - ujson.loads(js) - for _, js in res - ] - - rowid = max(rid for rid, _ in res) - - rows = [] - for ev in events: - content = ev.get("content", {}) - body = content.get("body", None) - name = content.get("name", None) - topic = content.get("topic", None) - sender = ev.get("sender", None) - if ev["type"] == "m.room.message" and body: - rows.append(( - ev["event_id"], ev["room_id"], sender, "content.body", body - )) - if ev["type"] == "m.room.name" and name: - rows.append(( - ev["event_id"], ev["room_id"], sender, "content.name", name - )) - if ev["type"] == "m.room.topic" and topic: - rows.append(( - ev["event_id"], ev["room_id"], sender, "content.topic", topic - )) - - if rows: - logger.info(rows) - cur.executemany( - "INSERT INTO event_search (event_id, room_id, sender, key, value)" - " VALUES (?,?,?,?,?)", - rows - ) + cur.execute( + "INSERT into background_updates (update_name, progress_json)" + " VALUES (?, ?)", ("event_search", progress_json) + ) diff --git a/synapse/storage/search.py b/synapse/storage/search.py index d170c546b5..3b19b118eb 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -37,8 +37,8 @@ class SearchStore(BackgroundUpdateStore): @defer.inlineCallbacks def _background_reindex_search(self, progress, batch_size): - target_min_stream_id = progress["target_min_stream_id"] - max_stream_id = progress["max_stream_id"] + target_min_stream_id = progress["target_min_stream_id_inclusive"] + max_stream_id = progress["max_stream_id_exclusive"] rows_inserted = progress.get("rows_inserted", 0) INSERT_CLUMP_SIZE = 1000 @@ -105,8 +105,8 @@ class SearchStore(BackgroundUpdateStore): txn.execute_many(sql, clump) progress = { - "target_max_stream_id": target_min_stream_id, - "max_stream_id": min_stream_id, + "target_min_stream_id_inclusive": target_min_stream_id, + "max_stream_id_exclusive": min_stream_id, "rows_inserted": rows_inserted + len(event_search_rows) } From 38d82edf0e463e1e6eb6859330f2517cc7ae3e41 Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Tue, 10 Nov 2015 16:57:13 +0000 Subject: [PATCH 17/44] Allow guest users to join and message rooms --- synapse/api/constants.py | 1 + synapse/handlers/_base.py | 57 ++++++++++++++++++++++++++++++++++ synapse/handlers/federation.py | 10 +++--- synapse/handlers/message.py | 4 +-- synapse/handlers/presence.py | 3 +- synapse/handlers/room.py | 16 +++++++++- synapse/rest/client/v1/room.py | 13 ++++++-- 7 files changed, 92 insertions(+), 12 deletions(-) diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 41125e8719..c2450b771a 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -68,6 +68,7 @@ class EventTypes(object): RoomHistoryVisibility = "m.room.history_visibility" CanonicalAlias = "m.room.canonical_alias" RoomAvatar = "m.room.avatar" + GuestAccess = "m.room.guest_access" # These are used for validation Message = "m.room.message" diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index eef325a94b..f4ade1f594 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -175,6 +175,8 @@ class BaseHandler(object): if not suppress_auth: self.auth.check(event, auth_events=context.current_state) + yield self.maybe_kick_guest_users(event, context.current_state.values()) + if event.type == EventTypes.CanonicalAlias: # Check the alias is acually valid (at this time at least) room_alias_str = event.content.get("alias", None) @@ -282,3 +284,58 @@ class BaseHandler(object): federation_handler.handle_new_event( event, destinations=destinations, ) + + @defer.inlineCallbacks + def maybe_kick_guest_users(self, event, current_state): + # Technically this function invalidates current_state by changing it. + # Hopefully this isn't that important to the caller. + if event.type == EventTypes.GuestAccess: + guest_access = event.content.get("guest_access", "forbidden") + if guest_access != "can_join": + yield self.kick_guest_users(current_state) + + @defer.inlineCallbacks + def kick_guest_users(self, current_state): + for member_event in current_state: + try: + if member_event.type != EventTypes.Member: + continue + + if not self.hs.is_mine(UserID.from_string(member_event.state_key)): + continue + + if member_event.content["membership"] not in { + Membership.JOIN, + Membership.INVITE + }: + continue + + if ( + "kind" not in member_event.content + or member_event.content["kind"] != "guest" + ): + continue + + # We make the user choose to leave, rather than have the + # event-sender kick them. This is partially because we don't + # need to worry about power levels, and partially because guest + # users are a concept which doesn't hugely work over federation, + # and having homeservers have their own users leave keeps more + # of that decision-making and control local to the guest-having + # homeserver. + message_handler = self.hs.get_handlers().message_handler + yield message_handler.create_and_send_event( + { + "type": EventTypes.Member, + "state_key": member_event.state_key, + "content": { + "membership": Membership.LEAVE, + "kind": "guest" + }, + "room_id": member_event.room_id, + "sender": member_event.state_key + }, + ratelimit=False, + ) + except Exception as e: + logger.warn("Error kicking guest user: %s" % (e,)) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 872051b8b9..d1589334a5 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1097,8 +1097,6 @@ class FederationHandler(BaseHandler): context = yield self._prep_event( origin, event, state=state, - backfilled=backfilled, - current_state=current_state, auth_events=auth_events, ) @@ -1121,7 +1119,6 @@ class FederationHandler(BaseHandler): origin, ev_info["event"], state=ev_info.get("state"), - backfilled=backfilled, auth_events=ev_info.get("auth_events"), ) for ev_info in event_infos @@ -1208,8 +1205,7 @@ class FederationHandler(BaseHandler): defer.returnValue((event_stream_id, max_stream_id)) @defer.inlineCallbacks - def _prep_event(self, origin, event, state=None, backfilled=False, - current_state=None, auth_events=None): + def _prep_event(self, origin, event, state=None, auth_events=None): outlier = event.internal_metadata.is_outlier() context = yield self.state_handler.compute_event_context( @@ -1242,6 +1238,10 @@ class FederationHandler(BaseHandler): context.rejected = RejectedReason.AUTH_ERROR + if event.type == EventTypes.GuestAccess: + full_context = yield self.store.get_current_state(room_id=event.room_id) + yield self.maybe_kick_guest_users(event, full_context) + defer.returnValue(context) @defer.inlineCallbacks diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 654ecd2b37..7d31ff8d46 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -167,7 +167,7 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def create_and_send_event(self, event_dict, ratelimit=True, - token_id=None, txn_id=None): + token_id=None, txn_id=None, is_guest=False): """ Given a dict from a client, create and handle a new event. Creates an FrozenEvent object, filling out auth_events, prev_events, @@ -213,7 +213,7 @@ class MessageHandler(BaseHandler): if event.type == EventTypes.Member: member_handler = self.hs.get_handlers().room_member_handler - yield member_handler.change_membership(event, context) + yield member_handler.change_membership(event, context, is_guest=is_guest) else: yield self.handle_new_client_event( event=event, diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 0b780cd528..aca65096fc 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -950,7 +950,8 @@ class PresenceHandler(BaseHandler): ) while len(self._remote_offline_serials) > MAX_OFFLINE_SERIALS: self._remote_offline_serials.pop() # remove the oldest - del self._user_cachemap[user] + if user in self._user_cachemap: + del self._user_cachemap[user] else: # Remove the user from remote_offline_serials now that they're # no longer offline diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 834972a580..7d18218cd9 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -369,7 +369,7 @@ class RoomMemberHandler(BaseHandler): remotedomains.add(member.domain) @defer.inlineCallbacks - def change_membership(self, event, context, do_auth=True): + def change_membership(self, event, context, do_auth=True, is_guest=False): """ Change the membership status of a user in a room. Args: @@ -390,6 +390,20 @@ class RoomMemberHandler(BaseHandler): # if this HS is not currently in the room, i.e. we have to do the # invite/join dance. if event.membership == Membership.JOIN: + if is_guest: + guest_access = context.current_state.get( + (EventTypes.GuestAccess, ""), + None + ) + is_guest_access_allowed = ( + guest_access + and guest_access.content + and "guest_access" in guest_access.content + and guest_access.content["guest_access"] == "can_join" + ) + if not is_guest_access_allowed: + raise AuthError(403, "Guest access not allowed") + yield self._do_join(event, context, do_auth=do_auth) else: if event.membership == Membership.LEAVE: diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index e88a1ae290..03ac073926 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -175,7 +175,7 @@ class RoomSendEventRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_POST(self, request, room_id, event_type, txn_id=None): - user, token_id, _ = yield self.auth.get_user_by_req(request) + user, token_id, _ = yield self.auth.get_user_by_req(request, allow_guest=True) content = _parse_json(request) msg_handler = self.handlers.message_handler @@ -220,7 +220,10 @@ class JoinRoomAliasServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_POST(self, request, room_identifier, txn_id=None): - user, token_id, _ = yield self.auth.get_user_by_req(request) + user, token_id, is_guest = yield self.auth.get_user_by_req( + request, + allow_guest=True + ) # the identifier could be a room alias or a room id. Try one then the # other if it fails to parse, without swallowing other valid @@ -242,16 +245,20 @@ class JoinRoomAliasServlet(ClientV1RestServlet): defer.returnValue((200, ret_dict)) else: # room id msg_handler = self.handlers.message_handler + content = {"membership": Membership.JOIN} + if is_guest: + content["kind"] = "guest" yield msg_handler.create_and_send_event( { "type": EventTypes.Member, - "content": {"membership": Membership.JOIN}, + "content": content, "room_id": identifier.to_string(), "sender": user.to_string(), "state_key": user.to_string(), }, token_id=token_id, txn_id=txn_id, + is_guest=is_guest, ) defer.returnValue((200, {"room_id": identifier.to_string()})) From cf437900e0c689aad40f3da89866cf84c0f7ef65 Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Tue, 10 Nov 2015 17:10:27 +0000 Subject: [PATCH 18/44] Return world_readable and guest_can_join in /publicRooms --- synapse/storage/events.py | 2 + synapse/storage/room.py | 71 +++++++++++-------- .../storage/schema/delta/25/guest_access.sql | 25 +++++++ tests/storage/test_room.py | 2 + 4 files changed, 71 insertions(+), 29 deletions(-) create mode 100644 synapse/storage/schema/delta/25/guest_access.sql diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 59c9987202..4a365ff639 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -313,6 +313,8 @@ class EventsStore(SQLBaseStore): self._store_redaction(txn, event) elif event.type == EventTypes.RoomHistoryVisibility: self._store_history_visibility_txn(txn, event) + elif event.type == EventTypes.GuestAccess: + self._store_guest_access_txn(txn, event) self._store_room_members_txn( txn, diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 1c79626736..4f08df478c 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -99,34 +99,39 @@ class RoomStore(SQLBaseStore): """ def f(txn): - topic_subquery = ( - "SELECT topics.event_id as event_id, " - "topics.room_id as room_id, topic " - "FROM topics " - "INNER JOIN current_state_events as c " - "ON c.event_id = topics.event_id " - ) + def subquery(table_name, column_name=None): + column_name = column_name or table_name + return ( + "SELECT %(table_name)s.event_id as event_id, " + "%(table_name)s.room_id as room_id, %(column_name)s " + "FROM %(table_name)s " + "INNER JOIN current_state_events as c " + "ON c.event_id = %(table_name)s.event_id " % { + "column_name": column_name, + "table_name": table_name, + } + ) - name_subquery = ( - "SELECT room_names.event_id as event_id, " - "room_names.room_id as room_id, name " - "FROM room_names " - "INNER JOIN current_state_events as c " - "ON c.event_id = room_names.event_id " - ) - - # We use non printing ascii character US (\x1F) as a separator sql = ( - "SELECT r.room_id, max(n.name), max(t.topic)" + "SELECT" + " r.room_id," + " max(n.name)," + " max(t.topic)," + " max(v.history_visibility)," + " max(g.guest_access)" " FROM rooms AS r" " LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id" " LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id" + " LEFT JOIN (%(history_visibility)s) AS v ON v.room_id = r.room_id" + " LEFT JOIN (%(guest_access)s) AS g ON g.room_id = r.room_id" " WHERE r.is_public = ?" - " GROUP BY r.room_id" - ) % { - "topic": topic_subquery, - "name": name_subquery, - } + " GROUP BY r.room_id" % { + "topic": subquery("topics", "topic"), + "name": subquery("room_names", "name"), + "history_visibility": subquery("history_visibility"), + "guest_access": subquery("guest_access"), + } + ) txn.execute(sql, (is_public,)) @@ -156,10 +161,12 @@ class RoomStore(SQLBaseStore): "room_id": r[0], "name": r[1], "topic": r[2], - "aliases": r[3], + "world_readable": r[3] == "world_readable", + "guest_can_join": r[4] == "can_join", + "aliases": r[5], } for r in rows - if r[3] # We only return rooms that have at least one alias. + if r[5] # We only return rooms that have at least one alias. ] defer.returnValue(ret) @@ -203,16 +210,22 @@ class RoomStore(SQLBaseStore): ) def _store_history_visibility_txn(self, txn, event): - if hasattr(event, "content") and "history_visibility" in event.content: + self._store_content_index_txn(txn, event, "history_visibility") + + def _store_guest_access_txn(self, txn, event): + self._store_content_index_txn(txn, event, "guest_access") + + def _store_content_index_txn(self, txn, event, key): + if hasattr(event, "content") and key in event.content: sql = ( - "INSERT INTO history_visibility" - " (event_id, room_id, history_visibility)" - " VALUES (?, ?, ?)" + "INSERT INTO %(key)s" + " (event_id, room_id, %(key)s)" + " VALUES (?, ?, ?)" % {"key": key} ) txn.execute(sql, ( event.event_id, event.room_id, - event.content["history_visibility"] + event.content[key] )) def _store_event_search_txn(self, txn, event, key, value): diff --git a/synapse/storage/schema/delta/25/guest_access.sql b/synapse/storage/schema/delta/25/guest_access.sql new file mode 100644 index 0000000000..bdb90e7118 --- /dev/null +++ b/synapse/storage/schema/delta/25/guest_access.sql @@ -0,0 +1,25 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * This is a manual index of guest_access content of state events, + * so that we can join on them in SELECT statements. + */ +CREATE TABLE IF NOT EXISTS guest_access( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + guest_access TEXT NOT NULL, + UNIQUE (event_id) +); diff --git a/tests/storage/test_room.py b/tests/storage/test_room.py index caffce64e3..91c967548d 100644 --- a/tests/storage/test_room.py +++ b/tests/storage/test_room.py @@ -73,6 +73,8 @@ class RoomStoreTestCase(unittest.TestCase): "room_id": self.room.to_string(), "topic": None, "aliases": [self.alias.to_string()], + "world_readable": False, + "guest_can_join": False, }, rooms[0]) From 940a16119205115c02a3b23e9f3c67a08486bae0 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 11 Nov 2015 13:59:40 +0000 Subject: [PATCH 19/44] Fix the background update --- synapse/storage/background_updates.py | 13 ++++++++----- synapse/storage/schema/delta/25/fts.py | 7 +++---- synapse/storage/search.py | 16 ++++++++-------- 3 files changed, 19 insertions(+), 17 deletions(-) diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index b6cdc6ec68..45fccc2e5e 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -93,16 +93,19 @@ class BackgroundUpdateStore(SQLBaseStore): sleep = defer.Deferred() self._background_update_timer = self._clock.call_later( - self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback + self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback, None ) try: yield sleep finally: self._background_update_timer = None - result = yield self.do_background_update( - self.BACKGROUND_UPDATE_DURATION_MS - ) + try: + result = yield self.do_background_update( + self.BACKGROUND_UPDATE_DURATION_MS + ) + except: + logger.exception("Error doing update") if result is None: logger.info( @@ -169,7 +172,7 @@ class BackgroundUpdateStore(SQLBaseStore): duration_ms = time_stop - time_start logger.info( - "Updating %. Updated %r items in %rms." + "Updating %r. Updated %r items in %rms." " (total_rate=%r/ms, current_rate=%r/ms, total_updated=%r)", update_name, items_updated, duration_ms, performance.total_items_per_ms(), diff --git a/synapse/storage/schema/delta/25/fts.py b/synapse/storage/schema/delta/25/fts.py index e408d36da0..ce92f59025 100644 --- a/synapse/storage/schema/delta/25/fts.py +++ b/synapse/storage/schema/delta/25/fts.py @@ -47,11 +47,10 @@ def run_upgrade(cur, database_engine, *args, **kwargs): if isinstance(database_engine, PostgresEngine): for statement in get_statements(POSTGRES_TABLE.splitlines()): cur.execute(statement) - return - - if isinstance(database_engine, Sqlite3Engine): + elif isinstance(database_engine, Sqlite3Engine): cur.execute(SQLITE_TABLE) - return + else: + raise Exception("Unrecognized database engine") cur.execute("SELECT MIN(stream_ordering) FROM events") rows = cur.fetchall() diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 3b19b118eb..3c0d671129 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -46,18 +46,18 @@ class SearchStore(BackgroundUpdateStore): def reindex_search_txn(txn): sql = ( - "SELECT stream_id, event_id FROM events" + "SELECT stream_ordering, event_id FROM events" " WHERE ? <= stream_ordering AND stream_ordering < ?" " AND (%s)" " ORDER BY stream_ordering DESC" " LIMIT ?" - ) % (" OR ".join("type = '%s'" % TYPES),) + ) % (" OR ".join("type = '%s'" % (t,) for t in TYPES),) - txn.execute(sql, target_min_stream_id, max_stream_id, batch_size) + txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size)) - rows = txn.fetch_all() + rows = txn.fetchall() if not rows: - return None + return 0 min_stream_id = rows[-1][0] event_ids = [row[1] for row in rows] @@ -102,7 +102,7 @@ class SearchStore(BackgroundUpdateStore): for index in range(0, len(event_search_rows), INSERT_CLUMP_SIZE): clump = event_search_rows[index:index + INSERT_CLUMP_SIZE] - txn.execute_many(sql, clump) + txn.executemany(sql, clump) progress = { "target_min_stream_id_inclusive": target_min_stream_id, @@ -116,11 +116,11 @@ class SearchStore(BackgroundUpdateStore): return len(event_search_rows) - result = yield self.runInteration( + result = yield self.runInteraction( self.EVENT_SEARCH_UPDATE_NAME, reindex_search_txn ) - if result is None: + if not result: yield self._end_background_update(self.EVENT_SEARCH_UPDATE_NAME) defer.returnValue(result) From f15ba926ccfb36cad31a19fe22a4cb384850f4dd Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Wed, 11 Nov 2015 17:13:24 +0000 Subject: [PATCH 20/44] Allow guest access to room initialSync --- synapse/handlers/message.py | 55 ++++++++++++++++++++-------------- synapse/rest/client/v1/room.py | 3 +- 2 files changed, 34 insertions(+), 24 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 7d31ff8d46..6720c6a728 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -456,7 +456,7 @@ class MessageHandler(BaseHandler): defer.returnValue(ret) @defer.inlineCallbacks - def room_initial_sync(self, user_id, room_id, pagin_config=None): + def room_initial_sync(self, user_id, room_id, pagin_config=None, is_guest=False): """Capture the a snapshot of a room. If user is currently a member of the room this will be what is currently in the room. If the user left the room this will be what was in the room when they left. @@ -473,15 +473,19 @@ class MessageHandler(BaseHandler): A JSON serialisable dict with the snapshot of the room. """ - member_event = yield self.auth.check_user_was_in_room(room_id, user_id) + membership, member_event_id = yield self._check_in_room_or_world_readable( + room_id, + user_id, + is_guest + ) - if member_event.membership == Membership.JOIN: + if membership == Membership.JOIN: result = yield self._room_initial_sync_joined( - user_id, room_id, pagin_config, member_event + user_id, room_id, pagin_config, membership, is_guest ) - elif member_event.membership == Membership.LEAVE: + elif membership == Membership.LEAVE: result = yield self._room_initial_sync_parted( - user_id, room_id, pagin_config, member_event + user_id, room_id, pagin_config, membership, member_event_id, is_guest ) private_user_data = [] @@ -497,19 +501,19 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def _room_initial_sync_parted(self, user_id, room_id, pagin_config, - member_event): + membership, member_event_id, is_guest): room_state = yield self.store.get_state_for_events( - [member_event.event_id], None + [member_event_id], None ) - room_state = room_state[member_event.event_id] + room_state = room_state[member_event_id] limit = pagin_config.limit if pagin_config else None if limit is None: limit = 10 stream_token = yield self.store.get_stream_token_for_event( - member_event.event_id + member_event_id ) messages, token = yield self.store.get_recent_events_for_room( @@ -519,7 +523,7 @@ class MessageHandler(BaseHandler): ) messages = yield self._filter_events_for_client( - user_id, messages + user_id, messages, is_guest=is_guest ) start_token = StreamToken(token[0], 0, 0, 0, 0) @@ -528,7 +532,7 @@ class MessageHandler(BaseHandler): time_now = self.clock.time_msec() defer.returnValue({ - "membership": member_event.membership, + "membership": membership, "room_id": room_id, "messages": { "chunk": [serialize_event(m, time_now) for m in messages], @@ -542,7 +546,7 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def _room_initial_sync_joined(self, user_id, room_id, pagin_config, - member_event): + membership, is_guest): current_state = yield self.state.get_current_state( room_id=room_id, ) @@ -574,12 +578,14 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def get_presence(): - states = yield presence_handler.get_states( - target_users=[UserID.from_string(m.user_id) for m in room_members], - auth_user=auth_user, - as_event=True, - check_auth=False, - ) + states = {} + if not is_guest: + states = yield presence_handler.get_states( + target_users=[UserID.from_string(m.user_id) for m in room_members], + auth_user=auth_user, + as_event=True, + check_auth=False, + ) defer.returnValue(states.values()) @@ -599,7 +605,7 @@ class MessageHandler(BaseHandler): ).addErrback(unwrapFirstError) messages = yield self._filter_events_for_client( - user_id, messages + user_id, messages, is_guest=is_guest, require_all_visible_for_guests=False ) start_token = now_token.copy_and_replace("room_key", token[0]) @@ -607,8 +613,7 @@ class MessageHandler(BaseHandler): time_now = self.clock.time_msec() - defer.returnValue({ - "membership": member_event.membership, + ret = { "room_id": room_id, "messages": { "chunk": [serialize_event(m, time_now) for m in messages], @@ -618,4 +623,8 @@ class MessageHandler(BaseHandler): "state": state, "presence": presence, "receipts": receipts, - }) + } + if not is_guest: + ret["membership"] = membership + + defer.returnValue(ret) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 03ac073926..df2dc37b24 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -372,12 +372,13 @@ class RoomInitialSyncRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_GET(self, request, room_id): - user, _, _ = yield self.auth.get_user_by_req(request) + user, _, is_guest = yield self.auth.get_user_by_req(request, allow_guest=True) pagination_config = PaginationConfig.from_request(request) content = yield self.handlers.message_handler.room_initial_sync( room_id=room_id, user_id=user.to_string(), pagin_config=pagination_config, + is_guest=is_guest, ) defer.returnValue((200, content)) From e1627388d15fac534e22b80c2f7cb1a4e1afacc3 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 11 Nov 2015 17:14:56 +0000 Subject: [PATCH 21/44] Fix param style to work on both sqlite and postgres --- synapse/storage/schema/delta/25/fts.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/synapse/storage/schema/delta/25/fts.py b/synapse/storage/schema/delta/25/fts.py index ce92f59025..5239d69073 100644 --- a/synapse/storage/schema/delta/25/fts.py +++ b/synapse/storage/schema/delta/25/fts.py @@ -68,7 +68,11 @@ def run_upgrade(cur, database_engine, *args, **kwargs): } progress_json = ujson.dumps(progress) - cur.execute( + sql = ( "INSERT into background_updates (update_name, progress_json)" - " VALUES (?, ?)", ("event_search", progress_json) + " VALUES (?, ?)" ) + + sql = database_engine.convert_param_style(sql) + + cur.execute(sql, ("event_search", progress_json)) From e93d550b79e7e485bb2866ef956fca2379cefac7 Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Wed, 11 Nov 2015 17:49:44 +0000 Subject: [PATCH 22/44] Allow guests to access room context API --- synapse/handlers/room.py | 12 +++++++++--- synapse/rest/client/v1/room.py | 4 ++-- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 7d18218cd9..0266926fc7 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -765,7 +765,7 @@ class RoomListHandler(BaseHandler): class RoomContextHandler(BaseHandler): @defer.inlineCallbacks - def get_event_context(self, user, room_id, event_id, limit): + def get_event_context(self, user, room_id, event_id, limit, is_guest): """Retrieves events, pagination tokens and state around a given event in a room. @@ -789,11 +789,17 @@ class RoomContextHandler(BaseHandler): ) results["events_before"] = yield self._filter_events_for_client( - user.to_string(), results["events_before"] + user.to_string(), + results["events_before"], + is_guest=is_guest, + require_all_visible_for_guests=False ) results["events_after"] = yield self._filter_events_for_client( - user.to_string(), results["events_after"] + user.to_string(), + results["events_after"], + is_guest=is_guest, + require_all_visible_for_guests=False ) if results["events_after"]: diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 03ac073926..c583025e30 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -417,12 +417,12 @@ class RoomEventContext(ClientV1RestServlet): @defer.inlineCallbacks def on_GET(self, request, room_id, event_id): - user, _ = yield self.auth.get_user_by_req(request) + user, _, is_guest = yield self.auth.get_user_by_req(request, allow_guest=True) limit = int(request.args.get("limit", [10])[0]) results = yield self.handlers.room_context_handler.get_event_context( - user, room_id, event_id, limit, + user, room_id, event_id, limit, is_guest ) time_now = self.clock.time_msec() From e327327174ec31b0babadac978b0a32c1c0b6569 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 11 Nov 2015 18:17:55 +0000 Subject: [PATCH 23/44] Update CHANGES --- CHANGES.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.rst b/CHANGES.rst index 5a63ae6568..a2871e8426 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,7 +1,7 @@ Changes in synapse v0.11.0-rc1 (XXXX-XX-XX) =========================================== -* Add Search API (PR #307, #324, #327, #336, #350) +* Add Search API (PR #307, #324, #327, #336, #350, #359) * Add 'archived' state to v2 /sync API (PR #316) * Add ability to reject invites (PR #317) * Add config option to disable password login (PR #322) From 63b28c781657ff013610490cd2504630da0a1308 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 11 Nov 2015 18:21:26 +0000 Subject: [PATCH 24/44] Update date --- CHANGES.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.rst b/CHANGES.rst index a2871e8426..cf5e33c4f9 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,4 +1,4 @@ -Changes in synapse v0.11.0-rc1 (XXXX-XX-XX) +Changes in synapse v0.11.0-rc1 (2015-11-11) =========================================== * Add Search API (PR #307, #324, #327, #336, #350, #359) From 6a9c4cfd0be467e290be52b02e6229fd30367c6e Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Thu, 12 Nov 2015 11:58:48 +0000 Subject: [PATCH 25/44] Fix race creating directories --- synapse/config/_base.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/synapse/config/_base.py b/synapse/config/_base.py index ceef309afc..c18e0bdbb8 100644 --- a/synapse/config/_base.py +++ b/synapse/config/_base.py @@ -14,6 +14,7 @@ # limitations under the License. import argparse +import errno import os import yaml import sys @@ -91,8 +92,11 @@ class Config(object): @classmethod def ensure_directory(cls, dir_path): dir_path = cls.abspath(dir_path) - if not os.path.exists(dir_path): + try: os.makedirs(dir_path) + except OSError, e: + if e.errno != errno.EEXIST: + raise if not os.path.isdir(dir_path): raise ConfigError( "%s is not a directory" % (dir_path,) From 50f1afbd5b8560012d7beb116fc2da0e8b0e6199 Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Thu, 12 Nov 2015 13:37:07 +0000 Subject: [PATCH 26/44] Consider joined guest users as joined users Otherwise they're inconveniently allowed to write events to the room but not to read them from the room. --- synapse/handlers/message.py | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 7d31ff8d46..92e1180262 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -258,20 +258,29 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def _check_in_room_or_world_readable(self, room_id, user_id, is_guest): - if is_guest: - visibility = yield self.state_handler.get_current_state( - room_id, EventTypes.RoomHistoryVisibility, "" - ) - if visibility.content["history_visibility"] == "world_readable": - defer.returnValue((Membership.JOIN, None)) - return - else: - raise AuthError( - 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN - ) - else: + try: + # check_user_was_in_room will return the most recent membership + # event for the user if: + # * The user is a non-guest user, and was ever in the room + # * The user is a guest user, and has joined the room + # else it will throw. member_event = yield self.auth.check_user_was_in_room(room_id, user_id) defer.returnValue((member_event.membership, member_event.event_id)) + return + except AuthError: + if not is_guest: + raise + + visibility = yield self.state_handler.get_current_state( + room_id, EventTypes.RoomHistoryVisibility, "" + ) + if visibility.content["history_visibility"] == "world_readable": + defer.returnValue((Membership.JOIN, None)) + return + else: + raise AuthError( + 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN + ) @defer.inlineCallbacks def get_state_events(self, user_id, room_id, is_guest=False): From 0a93df5f9c4921690fe456ee18bd8d51a7a08744 Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Thu, 12 Nov 2015 13:44:39 +0000 Subject: [PATCH 27/44] Allow guests to set their display names Depends: https://github.com/matrix-org/synapse/pull/363 Tests in https://github.com/matrix-org/sytest/pull/66 --- synapse/rest/client/v1/profile.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/rest/client/v1/profile.py b/synapse/rest/client/v1/profile.py index 6b379e4e5f..3218e47025 100644 --- a/synapse/rest/client/v1/profile.py +++ b/synapse/rest/client/v1/profile.py @@ -37,7 +37,7 @@ class ProfileDisplaynameRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_PUT(self, request, user_id): - auth_user, _, _ = yield self.auth.get_user_by_req(request) + auth_user, _, _ = yield self.auth.get_user_by_req(request, allow_guest=True) user = UserID.from_string(user_id) try: From 39de87869c9ad966f382e597986e860d03f3fef2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Nov 2015 14:06:31 +0000 Subject: [PATCH 28/44] Fix bug where assumed dict was namedtuple --- synapse/storage/transactions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 4e0d7c9774..ad099775eb 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -59,7 +59,7 @@ class TransactionStore(SQLBaseStore): allow_none=True, ) - if result and result.response_code: + if result and result["response_code"]: return result["response_code"], result["response_json"] else: return None From 14a9d805b959b5d2b26fea0d57794cb3ceb28958 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Nov 2015 14:07:25 +0000 Subject: [PATCH 29/44] Use a (hopefully) more efficient SQL query for doing recency based room search --- synapse/storage/search.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 2e88c51ad0..eac265b543 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -253,11 +253,13 @@ class SearchStore(BackgroundUpdateStore): ) elif isinstance(self.database_engine, Sqlite3Engine): sql = ( - "SELECT rank(matchinfo(event_search)) as rank, room_id, event_id," + "SELECT rank(matchinfo) as rank, room_id, event_id," " topological_ordering, stream_ordering" - " FROM event_search" - " NATURAL JOIN events" - " WHERE value MATCH ? AND room_id = ?" + " FROM (SELECT event_id, matchinfo(event_search) FROM event_search" + " WHERE value MATCH" + " )" + " CROSS JOIN events USING (event_id)" + " WHERE room_id = ?" ) else: # This should be unreachable. From fb7e260a20847ac2a253a0e7868ecb2284c9a766 Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Thu, 12 Nov 2015 15:02:00 +0000 Subject: [PATCH 30/44] Tweak guest access permissions * Allow world_readable rooms to be read by guests who have joined and left * Allow regular users to access world_readable rooms --- synapse/handlers/message.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 92e1180262..5f56f90590 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -267,17 +267,18 @@ class MessageHandler(BaseHandler): member_event = yield self.auth.check_user_was_in_room(room_id, user_id) defer.returnValue((member_event.membership, member_event.event_id)) return - except AuthError: + except AuthError, auth_error: + visibility = yield self.state_handler.get_current_state( + room_id, EventTypes.RoomHistoryVisibility, "" + ) + if ( + visibility and + visibility.content["history_visibility"] == "world_readable" + ): + defer.returnValue((Membership.JOIN, None)) + return if not is_guest: - raise - - visibility = yield self.state_handler.get_current_state( - room_id, EventTypes.RoomHistoryVisibility, "" - ) - if visibility.content["history_visibility"] == "world_readable": - defer.returnValue((Membership.JOIN, None)) - return - else: + raise auth_error raise AuthError( 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN ) From 320408ef47eb373c2b8edad7ab3d08c3fa0cb459 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Nov 2015 15:09:45 +0000 Subject: [PATCH 31/44] Fix SQL syntax --- synapse/storage/search.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/storage/search.py b/synapse/storage/search.py index eac265b543..e1911e2480 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -255,8 +255,9 @@ class SearchStore(BackgroundUpdateStore): sql = ( "SELECT rank(matchinfo) as rank, room_id, event_id," " topological_ordering, stream_ordering" - " FROM (SELECT event_id, matchinfo(event_search) FROM event_search" - " WHERE value MATCH" + " FROM (SELECT key, event_id, matchinfo(event_search) as matchinfo" + " FROM event_search" + " WHERE value MATCH ?" " )" " CROSS JOIN events USING (event_id)" " WHERE room_id = ?" From 764e79d0514a0cce9dc456df0355108d40dbeda8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Nov 2015 15:19:56 +0000 Subject: [PATCH 32/44] Comment --- synapse/storage/search.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/storage/search.py b/synapse/storage/search.py index e1911e2480..0b00ddb9db 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -252,6 +252,8 @@ class SearchStore(BackgroundUpdateStore): " WHERE vector @@ query AND room_id = ?" ) elif isinstance(self.database_engine, Sqlite3Engine): + # We use CROSS JOIN here to ensure we use the right indexes. + # https://sqlite.org/optoverview.html#crossjoin sql = ( "SELECT rank(matchinfo) as rank, room_id, event_id," " topological_ordering, stream_ordering" From 78f6010207d5e6908ba584121461af4b02714287 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 12 Nov 2015 13:10:25 +0000 Subject: [PATCH 33/44] Fix an issue with ignoring power_level changes on divergent graphs Changes to m.room.power_levels events are supposed to be handled at a high priority; however a typo meant that the relevant bit of code was never executed, so they were handled just like any other state change - which meant that a bad person could cause room state changes by forking the graph from a point in history when they were allowed to do so. --- synapse/state.py | 16 +++++--- tests/test_state.py | 93 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 104 insertions(+), 5 deletions(-) diff --git a/synapse/state.py b/synapse/state.py index bb225c39cf..f893df3378 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -307,19 +307,23 @@ class StateHandler(object): We resolve conflicts in the following order: 1. power levels - 2. memberships - 3. other events. + 2. join rules + 3. memberships + 4. other events. """ resolved_state = {} power_key = (EventTypes.PowerLevels, "") - if power_key in conflicted_state.items(): - power_levels = conflicted_state[power_key] - resolved_state[power_key] = self._resolve_auth_events(power_levels) + if power_key in conflicted_state: + events = conflicted_state[power_key] + logger.debug("Resolving conflicted power levels %r", events) + resolved_state[power_key] = self._resolve_auth_events( + events, auth_events) auth_events.update(resolved_state) for key, events in conflicted_state.items(): if key[0] == EventTypes.JoinRules: + logger.debug("Resolving conflicted join rules %r", events) resolved_state[key] = self._resolve_auth_events( events, auth_events @@ -329,6 +333,7 @@ class StateHandler(object): for key, events in conflicted_state.items(): if key[0] == EventTypes.Member: + logger.debug("Resolving conflicted member lists %r", events) resolved_state[key] = self._resolve_auth_events( events, auth_events @@ -338,6 +343,7 @@ class StateHandler(object): for key, events in conflicted_state.items(): if key not in resolved_state: + logger.debug("Resolving conflicted state %r:%r", key, events) resolved_state[key] = self._resolve_normal_events( events, auth_events ) diff --git a/tests/test_state.py b/tests/test_state.py index 0274c4bc18..e4e995b756 100644 --- a/tests/test_state.py +++ b/tests/test_state.py @@ -317,6 +317,99 @@ class StateTestCase(unittest.TestCase): {e.event_id for e in context_store["E"].current_state.values()} ) + @defer.inlineCallbacks + def test_branch_have_perms_conflict(self): + userid1 = "@user_id:example.com" + userid2 = "@user_id2:example.com" + + nodes = { + "A1": DictObj( + type=EventTypes.Create, + state_key="", + content={"creator": userid1}, + depth=1, + ), + "A2": DictObj( + type=EventTypes.Member, + state_key=userid1, + content={"membership": Membership.JOIN}, + membership=Membership.JOIN, + ), + "A3": DictObj( + type=EventTypes.Member, + state_key=userid2, + content={"membership": Membership.JOIN}, + membership=Membership.JOIN, + ), + "A4": DictObj( + type=EventTypes.PowerLevels, + state_key="", + content={ + "events": {"m.room.name": 50}, + "users": {userid1: 100, + userid2: 60}, + }, + ), + "A5": DictObj( + type=EventTypes.Name, + state_key="", + ), + "B": DictObj( + type=EventTypes.PowerLevels, + state_key="", + content={ + "events": {"m.room.name": 50}, + "users": {userid2: 30}, + }, + ), + "C": DictObj( + type=EventTypes.Name, + state_key="", + sender=userid2, + ), + "D": DictObj( + type=EventTypes.Message, + ), + } + edges = { + "A2": ["A1"], + "A3": ["A2"], + "A4": ["A3"], + "A5": ["A4"], + "B": ["A5"], + "C": ["A5"], + "D": ["B", "C"] + } + self._add_depths(nodes, edges) + graph = Graph(nodes, edges) + + store = StateGroupStore() + self.store.get_state_groups.side_effect = store.get_state_groups + + context_store = {} + + for event in graph.walk(): + context = yield self.state.compute_event_context(event) + store.store_state_groups(event, context) + context_store[event.event_id] = context + + self.assertSetEqual( + {"A1", "A2", "A3", "A5", "B"}, + {e.event_id for e in context_store["D"].current_state.values()} + ) + + def _add_depths(self, nodes, edges): + def _get_depth(ev): + node = nodes[ev] + if 'depth' not in node: + prevs = edges[ev] + depth = max(_get_depth(prev) for prev in prevs) + 1 + node['depth'] = depth + return node['depth'] + + for n in nodes: + _get_depth(n) + @defer.inlineCallbacks def test_annotate_with_old_message(self): event = create_event(type="test_message", name="event") From 8fd8e72cec8df94403441664d8eecc25fa8d363f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Nov 2015 15:33:47 +0000 Subject: [PATCH 34/44] Expand comment --- synapse/storage/search.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 0b00ddb9db..dcc5ac65a3 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -254,6 +254,12 @@ class SearchStore(BackgroundUpdateStore): elif isinstance(self.database_engine, Sqlite3Engine): # We use CROSS JOIN here to ensure we use the right indexes. # https://sqlite.org/optoverview.html#crossjoin + # + # We want to use the full text search index on event_search to + # extract all possible matches first, then lookup those matches + # in the events table to get the topological ordering. We need + # to use the indexes in this order because sqlite refuses to + # MATCH unless it uses the full text search index sql = ( "SELECT rank(matchinfo) as rank, room_id, event_id," " topological_ordering, stream_ordering" From 3de46c7755ac5e061fc13fb916c13b841b65f2b5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Nov 2015 15:36:43 +0000 Subject: [PATCH 35/44] Trailing whitespace --- synapse/storage/search.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/search.py b/synapse/storage/search.py index dcc5ac65a3..380270b009 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -258,7 +258,7 @@ class SearchStore(BackgroundUpdateStore): # We want to use the full text search index on event_search to # extract all possible matches first, then lookup those matches # in the events table to get the topological ordering. We need - # to use the indexes in this order because sqlite refuses to + # to use the indexes in this order because sqlite refuses to # MATCH unless it uses the full text search index sql = ( "SELECT rank(matchinfo) as rank, room_id, event_id," From c0b3554401b821002acfc3989e57d6ac59671aa9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Nov 2015 16:19:55 +0000 Subject: [PATCH 36/44] Fix missing profile data in federation joins There was a regression where we stopped including profile data in initial joins for rooms joined over federation. --- synapse/federation/federation_client.py | 5 ++++- synapse/handlers/federation.py | 11 +++++++---- synapse/handlers/room.py | 3 ++- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index c0c0b693b8..d4f586fae7 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -357,7 +357,8 @@ class FederationClient(FederationBase): defer.returnValue(signed_auth) @defer.inlineCallbacks - def make_membership_event(self, destinations, room_id, user_id, membership): + def make_membership_event(self, destinations, room_id, user_id, membership, + content={},): """ Creates an m.room.member event, with context, without participating in the room. @@ -398,6 +399,8 @@ class FederationClient(FederationBase): logger.debug("Got response to make_%s: %s", membership, pdu_dict) + pdu_dict["content"].update(content) + defer.returnValue( (destination, self.event_from_pdu_json(pdu_dict)) ) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index d1589334a5..c1bce07e31 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -564,7 +564,7 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks - def do_invite_join(self, target_hosts, room_id, joinee): + def do_invite_join(self, target_hosts, room_id, joinee, content): """ Attempts to join the `joinee` to the room `room_id` via the server `target_host`. @@ -584,7 +584,8 @@ class FederationHandler(BaseHandler): target_hosts, room_id, joinee, - "join" + "join", + content, ) self.room_queues[room_id] = [] @@ -840,12 +841,14 @@ class FederationHandler(BaseHandler): defer.returnValue(None) @defer.inlineCallbacks - def _make_and_verify_event(self, target_hosts, room_id, user_id, membership): + def _make_and_verify_event(self, target_hosts, room_id, user_id, membership, + content={},): origin, pdu = yield self.replication_layer.make_membership_event( target_hosts, room_id, user_id, - membership + membership, + content, ) logger.debug("Got response to make_%s: %s", membership, pdu) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 0266926fc7..3f04752581 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -504,7 +504,8 @@ class RoomMemberHandler(BaseHandler): yield handler.do_invite_join( room_hosts, room_id, - event.user_id + event.user_id, + event.content, ) else: logger.debug("Doing normal join") From 468a2ed4ecd06b208611d3b44cd588a184efdfec Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Thu, 12 Nov 2015 16:45:28 +0000 Subject: [PATCH 37/44] Return non-room events from guest /events calls --- synapse/notifier.py | 20 +++++++++++++++++--- tests/rest/client/v1/test_presence.py | 3 +++ 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index 56c4c863b5..e3b42e2331 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -14,6 +14,8 @@ # limitations under the License. from twisted.internet import defer +from synapse.api.constants import EventTypes +from synapse.api.errors import AuthError from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor, ObservableDeferred @@ -346,9 +348,9 @@ class Notifier(object): room_ids = [] if is_guest: - # TODO(daniel): Deal with non-room events too - only_room_events = True if guest_room_id: + if not self._is_world_readable(guest_room_id): + raise AuthError(403, "Guest access not allowed") room_ids = [guest_room_id] else: rooms = yield self.store.get_rooms_for_user(user.to_string()) @@ -361,6 +363,7 @@ class Notifier(object): events = [] end_token = from_token + for name, source in self.event_sources.sources.items(): keyname = "%s_key" % name before_id = getattr(before_token, keyname) @@ -377,7 +380,7 @@ class Notifier(object): room_ids=room_ids, ) - if is_guest: + if name == "room": room_member_handler = self.hs.get_handlers().room_member_handler new_events = yield room_member_handler._filter_events_for_client( user.to_string(), @@ -403,6 +406,17 @@ class Notifier(object): defer.returnValue(result) + @defer.inlineCallbacks + def _is_world_readable(self, room_id): + state = yield self.hs.get_state_handler().get_current_state( + room_id, + EventTypes.RoomHistoryVisibility + ) + if state and "history_visibility" in state.content: + defer.returnValue(state.content["history_visibility"] == "world_readable") + else: + defer.returnValue(False) + @log_function def remove_expired_streams(self): time_now_ms = self.clock.time_msec() diff --git a/tests/rest/client/v1/test_presence.py b/tests/rest/client/v1/test_presence.py index 7f29d73d95..8581796f72 100644 --- a/tests/rest/client/v1/test_presence.py +++ b/tests/rest/client/v1/test_presence.py @@ -321,6 +321,9 @@ class PresenceEventStreamTestCase(unittest.TestCase): hs.handlers.room_member_handler.get_room_members = ( lambda r: self.room_members if r == "a-room" else [] ) + hs.handlers.room_member_handler._filter_events_for_client = ( + lambda user_id, events, **kwargs: events + ) self.mock_datastore = hs.get_datastore() self.mock_datastore.get_app_service_by_token = Mock(return_value=None) From 5dea4d37d160e5766aac6f1723a8b485c5b6c211 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 13 Nov 2015 10:31:15 +0000 Subject: [PATCH 38/44] Update some comments Add a couple of type annotations, docstrings, and other comments, in the interest of keeping track of what types I have. Merged from pull request #370. --- synapse/handlers/_base.py | 6 +++ synapse/handlers/sync.py | 34 ++++++++++++----- synapse/rest/client/v2_alpha/sync.py | 56 ++++++++++++++++++++++++++++ synapse/state.py | 16 ++++++-- 4 files changed, 98 insertions(+), 14 deletions(-) diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index f4ade1f594..6519f183df 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -29,6 +29,12 @@ logger = logging.getLogger(__name__) class BaseHandler(object): + """ + Common base class for the event handlers. + + :type store: synapse.storage.events.StateStore + :type state_handler: synapse.state.StateHandler + """ def __init__(self, hs): self.store = hs.get_datastore() diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 492c1c17d5..ed93e5a2df 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -47,9 +47,9 @@ class TimelineBatch(collections.namedtuple("TimelineBatch", [ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ - "room_id", - "timeline", - "state", + "room_id", # str + "timeline", # TimelineBatch + "state", # list[FrozenEvent] "ephemeral", "private_user_data", ])): @@ -68,9 +68,9 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [ - "room_id", - "timeline", - "state", + "room_id", # str + "timeline", # TimelineBatch + "state", # list[FrozenEvent] "private_user_data", ])): __slots__ = [] @@ -87,8 +87,8 @@ class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [ - "room_id", - "invite", + "room_id", # str + "invite", # FrozenEvent: the invite event ])): __slots__ = [] @@ -507,6 +507,9 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def load_filtered_recents(self, room_id, sync_config, now_token, since_token=None): + """ + :returns a Deferred TimelineBatch + """ limited = True recents = [] filtering_factor = 2 @@ -680,8 +683,13 @@ class SyncHandler(BaseHandler): def compute_state_delta(self, since_token, previous_state, current_state): """ Works out the differnce in state between the current state and the state the client got when it last performed a sync. - Returns: - A list of events. + + :param str since_token: the point we are comparing against + :param list[synapse.events.FrozenEvent] previous_state: the state to + compare to + :param list[synapse.events.FrozenEvent] current_state: the new state + + :returns: A list of events. """ # TODO(mjark) Check if the state events were received by the server # after the previous sync, since we need to include those state @@ -696,6 +704,12 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def check_joined_room(self, sync_config, room_id, state_delta): + """ + Check if the user has just joined the given room. If so, return the + full state for the room, instead of the delta since the last sync. + + :returns A deferred Tuple (state_delta, limited) + """ joined = False limited = False for event in state_delta: diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index d24507effa..997a61abbb 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -165,6 +165,20 @@ class SyncRestServlet(RestServlet): return {"events": filter.filter_presence(formatted)} def encode_joined(self, rooms, filter, time_now, token_id): + """ + Encode the joined rooms in a sync result + + :param list[synapse.handlers.sync.JoinedSyncResult] rooms: list of sync + results for rooms this user is joined to + :param FilterCollection filter: filters to apply to the results + :param int time_now: current time - used as a baseline for age + calculations + :param int token_id: ID of the user's auth token - used for namespacing + of transaction IDs + + :return: the joined rooms list, in our response format + :rtype: dict[str, dict[str, object]] + """ joined = {} for room in rooms: joined[room.room_id] = self.encode_room( @@ -174,6 +188,20 @@ class SyncRestServlet(RestServlet): return joined def encode_invited(self, rooms, filter, time_now, token_id): + """ + Encode the invited rooms in a sync result + + :param list[synapse.handlers.sync.InvitedSyncResult] rooms: list of + sync results for rooms this user is joined to + :param FilterCollection filter: filters to apply to the results + :param int time_now: current time - used as a baseline for age + calculations + :param int token_id: ID of the user's auth token - used for namespacing + of transaction IDs + + :return: the invited rooms list, in our response format + :rtype: dict[str, dict[str, object]] + """ invited = {} for room in rooms: invite = serialize_event( @@ -189,6 +217,20 @@ class SyncRestServlet(RestServlet): return invited def encode_archived(self, rooms, filter, time_now, token_id): + """ + Encode the archived rooms in a sync result + + :param list[synapse.handlers.sync.ArchivedSyncResult] rooms: list of + sync results for rooms this user is joined to + :param FilterCollection filter: filters to apply to the results + :param int time_now: current time - used as a baseline for age + calculations + :param int token_id: ID of the user's auth token - used for namespacing + of transaction IDs + + :return: the invited rooms list, in our response format + :rtype: dict[str, dict[str, object]] + """ joined = {} for room in rooms: joined[room.room_id] = self.encode_room( @@ -199,6 +241,20 @@ class SyncRestServlet(RestServlet): @staticmethod def encode_room(room, filter, time_now, token_id, joined=True): + """ + :param JoinedSyncResult|ArchivedSyncResult room: sync result for a + single room + :param FilterCollection filter: filters to apply to the results + :param int time_now: current time - used as a baseline for age + calculations + :param int token_id: ID of the user's auth token - used for namespacing + of transaction IDs + :param joined: True if the user is joined to this room - will mean + we handle ephemeral events + + :return: the room, encoded in our response format + :rtype: dict[str, object] + """ event_map = {} state_events = filter.filter_room_state(room.state) state_event_ids = [] diff --git a/synapse/state.py b/synapse/state.py index f893df3378..8ea2cac5d6 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -71,7 +71,7 @@ class StateHandler(object): @defer.inlineCallbacks def get_current_state(self, room_id, event_type=None, state_key=""): - """ Returns the current state for the room as a list. This is done by + """ Retrieves the current state for the room. This is done by calling `get_latest_events_in_room` to get the leading edges of the event graph and then resolving any of the state conflicts. @@ -80,6 +80,8 @@ class StateHandler(object): If `event_type` is specified, then the method returns only the one event (or None) with that `event_type` and `state_key`. + + :returns map from (type, state_key) to event """ event_ids = yield self.store.get_latest_event_ids_in_room(room_id) @@ -177,9 +179,10 @@ class StateHandler(object): """ Given a list of event_ids this method fetches the state at each event, resolves conflicts between them and returns them. - Return format is a tuple: (`state_group`, `state_events`), where the - first is the name of a state group if one and only one is involved, - otherwise `None`. + :returns a Deferred tuple of (`state_group`, `state`, `prev_state`). + `state_group` is the name of a state group if one and only one is + involved. `state` is a map from (type, state_key) to event, and + `prev_state` is a list of event ids. """ logger.debug("resolve_state_groups event_ids %s", event_ids) @@ -255,6 +258,11 @@ class StateHandler(object): return self._resolve_events(state_sets) def _resolve_events(self, state_sets, event_type=None, state_key=""): + """ + :returns a tuple (new_state, prev_states). new_state is a map + from (type, state_key) to event. prev_states is a list of event_ids. + :rtype: (dict[(str, str), synapse.events.FrozenEvent], list[str]) + """ state = {} for st in state_sets: for e in st: From 5ab4b0afe8b5126213cab2be7c3700eb7dd49789 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 12 Nov 2015 16:34:42 +0000 Subject: [PATCH 39/44] Make handlers.sync return a state dictionary, instead of an event list. Basically this moves the process of flattening the existing dictionary into a list up to rest.client.*, instead of doing it in handlers.sync. This simplifies a bit of the code in handlers.sync, but it is also going to be somewhat beneficial in the next stage of my hacking on SPEC-254. Merged from PR #371 --- synapse/handlers/sync.py | 70 ++++++++++++++++------------ synapse/rest/client/v2_alpha/sync.py | 2 +- 2 files changed, 40 insertions(+), 32 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index ed93e5a2df..8b154fa7e7 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -49,7 +49,7 @@ class TimelineBatch(collections.namedtuple("TimelineBatch", [ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ "room_id", # str "timeline", # TimelineBatch - "state", # list[FrozenEvent] + "state", # dict[(str, str), FrozenEvent] "ephemeral", "private_user_data", ])): @@ -70,7 +70,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [ "room_id", # str "timeline", # TimelineBatch - "state", # list[FrozenEvent] + "state", # dict[(str, str), FrozenEvent] "private_user_data", ])): __slots__ = [] @@ -257,12 +257,11 @@ class SyncHandler(BaseHandler): current_state = yield self.state_handler.get_current_state( room_id ) - current_state_events = current_state.values() defer.returnValue(JoinedSyncResult( room_id=room_id, timeline=batch, - state=current_state_events, + state=current_state, ephemeral=ephemeral_by_room.get(room_id, []), private_user_data=self.private_user_data_for_room( room_id, tags_by_room @@ -361,7 +360,7 @@ class SyncHandler(BaseHandler): defer.returnValue(ArchivedSyncResult( room_id=room_id, timeline=batch, - state=leave_state[leave_event_id].values(), + state=leave_state[leave_event_id], private_user_data=self.private_user_data_for_room( room_id, tags_by_room ), @@ -440,7 +439,10 @@ class SyncHandler(BaseHandler): for room_id in joined_room_ids: recents = events_by_room_id.get(room_id, []) - state = [event for event in recents if event.is_state()] + state = { + (event.type, event.state_key): event + for event in recents if event.is_state()} + if recents: prev_batch = now_token.copy_and_replace( "room_key", recents[0].internal_metadata.before @@ -575,7 +577,6 @@ class SyncHandler(BaseHandler): current_state = yield self.state_handler.get_current_state( room_id ) - current_state_events = current_state.values() state_at_previous_sync = yield self.get_state_at_previous_sync( room_id, since_token=since_token @@ -584,7 +585,7 @@ class SyncHandler(BaseHandler): state_events_delta = yield self.compute_state_delta( since_token=since_token, previous_state=state_at_previous_sync, - current_state=current_state_events, + current_state=current_state, ) state_events_delta, _ = yield self.check_joined_room( @@ -632,7 +633,7 @@ class SyncHandler(BaseHandler): [leave_event.event_id], None ) - state_events_at_leave = leave_state[leave_event.event_id].values() + state_events_at_leave = leave_state[leave_event.event_id] state_at_previous_sync = yield self.get_state_at_previous_sync( leave_event.room_id, since_token=since_token @@ -661,7 +662,7 @@ class SyncHandler(BaseHandler): def get_state_at_previous_sync(self, room_id, since_token): """ Get the room state at the previous sync the client made. Returns: - A Deferred list of Events. + A Deferred map from ((type, state_key)->Event) """ last_events, token = yield self.store.get_recent_events_for_room( room_id, end_token=since_token.room_key, limit=1, @@ -673,11 +674,12 @@ class SyncHandler(BaseHandler): last_event ) if last_event.is_state(): - state = [last_event] + last_context.current_state.values() + state = last_context.current_state.copy() + state[(last_event.type, last_event.state_key)] = last_event else: - state = last_context.current_state.values() + state = last_context.current_state else: - state = () + state = {} defer.returnValue(state) def compute_state_delta(self, since_token, previous_state, current_state): @@ -685,21 +687,23 @@ class SyncHandler(BaseHandler): state the client got when it last performed a sync. :param str since_token: the point we are comparing against - :param list[synapse.events.FrozenEvent] previous_state: the state to - compare to - :param list[synapse.events.FrozenEvent] current_state: the new state + :param dict[(str,str), synapse.events.FrozenEvent] previous_state: the + state to compare to + :param dict[(str,str), synapse.events.FrozenEvent] current_state: the + new state - :returns: A list of events. + :returns A new event dictionary """ # TODO(mjark) Check if the state events were received by the server # after the previous sync, since we need to include those state # updates even if they occured logically before the previous event. # TODO(mjark) Check for new redactions in the state events. - previous_dict = {event.event_id: event for event in previous_state} - state_delta = [] - for event in current_state: - if event.event_id not in previous_dict: - state_delta.append(event) + + state_delta = {} + for key, event in current_state.iteritems(): + if (key not in previous_state or + previous_state[key].event_id != event.event_id): + state_delta[key] = event return state_delta @defer.inlineCallbacks @@ -708,21 +712,25 @@ class SyncHandler(BaseHandler): Check if the user has just joined the given room. If so, return the full state for the room, instead of the delta since the last sync. + :param sync_config: + :param room_id: + :param dict[(str,str), synapse.events.FrozenEvent] state_delta: the + difference in state since the last sync + :returns A deferred Tuple (state_delta, limited) """ joined = False limited = False - for event in state_delta: - if ( - event.type == EventTypes.Member - and event.state_key == sync_config.user.to_string() - ): - if event.content["membership"] == Membership.JOIN: - joined = True + + join_event = state_delta.get(( + EventTypes.Member, sync_config.user.to_string()), None) + if join_event is not None: + if join_event.content["membership"] == Membership.JOIN: + joined = True if joined: - res = yield self.state_handler.get_current_state(room_id) - state_delta = res.values() + state_delta = yield self.state_handler.get_current_state(room_id) + # the timeline is inherently limited if we've just joined limited = True defer.returnValue((state_delta, limited)) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 997a61abbb..272a00bc85 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -256,7 +256,7 @@ class SyncRestServlet(RestServlet): :rtype: dict[str, object] """ event_map = {} - state_events = filter.filter_room_state(room.state) + state_events = filter.filter_room_state(room.state.values()) state_event_ids = [] for event in state_events: # TODO(mjark): Respect formatting requirements in the filter. From fddedd51d9ca44f385d412e8c2e3e8d99325ba67 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 10 Nov 2015 18:27:23 +0000 Subject: [PATCH 40/44] Fix a few race conditions in the state calculation Be a bit more careful about how we calculate the state to be returned by /sync. In a few places, it was possible for /sync to return slightly later state than that represented by the next_batch token and the timeline. In particular, the following cases were susceptible: * On a full state sync, for an active room * During a per-room incremental sync with a timeline gap * When the user has just joined a room. (Refactor check_joined_room to make it less magical) Also, use store.get_state_for_events() (and thus the existing stategroups) to calculate the state corresponding to a particular sync position, rather than state_handler.compute_event_context(), which recalculates from first principles (and tends to miss some state). Merged from PR https://github.com/matrix-org/synapse/pull/372 --- synapse/handlers/sync.py | 125 ++++++++++++++++++++------------------- synapse/storage/state.py | 14 +++++ 2 files changed, 78 insertions(+), 61 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 8b154fa7e7..6dc9d0fb92 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -254,9 +254,7 @@ class SyncHandler(BaseHandler): room_id, sync_config, now_token, since_token=timeline_since_token ) - current_state = yield self.state_handler.get_current_state( - room_id - ) + current_state = yield self.get_state_at(room_id, now_token) defer.returnValue(JoinedSyncResult( room_id=room_id, @@ -353,14 +351,12 @@ class SyncHandler(BaseHandler): room_id, sync_config, leave_token, since_token=timeline_since_token ) - leave_state = yield self.store.get_state_for_events( - [leave_event_id], None - ) + leave_state = yield self.store.get_state_for_event(leave_event_id) defer.returnValue(ArchivedSyncResult( room_id=room_id, timeline=batch, - state=leave_state[leave_event_id], + state=leave_state, private_user_data=self.private_user_data_for_room( room_id, tags_by_room ), @@ -424,6 +420,9 @@ class SyncHandler(BaseHandler): if len(room_events) <= timeline_limit: # There is no gap in any of the rooms. Therefore we can just # partition the new events by room and return them. + logger.debug("Got %i events for incremental sync - not limited", + len(room_events)) + invite_events = [] leave_events = [] events_by_room_id = {} @@ -439,9 +438,11 @@ class SyncHandler(BaseHandler): for room_id in joined_room_ids: recents = events_by_room_id.get(room_id, []) + logger.debug("Events for room %s: %r", room_id, recents) state = { (event.type, event.state_key): event for event in recents if event.is_state()} + limited = False if recents: prev_batch = now_token.copy_and_replace( @@ -450,9 +451,13 @@ class SyncHandler(BaseHandler): else: prev_batch = now_token - state, limited = yield self.check_joined_room( - sync_config, room_id, state - ) + just_joined = yield self.check_joined_room(sync_config, state) + if just_joined: + logger.debug("User has just joined %s: needs full state", + room_id) + state = yield self.get_state_at(room_id, now_token) + # the timeline is inherently limited if we've just joined + limited = True room_sync = JoinedSyncResult( room_id=room_id, @@ -467,10 +472,15 @@ class SyncHandler(BaseHandler): room_id, tags_by_room ), ) + logger.debug("Result for room %s: %r", room_id, room_sync) + if room_sync: joined.append(room_sync) else: + logger.debug("Got %i events for incremental sync - hit limit", + len(room_events)) + invite_events = yield self.store.get_invites_for_user( sync_config.user.to_string() ) @@ -563,6 +573,8 @@ class SyncHandler(BaseHandler): Returns: A Deferred JoinedSyncResult """ + logger.debug("Doing incremental sync for room %s between %s and %s", + room_id, since_token, now_token) # TODO(mjark): Check for redactions we might have missed. @@ -572,30 +584,26 @@ class SyncHandler(BaseHandler): logging.debug("Recents %r", batch) - # TODO(mjark): This seems racy since this isn't being passed a - # token to indicate what point in the stream this is - current_state = yield self.state_handler.get_current_state( - room_id + current_state = yield self.get_state_at(room_id, now_token) + + state_at_previous_sync = yield self.get_state_at( + room_id, stream_position=since_token ) - state_at_previous_sync = yield self.get_state_at_previous_sync( - room_id, since_token=since_token - ) - - state_events_delta = yield self.compute_state_delta( + state = yield self.compute_state_delta( since_token=since_token, previous_state=state_at_previous_sync, current_state=current_state, ) - state_events_delta, _ = yield self.check_joined_room( - sync_config, room_id, state_events_delta - ) + just_joined = yield self.check_joined_room(sync_config, state) + if just_joined: + state = yield self.get_state_at(room_id, now_token) room_sync = JoinedSyncResult( room_id=room_id, timeline=batch, - state=state_events_delta, + state=state, ephemeral=ephemeral_by_room.get(room_id, []), private_user_data=self.private_user_data_for_room( room_id, tags_by_room @@ -627,16 +635,12 @@ class SyncHandler(BaseHandler): logging.debug("Recents %r", batch) - # TODO(mjark): This seems racy since this isn't being passed a - # token to indicate what point in the stream this is - leave_state = yield self.store.get_state_for_events( - [leave_event.event_id], None + state_events_at_leave = yield self.store.get_state_for_event( + leave_event.event_id ) - state_events_at_leave = leave_state[leave_event.event_id] - - state_at_previous_sync = yield self.get_state_at_previous_sync( - leave_event.room_id, since_token=since_token + state_at_previous_sync = yield self.get_state_at( + leave_event.room_id, stream_position=since_token ) state_events_delta = yield self.compute_state_delta( @@ -659,26 +663,36 @@ class SyncHandler(BaseHandler): defer.returnValue(room_sync) @defer.inlineCallbacks - def get_state_at_previous_sync(self, room_id, since_token): - """ Get the room state at the previous sync the client made. - Returns: - A Deferred map from ((type, state_key)->Event) + def get_state_after_event(self, event): + """ + Get the room state after the given event + + :param synapse.events.EventBase event: event of interest + :return: A Deferred map from ((type, state_key)->Event) + """ + state = yield self.store.get_state_for_event(event.event_id) + if event.is_state(): + state = state.copy() + state[(event.type, event.state_key)] = event + defer.returnValue(state) + + @defer.inlineCallbacks + def get_state_at(self, room_id, stream_position): + """ Get the room state at a particular stream position + :param str room_id: room for which to get state + :param StreamToken stream_position: point at which to get state + :returns: A Deferred map from ((type, state_key)->Event) """ last_events, token = yield self.store.get_recent_events_for_room( - room_id, end_token=since_token.room_key, limit=1, + room_id, end_token=stream_position.room_key, limit=1, ) if last_events: - last_event = last_events[0] - last_context = yield self.state_handler.compute_event_context( - last_event - ) - if last_event.is_state(): - state = last_context.current_state.copy() - state[(last_event.type, last_event.state_key)] = last_event - else: - state = last_context.current_state + last_event = last_events[-1] + state = yield self.get_state_after_event(last_event) + else: + # no events in this room - so presumably no state state = {} defer.returnValue(state) @@ -706,31 +720,20 @@ class SyncHandler(BaseHandler): state_delta[key] = event return state_delta - @defer.inlineCallbacks - def check_joined_room(self, sync_config, room_id, state_delta): + def check_joined_room(self, sync_config, state_delta): """ - Check if the user has just joined the given room. If so, return the - full state for the room, instead of the delta since the last sync. + Check if the user has just joined the given room (so should + be given the full state) :param sync_config: - :param room_id: :param dict[(str,str), synapse.events.FrozenEvent] state_delta: the difference in state since the last sync :returns A deferred Tuple (state_delta, limited) """ - joined = False - limited = False - join_event = state_delta.get(( EventTypes.Member, sync_config.user.to_string()), None) if join_event is not None: if join_event.content["membership"] == Membership.JOIN: - joined = True - - if joined: - state_delta = yield self.state_handler.get_current_state(room_id) - # the timeline is inherently limited if we've just joined - limited = True - - defer.returnValue((state_delta, limited)) + return True + return False diff --git a/synapse/storage/state.py b/synapse/storage/state.py index acfb322a53..80e9b63f50 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -237,6 +237,20 @@ class StateStore(SQLBaseStore): defer.returnValue({event: event_to_state[event] for event in event_ids}) + @defer.inlineCallbacks + def get_state_for_event(self, event_id, types=None): + """ + Get the state dict corresponding to a particular event + + :param str event_id: event whose state should be returned + :param list[(str, str)]|None types: List of (type, state_key) tuples + which are used to filter the state fetched. May be None, which + matches any key + :return: a deferred dict from (type, state_key) -> state_event + """ + state_map = yield self.get_state_for_events([event_id], types) + defer.returnValue(state_map[event_id]) + @cached(num_args=2, lru=True, max_entries=10000) def _get_state_group_for_event(self, room_id, event_id): return self._simple_select_one_onecol( From e4d622aaaf0df503f942d016a5bf798dd52899d1 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 10 Nov 2015 18:29:25 +0000 Subject: [PATCH 41/44] Implementation of state rollback in /sync Implementation of SPEC-254: roll back the state dictionary to how it looked at the start of the timeline. Merged PR https://github.com/matrix-org/synapse/pull/373 --- synapse/rest/client/v2_alpha/sync.py | 67 +++++++++++++++++++++++++++- synapse/storage/events.py | 6 ++- 2 files changed, 69 insertions(+), 4 deletions(-) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 272a00bc85..efd8281558 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -20,6 +20,7 @@ from synapse.http.servlet import ( ) from synapse.handlers.sync import SyncConfig from synapse.types import StreamToken +from synapse.events import FrozenEvent from synapse.events.utils import ( serialize_event, format_event_for_client_v2_without_event_id, ) @@ -256,7 +257,13 @@ class SyncRestServlet(RestServlet): :rtype: dict[str, object] """ event_map = {} - state_events = filter.filter_room_state(room.state.values()) + state_dict = room.state + timeline_events = filter.filter_room_timeline(room.timeline.events) + + state_dict = SyncRestServlet._rollback_state_for_timeline( + state_dict, timeline_events) + + state_events = filter.filter_room_state(state_dict.values()) state_event_ids = [] for event in state_events: # TODO(mjark): Respect formatting requirements in the filter. @@ -266,7 +273,6 @@ class SyncRestServlet(RestServlet): ) state_event_ids.append(event.event_id) - timeline_events = filter.filter_room_timeline(room.timeline.events) timeline_event_ids = [] for event in timeline_events: # TODO(mjark): Respect formatting requirements in the filter. @@ -297,6 +303,63 @@ class SyncRestServlet(RestServlet): return result + @staticmethod + def _rollback_state_for_timeline(state, timeline): + """ + Wind the state dictionary backwards, so that it represents the + state at the start of the timeline, rather than at the end. + + :param dict[(str, str), synapse.events.EventBase] state: the + state dictionary. Will be updated to the state before the timeline. + :param list[synapse.events.EventBase] timeline: the event timeline + :return: updated state dictionary + """ + logger.debug("Processing state dict %r; timeline %r", state, + [e.get_dict() for e in timeline]) + + result = state.copy() + + for timeline_event in reversed(timeline): + if not timeline_event.is_state(): + continue + + event_key = (timeline_event.type, timeline_event.state_key) + + logger.debug("Considering %s for removal", event_key) + + state_event = result.get(event_key) + if (state_event is None or + state_event.event_id != timeline_event.event_id): + # the event in the timeline isn't present in the state + # dictionary. + # + # the most likely cause for this is that there was a fork in + # the event graph, and the state is no longer valid. Really, + # the event shouldn't be in the timeline. We're going to ignore + # it for now, however. + logger.warn("Found state event %r in timeline which doesn't " + "match state dictionary", timeline_event) + continue + + prev_event_id = timeline_event.unsigned.get("replaces_state", None) + logger.debug("Replacing %s with %s in state dict", + timeline_event.event_id, prev_event_id) + + if prev_event_id is None: + del result[event_key] + else: + result[event_key] = FrozenEvent({ + "type": timeline_event.type, + "state_key": timeline_event.state_key, + "content": timeline_event.unsigned['prev_content'], + "sender": timeline_event.unsigned['prev_sender'], + "event_id": prev_event_id, + "room_id": timeline_event.room_id, + }) + logger.debug("New value: %r", result.get(event_key)) + + return result + def register_servlets(hs, http_server): SyncRestServlet(hs).register(http_server) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 4a365ff639..5d35ca90b9 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -831,7 +831,8 @@ class EventsStore(SQLBaseStore): allow_none=True, ) if prev: - ev.unsigned["prev_content"] = prev.get_dict()["content"] + ev.unsigned["prev_content"] = prev.content + ev.unsigned["prev_sender"] = prev.sender self._get_event_cache.prefill( (ev.event_id, check_redacted, get_prev_content), ev @@ -888,7 +889,8 @@ class EventsStore(SQLBaseStore): get_prev_content=False, ) if prev: - ev.unsigned["prev_content"] = prev.get_dict()["content"] + ev.unsigned["prev_content"] = prev.content + ev.unsigned["prev_sender"] = prev.sender self._get_event_cache.prefill( (ev.event_id, check_redacted, get_prev_content), ev From 9c3f4f8dfd20ee129615ed992c755c802217e6ee Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Fri, 13 Nov 2015 11:56:58 +0000 Subject: [PATCH 42/44] Allow guests to /room/:room_id/{join,leave} --- synapse/rest/client/v1/room.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index d298aee3ab..139dac1cc3 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -17,7 +17,7 @@ from twisted.internet import defer from base import ClientV1RestServlet, client_path_pattern -from synapse.api.errors import SynapseError, Codes +from synapse.api.errors import SynapseError, Codes, AuthError from synapse.streams.config import PaginationConfig from synapse.api.constants import EventTypes, Membership from synapse.types import UserID, RoomID, RoomAlias @@ -453,7 +453,13 @@ class RoomMembershipRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_POST(self, request, room_id, membership_action, txn_id=None): - user, token_id, _ = yield self.auth.get_user_by_req(request) + user, token_id, is_guest = yield self.auth.get_user_by_req( + request, + allow_guest=True + ) + + if is_guest and membership_action not in {Membership.JOIN, Membership.LEAVE}: + raise AuthError(403, "Guest access not allowed") content = _parse_json(request) @@ -486,16 +492,21 @@ class RoomMembershipRestServlet(ClientV1RestServlet): msg_handler = self.handlers.message_handler + content = {"membership": unicode(membership_action)} + if is_guest: + content["kind"] = "guest" + yield msg_handler.create_and_send_event( { "type": EventTypes.Member, - "content": {"membership": unicode(membership_action)}, + "content": content, "room_id": room_id, "sender": user.to_string(), "state_key": state_key, }, token_id=token_id, txn_id=txn_id, + is_guest=is_guest, ) defer.returnValue((200, {})) From 233af7c74bff9815a282bdd43d9979b081189544 Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Fri, 13 Nov 2015 16:22:51 +0000 Subject: [PATCH 43/44] Pull out jenkins script into a checked in script --- MANIFEST.in | 2 ++ jenkins.sh | 4 ++++ tox.ini | 4 +++- 3 files changed, 9 insertions(+), 1 deletion(-) create mode 100755 jenkins.sh diff --git a/MANIFEST.in b/MANIFEST.in index 573e21f41d..5668665db7 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -20,4 +20,6 @@ recursive-include synapse/static *.gif recursive-include synapse/static *.html recursive-include synapse/static *.js +exclude jenkins.sh + prune demo/etc diff --git a/jenkins.sh b/jenkins.sh new file mode 100755 index 0000000000..2680d16b42 --- /dev/null +++ b/jenkins.sh @@ -0,0 +1,4 @@ +#!/bin/bash -eu + +export PYTHONDONTWRITEBYTECODE=yep +TOXSUFFIX="--reporter=subunit | subunit-1to2 | subunit2junitxml --no-passthrough --output-to=results.xml" tox diff --git a/tox.ini b/tox.ini index 01b23e6bd9..cf54ebb0ac 100644 --- a/tox.ini +++ b/tox.ini @@ -6,10 +6,12 @@ deps = coverage Twisted>=15.1 mock + python-subunit + junitxml setenv = PYTHONDONTWRITEBYTECODE = no_byte_code commands = - coverage run --source=synapse {envbindir}/trial {posargs:tests} + /bin/bash -c "coverage run --source=synapse {envbindir}/trial {posargs:tests} {env:TOXSUFFIX:}" coverage report -m [testenv:packaging] From aca6e5bf466735b0e9c9f9815ebf0c60332d0033 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Fri, 13 Nov 2015 17:27:25 +0000 Subject: [PATCH 44/44] Don't complain if /make_join response lacks 'prev_state' list (SYN-517) --- synapse/federation/federation_client.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index d4f586fae7..c6a8c1249a 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -401,6 +401,12 @@ class FederationClient(FederationBase): pdu_dict["content"].update(content) + # The protoevent received over the JSON wire may not have all + # the required fields. Lets just gloss over that because + # there's some we never care about + if "prev_state" not in pdu_dict: + pdu_dict["prev_state"] = [] + defer.returnValue( (destination, self.event_from_pdu_json(pdu_dict)) )