diff --git a/changelog.d/14979.misc b/changelog.d/14979.misc new file mode 100644 index 0000000000..c09911e48d --- /dev/null +++ b/changelog.d/14979.misc @@ -0,0 +1 @@ +Add denormalised event stream ordering column to membership state tables for future use. Contributed by Nick @ Beeper (@fizzadar). diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 1536937b67..b6cce0a7cc 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1147,11 +1147,15 @@ class PersistEventsStore: # been inserted into room_memberships. txn.execute_batch( """INSERT INTO current_state_events - (room_id, type, state_key, event_id, membership) - VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?)) + (room_id, type, state_key, event_id, membership, event_stream_ordering) + VALUES ( + ?, ?, ?, ?, + (SELECT membership FROM room_memberships WHERE event_id = ?), + (SELECT stream_ordering FROM events WHERE event_id = ?) + ) """, [ - (room_id, key[0], key[1], ev_id, ev_id) + (room_id, key[0], key[1], ev_id, ev_id, ev_id) for key, ev_id in to_insert.items() ], ) @@ -1178,11 +1182,15 @@ class PersistEventsStore: if to_insert: txn.execute_batch( """INSERT INTO local_current_membership - (room_id, user_id, event_id, membership) - VALUES (?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?)) + (room_id, user_id, event_id, membership, event_stream_ordering) + VALUES ( + ?, ?, ?, + (SELECT membership FROM room_memberships WHERE event_id = ?), + (SELECT stream_ordering FROM events WHERE event_id = ?) + ) """, [ - (room_id, key[1], ev_id, ev_id) + (room_id, key[1], ev_id, ev_id, ev_id) for key, ev_id in to_insert.items() if key[0] == EventTypes.Member and self.is_mine_id(key[1]) ], @@ -1790,6 +1798,7 @@ class PersistEventsStore: table="room_memberships", keys=( "event_id", + "event_stream_ordering", "user_id", "sender", "room_id", @@ -1800,6 +1809,7 @@ class PersistEventsStore: values=[ ( event.event_id, + event.internal_metadata.stream_ordering, event.state_key, event.user_id, event.room_id, @@ -1832,6 +1842,7 @@ class PersistEventsStore: keyvalues={"room_id": event.room_id, "user_id": event.state_key}, values={ "event_id": event.event_id, + "event_stream_ordering": event.internal_metadata.stream_ordering, "membership": event.membership, }, ) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index b9d3c36d60..0e81d38cca 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -17,7 +17,7 @@ from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Set, Tuple, ca import attr -from synapse.api.constants import EventContentFields, RelationTypes +from synapse.api.constants import EventContentFields, EventTypes, RelationTypes from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.events import make_event_from_dict from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause @@ -71,6 +71,10 @@ class _BackgroundUpdates: EVENTS_JUMP_TO_DATE_INDEX = "events_jump_to_date_index" + POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING = ( + "populate_membership_event_stream_ordering" + ) + @attr.s(slots=True, frozen=True, auto_attribs=True) class _CalculateChainCover: @@ -99,6 +103,10 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): ): super().__init__(database, db_conn, hs) + self.db_pool.updates.register_background_update_handler( + _BackgroundUpdates.POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING, + self._populate_membership_event_stream_ordering, + ) self.db_pool.updates.register_background_update_handler( _BackgroundUpdates.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts, @@ -1498,3 +1506,97 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): ) return batch_size + + async def _populate_membership_event_stream_ordering( + self, progress: JsonDict, batch_size: int + ) -> int: + def _populate_membership_event_stream_ordering( + txn: LoggingTransaction, + ) -> bool: + + if "max_stream_ordering" in progress: + max_stream_ordering = progress["max_stream_ordering"] + else: + txn.execute("SELECT max(stream_ordering) FROM events") + res = txn.fetchone() + if res is None or res[0] is None: + return True + else: + max_stream_ordering = res[0] + + start = progress.get("stream_ordering", 0) + stop = start + batch_size + + sql = f""" + SELECT room_id, event_id, stream_ordering + FROM events + WHERE + type = '{EventTypes.Member}' + AND stream_ordering >= ? + AND stream_ordering < ? + """ + txn.execute(sql, (start, stop)) + + rows: List[Tuple[str, str, int]] = cast( + List[Tuple[str, str, int]], txn.fetchall() + ) + + event_ids: List[Tuple[str]] = [] + event_stream_orderings: List[Tuple[int]] = [] + + for _, event_id, event_stream_ordering in rows: + event_ids.append((event_id,)) + event_stream_orderings.append((event_stream_ordering,)) + + self.db_pool.simple_update_many_txn( + txn, + table="current_state_events", + key_names=("event_id",), + key_values=event_ids, + value_names=("event_stream_ordering",), + value_values=event_stream_orderings, + ) + + self.db_pool.simple_update_many_txn( + txn, + table="room_memberships", + key_names=("event_id",), + key_values=event_ids, + value_names=("event_stream_ordering",), + value_values=event_stream_orderings, + ) + + # NOTE: local_current_membership has no index on event_id, so only + # the room ID here will reduce the query rows read. + for room_id, event_id, event_stream_ordering in rows: + txn.execute( + """ + UPDATE local_current_membership + SET event_stream_ordering = ? + WHERE room_id = ? AND event_id = ? + """, + (event_stream_ordering, room_id, event_id), + ) + + self.db_pool.updates._background_update_progress_txn( + txn, + _BackgroundUpdates.POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING, + { + "stream_ordering": stop, + "max_stream_ordering": max_stream_ordering, + }, + ) + + return stop > max_stream_ordering + + finished = await self.db_pool.runInteraction( + "_populate_membership_event_stream_ordering", + _populate_membership_event_stream_ordering, + ) + + if finished: + await self.db_pool.updates._end_background_update( + _BackgroundUpdates.POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING + ) + + return batch_size diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index d7d08369ca..6d0ef10258 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1779,7 +1779,7 @@ class EventsWorkerStore(SQLBaseStore): txn: LoggingTransaction, ) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]: sql = ( - "SELECT event_stream_ordering, e.event_id, e.room_id, e.type," + "SELECT out.event_stream_ordering, e.event_id, e.room_id, e.type," " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL," " e.outlier" " FROM events AS e" @@ -1791,10 +1791,10 @@ class EventsWorkerStore(SQLBaseStore): " LEFT JOIN event_relations USING (event_id)" " LEFT JOIN room_memberships USING (event_id)" " LEFT JOIN rejections USING (event_id)" - " WHERE ? < event_stream_ordering" - " AND event_stream_ordering <= ?" + " WHERE ? < out.event_stream_ordering" + " AND out.event_stream_ordering <= ?" " AND out.instance_name = ?" - " ORDER BY event_stream_ordering ASC" + " ORDER BY out.event_stream_ordering ASC" ) txn.execute(sql, (last_id, current_id, instance_name)) diff --git a/synapse/storage/schema/main/delta/73/26membership_tables_event_stream_ordering.sql b/synapse/storage/schema/main/delta/73/26membership_tables_event_stream_ordering.sql new file mode 100644 index 0000000000..7c30a67fc4 --- /dev/null +++ b/synapse/storage/schema/main/delta/73/26membership_tables_event_stream_ordering.sql @@ -0,0 +1,21 @@ +/* Copyright 2022 Beeper + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE current_state_events ADD COLUMN event_stream_ordering BIGINT; +ALTER TABLE local_current_membership ADD COLUMN event_stream_ordering BIGINT; +ALTER TABLE room_memberships ADD COLUMN event_stream_ordering BIGINT; + +INSERT INTO background_updates (update_name, progress_json) VALUES + ('populate_membership_event_stream_ordering', '{}');