Add background update to populate user_filters.full_user_id

Signed-off-by: Sean Quah <seanq@matrix.org>
This commit is contained in:
Sean Quah 2023-04-15 01:23:01 +01:00
parent cc90467096
commit 0a734d0cf2
3 changed files with 78 additions and 4 deletions

View file

@ -43,7 +43,7 @@ from .event_federation import EventFederationStore
from .event_push_actions import EventPushActionsStore
from .events_bg_updates import EventsBackgroundUpdatesStore
from .events_forward_extremities import EventForwardExtremitiesStore
from .filtering import FilteringWorkerStore
from .filtering import FilteringStore
from .keys import KeyStore
from .lock import LockStore
from .media_repository import MediaRepositoryStore
@ -99,7 +99,7 @@ class DataStore(
EventFederationStore,
MediaRepositoryStore,
RejectionsStore,
FilteringWorkerStore,
FilteringStore,
PusherStore,
PushRuleStore,
ApplicationServiceTransactionStore,

View file

@ -13,16 +13,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional, Tuple, Union, cast
from typing import TYPE_CHECKING, Optional, Tuple, Union, cast
from canonicaljson import encode_canonical_json
from synapse.api.errors import Codes, StoreError, SynapseError
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import LoggingTransaction
from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
LoggingTransaction,
)
from synapse.types import JsonDict
from synapse.util.caches.descriptors import cached
if TYPE_CHECKING:
from synapse.server import HomeServer
class FilteringWorkerStore(SQLBaseStore):
@cached(num_args=2)
@ -97,3 +104,67 @@ class FilteringWorkerStore(SQLBaseStore):
if attempts >= 5:
raise StoreError(500, "Couldn't generate a filter ID.")
class FilteringBackgroundUpdateStore(FilteringWorkerStore):
POPULATE_USER_FILTERS_FULL_USER_ID = "populate_user_filters_full_user_id"
def __init__(
self,
database: DatabasePool,
db_conn: LoggingDatabaseConnection,
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)
self.db_pool.updates.register_background_update_handler(
self.POPULATE_USER_FILTERS_FULL_USER_ID,
self._populate_user_filters_full_user_id,
)
async def _populate_user_filters_full_user_id(
self, progress: JsonDict, batch_size: int
) -> int:
"""Populates the `user_filters.full_user_id` column.
In a future Synapse version, this column will be renamed to `user_id`, replacing
the existing `user_id` column.
Note that completion of this background update does not imply that there are no
longer any `NULL` values in `full_user_id`. Until the old `user_id` column has
been removed, Synapse may be rolled back to a previous version which does not
populate `full_user_id` after the background update has finished.
"""
def _populate_user_filters_full_user_id_txn(
txn: LoggingTransaction,
) -> bool:
sql = """
UPDATE user_filters
SET full_user_id = '@' || user_id || ':' || ?
WHERE user_id IN (
SELECT user_id
FROM user_filters
WHERE full_user_id IS NULL
LIMIT ?
)
"""
txn.execute(sql, (self.hs.hostname, batch_size))
return txn.rowcount == 0
finished = await self.db_pool.runInteraction(
"_populate_user_filters_full_user_id_txn",
_populate_user_filters_full_user_id_txn,
)
if finished:
await self.db_pool.updates._end_background_update(
self.POPULATE_USER_FILTERS_FULL_USER_ID
)
return batch_size
class FilteringStore(FilteringBackgroundUpdateStore):
pass

View file

@ -21,3 +21,6 @@ CREATE UNIQUE INDEX full_user_filters_unique ON user_filters (full_user_id, filt
-- NB: This will lock the table for writes while the index is being built.
-- There are around 4,000,000 user_filters on matrix.org so we expect this to take
-- a couple of seconds at most.
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(7502, 'populate_user_filters_full_user_id', '{}');