diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index dc3948c170..837dc7646e 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -43,7 +43,7 @@ from .event_federation import EventFederationStore
 from .event_push_actions import EventPushActionsStore
 from .events_bg_updates import EventsBackgroundUpdatesStore
 from .events_forward_extremities import EventForwardExtremitiesStore
-from .filtering import FilteringWorkerStore
+from .filtering import FilteringStore
 from .keys import KeyStore
 from .lock import LockStore
 from .media_repository import MediaRepositoryStore
@@ -99,7 +99,7 @@ class DataStore(
     EventFederationStore,
     MediaRepositoryStore,
     RejectionsStore,
-    FilteringWorkerStore,
+    FilteringStore,
     PusherStore,
     PushRuleStore,
     ApplicationServiceTransactionStore,
diff --git a/synapse/storage/databases/main/filtering.py b/synapse/storage/databases/main/filtering.py
index 8e57c8e5a0..88be0f5f2f 100644
--- a/synapse/storage/databases/main/filtering.py
+++ b/synapse/storage/databases/main/filtering.py
@@ -13,16 +13,23 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from typing import Optional, Tuple, Union, cast
+from typing import TYPE_CHECKING, Optional, Tuple, Union, cast
 
 from canonicaljson import encode_canonical_json
 
 from synapse.api.errors import Codes, StoreError, SynapseError
 from synapse.storage._base import SQLBaseStore, db_to_json
-from synapse.storage.database import LoggingTransaction
+from synapse.storage.database import (
+    DatabasePool,
+    LoggingDatabaseConnection,
+    LoggingTransaction,
+)
 from synapse.types import JsonDict
 from synapse.util.caches.descriptors import cached
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 
 class FilteringWorkerStore(SQLBaseStore):
     @cached(num_args=2)
@@ -97,3 +104,78 @@ class FilteringWorkerStore(SQLBaseStore):
 
                 if attempts >= 5:
                     raise StoreError(500, "Couldn't generate a filter ID.")
+
+
+class FilteringBackgroundUpdateStore(FilteringWorkerStore):
+    """Registers the background update that backfills `user_filters.full_user_id`."""
+
+    POPULATE_USER_FILTERS_FULL_USER_ID = "populate_user_filters_full_user_id"
+
+    def __init__(
+        self,
+        database: DatabasePool,
+        db_conn: LoggingDatabaseConnection,
+        hs: "HomeServer",
+    ):
+        super().__init__(database, db_conn, hs)
+
+        self.db_pool.updates.register_background_update_handler(
+            self.POPULATE_USER_FILTERS_FULL_USER_ID,
+            self._populate_user_filters_full_user_id,
+        )
+
+    async def _populate_user_filters_full_user_id(
+        self, progress: JsonDict, batch_size: int
+    ) -> int:
+        """Populates the `user_filters.full_user_id` column.
+
+        In a future Synapse version, this column will be renamed to `user_id`, replacing
+        the existing `user_id` column.
+
+        Note that completion of this background update does not imply that there are no
+        longer any `NULL` values in `full_user_id`. Until the old `user_id` column has
+        been removed, Synapse may be rolled back to a previous version which does not
+        populate `full_user_id` after the background update has finished.
+
+        Args:
+            progress: Unused. Every batch re-selects rows whose `full_user_id` is
+                still `NULL`, so no position is persisted between batches.
+            batch_size: The maximum number of `NULL` rows to select in one batch.
+
+        Returns:
+            The number of rows updated by this batch.
+        """
+
+        def _populate_user_filters_full_user_id_txn(
+            txn: LoggingTransaction,
+        ) -> int:
+            sql = """
+                UPDATE user_filters
+                SET full_user_id = '@' || user_id || ':' || ?
+                WHERE user_id IN (
+                    SELECT user_id
+                    FROM user_filters
+                    WHERE full_user_id IS NULL
+                    LIMIT ?
+                )
+            """
+            txn.execute(sql, (self.hs.hostname, batch_size))
+
+            return txn.rowcount
+
+        rows_updated = await self.db_pool.runInteraction(
+            "_populate_user_filters_full_user_id_txn",
+            _populate_user_filters_full_user_id_txn,
+        )
+
+        # No `NULL` rows were left to select, so the backfill is complete.
+        if rows_updated == 0:
+            await self.db_pool.updates._end_background_update(
+                self.POPULATE_USER_FILTERS_FULL_USER_ID
+            )
+
+        return rows_updated
+
+
+class FilteringStore(FilteringBackgroundUpdateStore):
+    """Filter storage class that also carries the background update handlers."""
diff --git a/synapse/storage/schema/main/delta/75/02_add_user_filters_full_user_id_column.sql b/synapse/storage/schema/main/delta/75/02_add_user_filters_full_user_id_column.sql
index cfe4e7cb00..ba993d95c4 100644
--- a/synapse/storage/schema/main/delta/75/02_add_user_filters_full_user_id_column.sql
+++ b/synapse/storage/schema/main/delta/75/02_add_user_filters_full_user_id_column.sql
@@ -21,3 +21,6 @@ CREATE UNIQUE INDEX full_user_filters_unique ON user_filters (full_user_id, filt
 -- NB: This will lock the table for writes while the index is being built.
 -- There are around 4,000,000 user_filters on matrix.org so we expect this to take
 -- a couple of seconds at most.
+
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+  (7502, 'populate_user_filters_full_user_id', '{}');