Set thread_id column to non-null for event_push_{actions,actions_staging,summary} (#15437)

Updates the database schema to require a thread_id (by adding a
constraint that the column is non-null) for event_push_actions,
event_push_actions_staging, and event_push_actions_summary.

For PostgreSQL we add the constraint as NOT VALID, then
VALIDATE the constraint a background job to avoid locking
the table during an upgrade.

For SQLite we simply rebuild the table & copy the data.
This commit is contained in:
Patrick Cloke 2023-05-03 07:49:03 -04:00 committed by GitHub
parent 04e79e6a18
commit a7b3e9ce65
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 225 additions and 234 deletions

1
changelog.d/15437.misc Normal file
View file

@ -0,0 +1 @@
Make the `thread_id` column on `event_push_actions`, `event_push_actions_staging`, and `event_push_summary` non-null.

View file

@ -561,6 +561,50 @@ class BackgroundUpdater:
updater, oneshot=True updater, oneshot=True
) )
def register_background_validate_constraint(
self, update_name: str, constraint_name: str, table: str
) -> None:
"""Helper for store classes to do a background validate constraint.
This only applies on PostgreSQL.
To use:
1. use a schema delta file to add a background update. Example:
INSERT INTO background_updates (update_name, progress_json) VALUES
('validate_my_constraint', '{}');
2. In the Store constructor, call this method
Args:
update_name: update_name to register for
constraint_name: name of constraint to validate
table: table the constraint is applied to
"""
def runner(conn: Connection) -> None:
c = conn.cursor()
sql = f"""
ALTER TABLE {table} VALIDATE CONSTRAINT {constraint_name};
"""
logger.debug("[SQL] %s", sql)
c.execute(sql)
async def updater(progress: JsonDict, batch_size: int) -> int:
assert isinstance(
self.db_pool.engine, engines.PostgresEngine
), "validate constraint background update registered for non-Postres database"
logger.info("Validating constraint %s to %s", constraint_name, table)
await self.db_pool.runWithConnection(runner)
await self._end_background_update(update_name)
return 1
self._background_update_handlers[update_name] = _BackgroundUpdateHandler(
updater, oneshot=True
)
async def create_index_in_background( async def create_index_in_background(
self, self,
index_name: str, index_name: str,

View file

@ -100,7 +100,6 @@ from synapse.storage.database import (
) )
from synapse.storage.databases.main.receipts import ReceiptsWorkerStore from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
from synapse.storage.databases.main.stream import StreamWorkerStore from synapse.storage.databases.main.stream import StreamWorkerStore
from synapse.types import JsonDict
from synapse.util import json_encoder from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached from synapse.util.caches.descriptors import cached
@ -289,180 +288,22 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
unique=True, unique=True,
) )
self.db_pool.updates.register_background_update_handler( self.db_pool.updates.register_background_validate_constraint(
"event_push_backfill_thread_id", "event_push_actions_staging_thread_id",
self._background_backfill_thread_id, constraint_name="event_push_actions_staging_thread_id",
table="event_push_actions_staging",
) )
self.db_pool.updates.register_background_validate_constraint(
# Indexes which will be used to quickly make the thread_id column non-null. "event_push_actions_thread_id",
self.db_pool.updates.register_background_index_update( constraint_name="event_push_actions_thread_id",
"event_push_actions_thread_id_null",
index_name="event_push_actions_thread_id_null",
table="event_push_actions", table="event_push_actions",
columns=["thread_id"],
where_clause="thread_id IS NULL",
) )
self.db_pool.updates.register_background_index_update( self.db_pool.updates.register_background_validate_constraint(
"event_push_summary_thread_id_null", "event_push_summary_thread_id",
index_name="event_push_summary_thread_id_null", constraint_name="event_push_summary_thread_id",
table="event_push_summary", table="event_push_summary",
columns=["thread_id"],
where_clause="thread_id IS NULL",
) )
# Check ASAP (and then later, every 1s) to see if we have finished
# background updates the event_push_actions and event_push_summary tables.
self._clock.call_later(0.0, self._check_event_push_backfill_thread_id)
self._event_push_backfill_thread_id_done = False
@wrap_as_background_process("check_event_push_backfill_thread_id")
async def _check_event_push_backfill_thread_id(self) -> None:
"""
Has thread_id finished backfilling?
If not, we need to just-in-time update it so the queries work.
"""
done = await self.db_pool.updates.has_completed_background_update(
"event_push_backfill_thread_id"
)
if done:
self._event_push_backfill_thread_id_done = True
else:
# Reschedule to run.
self._clock.call_later(15.0, self._check_event_push_backfill_thread_id)
async def _background_backfill_thread_id(
self, progress: JsonDict, batch_size: int
) -> int:
"""
Fill in the thread_id field for event_push_actions and event_push_summary.
This is preparatory so that it can be made non-nullable in the future.
Because all current (null) data is done in an unthreaded manner this
simply assumes it is on the "main" timeline. Since event_push_actions
are periodically cleared it is not possible to correctly re-calculate
the thread_id.
"""
event_push_actions_done = progress.get("event_push_actions_done", False)
def add_thread_id_txn(
txn: LoggingTransaction, start_stream_ordering: int
) -> int:
sql = """
SELECT stream_ordering
FROM event_push_actions
WHERE
thread_id IS NULL
AND stream_ordering > ?
ORDER BY stream_ordering
LIMIT ?
"""
txn.execute(sql, (start_stream_ordering, batch_size))
# No more rows to process.
rows = txn.fetchall()
if not rows:
progress["event_push_actions_done"] = True
self.db_pool.updates._background_update_progress_txn(
txn, "event_push_backfill_thread_id", progress
)
return 0
# Update the thread ID for any of those rows.
max_stream_ordering = rows[-1][0]
sql = """
UPDATE event_push_actions
SET thread_id = 'main'
WHERE ? < stream_ordering AND stream_ordering <= ? AND thread_id IS NULL
"""
txn.execute(
sql,
(
start_stream_ordering,
max_stream_ordering,
),
)
# Update progress.
processed_rows = txn.rowcount
progress["max_event_push_actions_stream_ordering"] = max_stream_ordering
self.db_pool.updates._background_update_progress_txn(
txn, "event_push_backfill_thread_id", progress
)
return processed_rows
def add_thread_id_summary_txn(txn: LoggingTransaction) -> int:
min_user_id = progress.get("max_summary_user_id", "")
min_room_id = progress.get("max_summary_room_id", "")
# Slightly overcomplicated query for getting the Nth user ID / room
# ID tuple, or the last if there are less than N remaining.
sql = """
SELECT user_id, room_id FROM (
SELECT user_id, room_id FROM event_push_summary
WHERE (user_id, room_id) > (?, ?)
AND thread_id IS NULL
ORDER BY user_id, room_id
LIMIT ?
) AS e
ORDER BY user_id DESC, room_id DESC
LIMIT 1
"""
txn.execute(sql, (min_user_id, min_room_id, batch_size))
row = txn.fetchone()
if not row:
return 0
max_user_id, max_room_id = row
sql = """
UPDATE event_push_summary
SET thread_id = 'main'
WHERE
(?, ?) < (user_id, room_id) AND (user_id, room_id) <= (?, ?)
AND thread_id IS NULL
"""
txn.execute(sql, (min_user_id, min_room_id, max_user_id, max_room_id))
processed_rows = txn.rowcount
progress["max_summary_user_id"] = max_user_id
progress["max_summary_room_id"] = max_room_id
self.db_pool.updates._background_update_progress_txn(
txn, "event_push_backfill_thread_id", progress
)
return processed_rows
# First update the event_push_actions table, then the event_push_summary table.
#
# Note that the event_push_actions_staging table is ignored since it is
# assumed that items in that table will only exist for a short period of
# time.
if not event_push_actions_done:
result = await self.db_pool.runInteraction(
"event_push_backfill_thread_id",
add_thread_id_txn,
progress.get("max_event_push_actions_stream_ordering", 0),
)
else:
result = await self.db_pool.runInteraction(
"event_push_backfill_thread_id",
add_thread_id_summary_txn,
)
# Only done after the event_push_summary table is done.
if not result:
await self.db_pool.updates._end_background_update(
"event_push_backfill_thread_id"
)
return result
async def get_unread_counts_by_room_for_user(self, user_id: str) -> Dict[str, int]: async def get_unread_counts_by_room_for_user(self, user_id: str) -> Dict[str, int]:
"""Get the notification count by room for a user. Only considers notifications, """Get the notification count by room for a user. Only considers notifications,
not highlight or unread counts, and threads are currently aggregated under their room. not highlight or unread counts, and threads are currently aggregated under their room.
@ -711,25 +552,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
(ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE), (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE),
) )
# First ensure that the existing rows have an updated thread_id field.
if not self._event_push_backfill_thread_id_done:
txn.execute(
"""
UPDATE event_push_summary
SET thread_id = ?
WHERE room_id = ? AND user_id = ? AND thread_id is NULL
""",
(MAIN_TIMELINE, room_id, user_id),
)
txn.execute(
"""
UPDATE event_push_actions
SET thread_id = ?
WHERE room_id = ? AND user_id = ? AND thread_id is NULL
""",
(MAIN_TIMELINE, room_id, user_id),
)
# First we pull the counts from the summary table. # First we pull the counts from the summary table.
# #
# We check that `last_receipt_stream_ordering` matches the stream ordering of the # We check that `last_receipt_stream_ordering` matches the stream ordering of the
@ -1545,25 +1367,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
(room_id, user_id, stream_ordering, *thread_args), (room_id, user_id, stream_ordering, *thread_args),
) )
# First ensure that the existing rows have an updated thread_id field.
if not self._event_push_backfill_thread_id_done:
txn.execute(
"""
UPDATE event_push_summary
SET thread_id = ?
WHERE room_id = ? AND user_id = ? AND thread_id is NULL
""",
(MAIN_TIMELINE, room_id, user_id),
)
txn.execute(
"""
UPDATE event_push_actions
SET thread_id = ?
WHERE room_id = ? AND user_id = ? AND thread_id is NULL
""",
(MAIN_TIMELINE, room_id, user_id),
)
# Fetch the notification counts between the stream ordering of the # Fetch the notification counts between the stream ordering of the
# latest receipt and what was previously summarised. # latest receipt and what was previously summarised.
unread_counts = self._get_notif_unread_count_for_user_room( unread_counts = self._get_notif_unread_count_for_user_room(
@ -1698,19 +1501,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
rotate_to_stream_ordering: The new maximum event stream ordering to summarise. rotate_to_stream_ordering: The new maximum event stream ordering to summarise.
""" """
# Ensure that any new actions have an updated thread_id.
if not self._event_push_backfill_thread_id_done:
txn.execute(
"""
UPDATE event_push_actions
SET thread_id = ?
WHERE ? < stream_ordering AND stream_ordering <= ? AND thread_id IS NULL
""",
(MAIN_TIMELINE, old_rotate_stream_ordering, rotate_to_stream_ordering),
)
# XXX Do we need to update summaries here too?
# Calculate the new counts that should be upserted into event_push_summary # Calculate the new counts that should be upserted into event_push_summary
sql = """ sql = """
SELECT user_id, room_id, thread_id, SELECT user_id, room_id, thread_id,
@ -1773,20 +1563,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
logger.info("Rotating notifications, handling %d rows", len(summaries)) logger.info("Rotating notifications, handling %d rows", len(summaries))
# Ensure that any updated threads have the proper thread_id.
if not self._event_push_backfill_thread_id_done:
txn.execute_batch(
"""
UPDATE event_push_summary
SET thread_id = ?
WHERE room_id = ? AND user_id = ? AND thread_id is NULL
""",
[
(MAIN_TIMELINE, room_id, user_id)
for user_id, room_id, _ in summaries
],
)
self.db_pool.simple_upsert_many_txn( self.db_pool.simple_upsert_many_txn(
txn, txn,
table="event_push_summary", table="event_push_summary",

View file

@ -106,6 +106,9 @@ Changes in SCHEMA_VERSION = 76:
SCHEMA_COMPAT_VERSION = ( SCHEMA_COMPAT_VERSION = (
# Queries against `event_stream_ordering` columns in membership tables must # Queries against `event_stream_ordering` columns in membership tables must
# be disambiguated. # be disambiguated.
#
# The threads_id column must written to with non-null values for the
# event_push_actions, event_push_actions_staging, and event_push_summary tables.
74 74
) )
"""Limit on how far the synapse codebase can be rolled back without breaking db compat """Limit on how far the synapse codebase can be rolled back without breaking db compat

View file

@ -0,0 +1,28 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Force the background updates from 06thread_notifications.sql to run in the
-- foreground as code will now require those to be "done".
DELETE FROM background_updates WHERE update_name = 'event_push_backfill_thread_id';
-- Overwrite any null thread_id values.
UPDATE event_push_actions_staging SET thread_id = 'main' WHERE thread_id IS NULL;
UPDATE event_push_actions SET thread_id = 'main' WHERE thread_id IS NULL;
UPDATE event_push_summary SET thread_id = 'main' WHERE thread_id IS NULL;
-- Drop the background updates to calculate the indexes used to find null thread_ids.
DELETE FROM background_updates WHERE update_name = 'event_push_actions_thread_id_null';
DELETE FROM background_updates WHERE update_name = 'event_push_summary_thread_id_null';

View file

@ -0,0 +1,37 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- The thread_id columns can now be made non-nullable, this is done by using a
-- constraint (and not altering the column) to avoid taking out a full table lock.
--
-- We initially add an invalid constraint which guards against new data (this
-- doesn't lock the table).
ALTER TABLE event_push_actions_staging
ADD CONSTRAINT event_push_actions_staging_thread_id CHECK (thread_id IS NOT NULL) NOT VALID;
ALTER TABLE event_push_actions
ADD CONSTRAINT event_push_actions_thread_id CHECK (thread_id IS NOT NULL) NOT VALID;
ALTER TABLE event_push_summary
ADD CONSTRAINT event_push_summary_thread_id CHECK (thread_id IS NOT NULL) NOT VALID;
-- We then validate the constraint which doesn't need to worry about new data. It
-- only needs a SHARE UPDATE EXCLUSIVE lock but can still take a while to complete.
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(7605, 'event_push_actions_staging_thread_id', '{}'),
(7605, 'event_push_actions_thread_id', '{}'),
(7605, 'event_push_summary_thread_id', '{}');
-- Drop the indexes used to find null thread_ids.
DROP INDEX IF EXISTS event_push_actions_thread_id_null;
DROP INDEX IF EXISTS event_push_summary_thread_id_null;

View file

@ -0,0 +1,102 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- The thread_id columns can now be made non-nullable.
--
-- SQLite doesn't support modifying columns to an existing table, so it must
-- be recreated.
-- Create the new tables.
CREATE TABLE event_push_actions_staging_new (
event_id TEXT NOT NULL,
user_id TEXT NOT NULL,
actions TEXT NOT NULL,
notif SMALLINT NOT NULL,
highlight SMALLINT NOT NULL,
unread SMALLINT,
thread_id TEXT,
inserted_ts BIGINT,
CONSTRAINT event_push_actions_staging_thread_id CHECK (thread_id is NOT NULL)
);
CREATE TABLE event_push_actions_new (
room_id TEXT NOT NULL,
event_id TEXT NOT NULL,
user_id TEXT NOT NULL,
profile_tag VARCHAR(32),
actions TEXT NOT NULL,
topological_ordering BIGINT,
stream_ordering BIGINT,
notif SMALLINT,
highlight SMALLINT,
unread SMALLINT,
thread_id TEXT,
CONSTRAINT event_id_user_id_profile_tag_uniqueness UNIQUE (room_id, event_id, user_id, profile_tag),
CONSTRAINT event_push_actions_thread_id CHECK (thread_id is NOT NULL)
);
CREATE TABLE event_push_summary_new (
user_id TEXT NOT NULL,
room_id TEXT NOT NULL,
notif_count BIGINT NOT NULL,
stream_ordering BIGINT NOT NULL,
unread_count BIGINT,
last_receipt_stream_ordering BIGINT,
thread_id TEXT,
CONSTRAINT event_push_summary_thread_id CHECK (thread_id is NOT NULL)
);
-- Copy the data.
INSERT INTO event_push_actions_staging_new (event_id, user_id, actions, notif, highlight, unread, thread_id, inserted_ts)
SELECT event_id, user_id, actions, notif, highlight, unread, thread_id, inserted_ts
FROM event_push_actions_staging;
INSERT INTO event_push_actions_new (room_id, event_id, user_id, profile_tag, actions, topological_ordering, stream_ordering, notif, highlight, unread, thread_id)
SELECT room_id, event_id, user_id, profile_tag, actions, topological_ordering, stream_ordering, notif, highlight, unread, thread_id
FROM event_push_actions;
INSERT INTO event_push_summary_new (user_id, room_id, notif_count, stream_ordering, unread_count, last_receipt_stream_ordering, thread_id)
SELECT user_id, room_id, notif_count, stream_ordering, unread_count, last_receipt_stream_ordering, thread_id
FROM event_push_summary;
-- Drop the old tables.
DROP TABLE event_push_actions_staging;
DROP TABLE event_push_actions;
DROP TABLE event_push_summary;
-- Rename the tables.
ALTER TABLE event_push_actions_staging_new RENAME TO event_push_actions_staging;
ALTER TABLE event_push_actions_new RENAME TO event_push_actions;
ALTER TABLE event_push_summary_new RENAME TO event_push_summary;
-- Recreate the indexes.
CREATE INDEX event_push_actions_staging_id ON event_push_actions_staging(event_id);
CREATE INDEX event_push_actions_highlights_index ON event_push_actions (user_id, room_id, topological_ordering, stream_ordering);
CREATE INDEX event_push_actions_rm_tokens on event_push_actions( user_id, room_id, topological_ordering, stream_ordering );
CREATE INDEX event_push_actions_room_id_user_id on event_push_actions(room_id, user_id);
CREATE INDEX event_push_actions_stream_ordering on event_push_actions( stream_ordering, user_id );
CREATE INDEX event_push_actions_u_highlight ON event_push_actions (user_id, stream_ordering);
CREATE UNIQUE INDEX event_push_summary_unique_index2 ON event_push_summary (user_id, room_id, thread_id) ;
-- Recreate some indexes in the background, by re-running the background updates
-- from 72/02event_push_actions_index.sql and 72/06thread_notifications.sql.
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(7403, 'event_push_summary_unique_index2', '{}')
ON CONFLICT (update_name) DO UPDATE SET progress_json = '{}';
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(7403, 'event_push_actions_stream_highlight_index', '{}')
ON CONFLICT (update_name) DO UPDATE SET progress_json = '{}';