diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py index 543e575b10..aba7315cf7 100644 --- a/synapse/handlers/account_data.py +++ b/synapse/handlers/account_data.py @@ -133,6 +133,50 @@ class AccountDataHandler: ) return response["max_stream_id"] + async def remove_account_data_for_room( + self, user_id: str, room_id: str, account_data_type: str + ) -> Optional[int]: + """ + Deletes the room account data for the given user and account data type. + + "Deleting" account data merely means setting the content of the account data + to an empty JSON object: {}. + + Args: + user_id: The user ID to remove room account data for. + room_id: The room ID to target. + account_data_type: The account data type to remove. + + Returns: + The maximum stream ID, or None if the room account data item did not exist. + """ + if self._instance_name in self._account_data_writers: + max_stream_id = await self._store.remove_account_data_for_room( + user_id, room_id, account_data_type + ) + if max_stream_id is None: + # The referenced account data did not exist, so no delete occurred. + return None + + self._notifier.on_new_event( + StreamKeyType.ACCOUNT_DATA, max_stream_id, users=[user_id] + ) + + # Notify Synapse modules that the content of the type has changed to an + # empty dictionary. + await self._notify_modules(user_id, room_id, account_data_type, {}) + + return max_stream_id + else: + response = await self._remove_room_data_client( + instance_name=random.choice(self._account_data_writers), + user_id=user_id, + room_id=room_id, + account_data_type=account_data_type, + content={}, + ) + return response["max_stream_id"] + async def add_account_data_for_user( self, user_id: str, account_data_type: str, content: JsonDict ) -> int: @@ -168,6 +212,45 @@ class AccountDataHandler: ) return response["max_stream_id"] + async def remove_account_data_for_user( + self, user_id: str, account_data_type: str + ) -> Optional[int]: + """Removes a piece of global account_data for a user. + + Args: + user_id: The user to remove account data for. + account_data_type: The type of account_data to remove. + + Returns: + The maximum stream ID, or None if the room account data item did not exist. + """ + + if self._instance_name in self._account_data_writers: + max_stream_id = await self._store.remove_account_data_for_user( + user_id, account_data_type + ) + if max_stream_id is None: + # The referenced account data did not exist, so no delete occurred. + return None + + self._notifier.on_new_event( + StreamKeyType.ACCOUNT_DATA, max_stream_id, users=[user_id] + ) + + # Notify Synapse modules that the content of the type has changed to an + # empty dictionary. + await self._notify_modules(user_id, None, account_data_type, {}) + + return max_stream_id + else: + response = await self._remove_user_data_client( + instance_name=random.choice(self._account_data_writers), + user_id=user_id, + account_data_type=account_data_type, + content={}, + ) + return response["max_stream_id"] + async def add_tag_to_room( self, user_id: str, room_id: str, tag: str, content: JsonDict ) -> int: diff --git a/synapse/rest/client/account_data.py b/synapse/rest/client/account_data.py index f13970b898..6c1470f464 100644 --- a/synapse/rest/client/account_data.py +++ b/synapse/rest/client/account_data.py @@ -75,6 +75,41 @@ class AccountDataServlet(RestServlet): return 200, event +class UnstableAccountDataServlet(RestServlet): + """ + Contains an unstable endpoint for removing user account data, as specified by + MSC3391. If that MSC is accepted, this code should have unstable prefixes removed + and become incorporated into AccountDataServlet above. + """ + + PATTERNS = client_patterns( + "/org.matrix.msc3391/user/(?P[^/]*)" + "/account_data/(?P[^/]*)", + unstable=True, + releases=(), + ) + + def __init__(self, hs: "HomeServer"): + super().__init__() + self.auth = hs.get_auth() + self.store = hs.get_datastores().main + self.handler = hs.get_account_data_handler() + + async def on_DELETE( + self, + request: SynapseRequest, + user_id: str, + account_data_type: str, + ) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request) + if user_id != requester.user.to_string(): + raise AuthError(403, "Cannot delete account data for other users.") + + await self.handler.remove_account_data_for_user(user_id, account_data_type) + + return 200, {} + + class RoomAccountDataServlet(RestServlet): """ PUT /user/{user_id}/rooms/{room_id}/account_data/{account_dataType} HTTP/1.1 @@ -155,6 +190,56 @@ class RoomAccountDataServlet(RestServlet): return 200, event +class UnstableRoomAccountDataServlet(RestServlet): + """ + Contains an unstable endpoint for removing room account data, as specified by + MSC3391. If that MSC is accepted, this code should have unstable prefixes removed + and become incorporated into RoomAccountDataServlet above. + """ + + PATTERNS = client_patterns( + "/org.matrix.msc3391/user/(?P[^/]*)" + "/rooms/(?P[^/]*)" + "/account_data/(?P[^/]*)", + unstable=True, + releases=(), + ) + + def __init__(self, hs: "HomeServer"): + super().__init__() + self.auth = hs.get_auth() + self.store = hs.get_datastores().main + self.handler = hs.get_account_data_handler() + + async def on_DELETE( + self, + request: SynapseRequest, + user_id: str, + room_id: str, + account_data_type: str, + ) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request) + if user_id != requester.user.to_string(): + raise AuthError(403, "Cannot delete account data for other users.") + + if not RoomID.is_valid(room_id): + raise SynapseError( + 400, + f"{room_id} is not a valid room ID", + Codes.INVALID_PARAM, + ) + + await self.handler.remove_account_data_for_room( + user_id, room_id, account_data_type + ) + + return 200, {} + + def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: AccountDataServlet(hs).register(http_server) RoomAccountDataServlet(hs).register(http_server) + + if hs.config.experimental.msc3391_enabled: + UnstableAccountDataServlet(hs).register(http_server) + UnstableRoomAccountDataServlet(hs).register(http_server) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 0b29e67b94..97beaa2297 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -1898,6 +1898,19 @@ class DatabasePool: updatevalues: Dict[str, Any], desc: str, ) -> int: + """ + Update rows in the given database table. + If the given keyvalues don't match anything, nothing will be updated. + + Args: + table: The database table to update. + keyvalues: A mapping of column name to value to match rows on. + updatevalues: A mapping of column name to value to replace in any matched rows. + desc: description of the transaction, for logging and metrics. + + Returns: + The number of rows that were updated. Will be 0 if no matching rows were found. + """ return await self.runInteraction( desc, self.simple_update_txn, table, keyvalues, updatevalues ) @@ -1909,6 +1922,19 @@ class DatabasePool: keyvalues: Dict[str, Any], updatevalues: Dict[str, Any], ) -> int: + """ + Update rows in the given database table. + If the given keyvalues don't match anything, nothing will be updated. + + Args: + txn: The database transaction object. + table: The database table to update. + keyvalues: A mapping of column name to value to match rows on. + updatevalues: A mapping of column name to value to replace in any matched rows. + + Returns: + The number of rows that were updated. Will be 0 if no matching rows were found. + """ if keyvalues: where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys()) else: diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py index 07908c41d9..a11be64755 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py @@ -469,6 +469,74 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) return self._account_data_id_gen.get_current_token() + async def remove_account_data_for_room( + self, user_id: str, room_id: str, account_data_type: str + ) -> Optional[int]: + """Delete the room account data for the user of a given type. + + Args: + user_id: The user to remove account_data for. + room_id: The room ID to scope the request to. + account_data_type: The account data type to delete. + + Returns: + The maximum stream position, or None if there was no matching room account + data to delete. + """ + assert self._can_write_to_account_data + assert isinstance(self._account_data_id_gen, AbstractStreamIdGenerator) + + def _remove_account_data_for_room_txn( + txn: LoggingTransaction, next_id: int + ) -> bool: + """ + Args: + txn: The transaction object. + next_id: The stream_id to update any existing rows to. + + Returns: + True if an entry in room_account_data had its content set to '{}', + otherwise False. This informs callers of whether there actually was an + existing room account data entry to delete, or if the call was a no-op. + """ + sql = """ + UPDATE room_account_data + SET stream_id = ?, content = '{}' + WHERE user_id = ? + AND room_id = ? + AND account_data_type = ? + AND content != '{}' + """ + txn.execute( + sql, + (next_id, user_id, room_id, account_data_type), + ) + if txn.rowcount == 0: + # We didn't update any rows. This means that there was no matching room + # account data entry to delete in the first place. + return False + + return True + + async with self._account_data_id_gen.get_next() as next_id: + row_updated = await self.db_pool.runInteraction( + "remove_account_data_for_room", + _remove_account_data_for_room_txn, + next_id, + ) + + if not row_updated: + return None + + self._account_data_stream_cache.entity_has_changed(user_id, next_id) + self.get_account_data_for_user.invalidate((user_id,)) + self.get_account_data_for_room.invalidate((user_id, room_id)) + self.get_account_data_for_room_and_type.prefill( + (user_id, room_id, account_data_type), {} + ) + + return self._account_data_id_gen.get_current_token() + async def add_account_data_for_user( self, user_id: str, account_data_type: str, content: JsonDict ) -> int: @@ -569,6 +637,106 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) self._invalidate_cache_and_stream(txn, self.ignored_by, (ignored_user_id,)) self._invalidate_cache_and_stream(txn, self.ignored_users, (user_id,)) + async def remove_account_data_for_user( + self, + user_id: str, + account_data_type: str, + ) -> Optional[int]: + """ + Delete a single piece of user account data by type. + + A "delete" is performed by updating a potentially existing row in the + "account_data" database table for (user_id, account_data_type) and + setting its content to "{}". + + Args: + user_id: The user ID to modify the account data of. + account_data_type: The type to remove. + + Returns: + The maximum stream position, or None if there was no matching account data + to delete. + """ + assert self._can_write_to_account_data + assert isinstance(self._account_data_id_gen, AbstractStreamIdGenerator) + + def _remove_account_data_for_user_txn( + txn: LoggingTransaction, next_id: int + ) -> bool: + """ + Args: + txn: The transaction object. + next_id: The stream_id to update any existing rows to. + + Returns: + True if an entry in account_data had its content set to '{}', otherwise + False. This informs callers of whether there actually was an existing + account data entry to delete, or if the call was a no-op. + """ + sql = """ + UPDATE account_data + SET stream_id = ?, content = '{}' + WHERE user_id = ? + AND account_data_type = ? + AND content != '{}' + """ + txn.execute(sql, (next_id, user_id, account_data_type)) + if txn.rowcount == 0: + # We didn't update any rows. This means that there was no matching room + # account data entry to delete in the first place. + return False + + # Ignored users get denormalized into a separate table as an optimisation. + if account_data_type == AccountDataTypes.IGNORED_USER_LIST: + # If this method was called with the ignored users account data type, we + # simply delete all ignored users. + + # First pull all the users that this user ignores. + previously_ignored_users = set( + self.db_pool.simple_select_onecol_txn( + txn, + table="ignored_users", + keyvalues={"ignorer_user_id": user_id}, + retcol="ignored_user_id", + ) + ) + + # Then delete them from the database. + self.db_pool.simple_delete_txn( + txn, + table="ignored_users", + keyvalues={"ignorer_user_id": user_id}, + ) + + # Invalidate the cache for ignored users which were removed. + for ignored_user_id in previously_ignored_users: + self._invalidate_cache_and_stream( + txn, self.ignored_by, (ignored_user_id,) + ) + + # Invalidate for this user the cache tracking ignored users. + self._invalidate_cache_and_stream(txn, self.ignored_users, (user_id,)) + + return True + + async with self._account_data_id_gen.get_next() as next_id: + row_updated = await self.db_pool.runInteraction( + "remove_account_data_for_user", + _remove_account_data_for_user_txn, + next_id, + ) + + if not row_updated: + return None + + self._account_data_stream_cache.entity_has_changed(user_id, next_id) + self.get_account_data_for_user.invalidate((user_id,)) + self.get_global_account_data_by_type_for_user.prefill( + (user_id, account_data_type), {} + ) + + return self._account_data_id_gen.get_current_token() + async def purge_account_data_for_user(self, user_id: str) -> None: """ Removes ALL the account data for a user.