diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py index 0df12e6380..96e1e5e45b 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py @@ -469,9 +469,9 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) content_json = json_encoder.encode(content) - async with self._account_data_id_gen.get_next() as next_id: - await self.db_pool.simple_upsert( - desc="add_room_account_data", + def _add_account_data_to_room(txn: LoggingTransaction, next_id: int) -> None: + self.db_pool.simple_upsert_txn( + txn, table="room_account_data", keyvalues={ "user_id": user_id, @@ -481,6 +481,18 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) values={"stream_id": next_id, "content": content_json}, ) + # Clear any previous record that this user account data type was deleted. + self._remove_entries_from_account_data_undelivered_deletes_for_type_txn( + txn, account_data_type, room_id, user_id + ) + + async with self._account_data_id_gen.get_next() as next_id: + await self.db_pool.runInteraction( + "add_account_data_to_room", + _add_account_data_to_room, + next_id, + ) + self._account_data_stream_cache.entity_has_changed(user_id, next_id) self.get_account_data_for_user.invalidate((user_id,)) self.get_account_data_for_room.invalidate((user_id, room_id)) @@ -620,6 +632,11 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) values={"stream_id": next_id, "content": content_json}, ) + # Clear any previous record that this user account data type was deleted. + self._remove_entries_from_account_data_undelivered_deletes_for_type_txn( + txn, account_data_type, room_id=None, user_id=user_id + ) + # Ignored users get denormalized into a separate table as an optimisation. if account_data_type != AccountDataTypes.IGNORED_USER_LIST: return @@ -821,6 +838,40 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) ), ) + def _remove_entries_from_account_data_undelivered_deletes_for_type_txn( + self, + txn: LoggingTransaction, + account_data_type: str, + room_id: Optional[str], + user_id: str, + ) -> None: + """ + Removes all entries from the 'account_data_undelivered_deletes' table for a given + {user,room} account data entry. + + This should be called when adding/updating an account data entry, as the entry + will no longer be in a deleted state. + + Args: + txn: The transaction that is handling the addition/modification to the + relevant account data type. + account_data_type: The type of {room,user} account data that was modified. + room_id: The ID of the room if this refers to room account data, otherwise + this should be None. + user_id: The ID of the user this account data is related to. + """ + # Remove all entries pertaining to this account data type as it is + # no longer deleted! + self.db_pool.simple_delete_txn( + txn, + table="account_data_undelivered_deletes", + keyvalues={ + "type": account_data_type, + "room_id": room_id, + "user_id": user_id, + }, + ) + async def purge_account_data_for_user(self, user_id: str) -> None: """ Removes ALL the account data for a user.