diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index ba385f9fc4..f51372810c 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -417,6 +417,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): # "rooms" happens last, to keep the foreign keys in the other tables # happy "rooms", + "receipts_ranged", ): logger.info("[purge] removing %s from %s", room_id, table) txn.execute("DELETE FROM %s WHERE room_id=?" % (table,), (room_id,)) diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index d318753650..7c586cf8f3 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -604,7 +604,8 @@ class ReceiptsWorkerStore(SQLBaseStore): room_id: str, receipt_type: str, user_id: str, - event_id: str, + start_event_id: Optional[str], + end_event_id: str, data: JsonDict, stream_id: int, ) -> Optional[int]: @@ -617,37 +618,31 @@ class ReceiptsWorkerStore(SQLBaseStore): """ assert self._can_write_to_receipts + if start_event_id is not None: + res = self.db_pool.simple_select_one_onecol_txn( + txn, + table="events", + # XXX Use topo ordering + retcol="stream_ordering", + keyvalues={"event_id": start_event_id}, + allow_none=True, + ) + start_stream_ordering = int(res) if res else None + else: + start_stream_ordering = None + res = self.db_pool.simple_select_one_txn( txn, table="events", + # XXX Use topo ordering retcols=["stream_ordering", "received_ts"], - keyvalues={"event_id": event_id}, + keyvalues={"event_id": end_event_id}, allow_none=True, ) - - stream_ordering = int(res["stream_ordering"]) if res else None + end_stream_ordering = int(res["stream_ordering"]) if res else None + # XXX This is just for logging in the caller, can it be removed. rx_ts = res["received_ts"] if res else 0 - # We don't want to clobber receipts for more recent events, so we - # have to compare orderings of existing receipts - if stream_ordering is not None: - sql = ( - "SELECT stream_ordering, event_id FROM events" - " INNER JOIN receipts_linearized AS r USING (event_id, room_id)" - " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?" - ) - txn.execute(sql, (room_id, receipt_type, user_id)) - - for so, eid in txn: - if int(so) >= stream_ordering: - logger.debug( - "Ignoring new receipt for %s in favour of existing " - "one for later event %s", - event_id, - eid, - ) - return None - txn.call_after( self.invalidate_caches_for_receipt, room_id, receipt_type, user_id ) @@ -656,33 +651,33 @@ class ReceiptsWorkerStore(SQLBaseStore): self._receipts_stream_cache.entity_has_changed, room_id, stream_id ) - self.db_pool.simple_upsert_txn( + # Splat the receipt into the table. + # XXX This might overlap other ranges, should coalesce. + self.db_pool.simple_insert_txn( txn, - table="receipts_linearized", - keyvalues={ - "room_id": room_id, - "receipt_type": receipt_type, - "user_id": user_id, - }, + table="receipts_ranged", values={ + "room_id": room_id, + "user_id": user_id, + "receipt_type": receipt_type, + "start_event_id": start_event_id, + "end_event_id": end_event_id, "stream_id": stream_id, - "event_id": event_id, "data": json_encoder.encode(data), }, - # receipts_linearized has a unique constraint on - # (user_id, room_id, receipt_type), so no need to lock - lock=False, ) + # XXX How do we migrate receipts_linearized or do we use one of non-ranged receipts? + # When updating a local users read receipt, remove any push actions # which resulted from the receipt's event and all earlier events. if ( self.hs.is_mine_id(user_id) and receipt_type in (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE) - and stream_ordering is not None + and end_stream_ordering is not None ): self._remove_old_push_actions_before_txn( # type: ignore[attr-defined] - txn, room_id=room_id, user_id=user_id, stream_ordering=stream_ordering + txn, room_id=room_id, user_id=user_id, stream_ordering=end_stream_ordering ) return rx_ts @@ -742,18 +737,20 @@ class ReceiptsWorkerStore(SQLBaseStore): if not event_ids: return None + start_event_id = None if len(event_ids) == 1: - linearized_event_id = event_ids[0] + end_event_id = event_ids[0] else: # we need to points in graph -> linearized form. - linearized_event_id = await self.db_pool.runInteraction( + end_event_id = await self.db_pool.runInteraction( "insert_receipt_conv", self._graph_to_linear, receipt.room_id, event_ids, ) elif isinstance(receipt, RangedReadReceipt): - linearized_event_id = receipt.end_event_id + start_event_id = receipt.start_event_id + end_event_id = receipt.end_event_id else: raise ValueError("Unexpected receipt type: %s", type(receipt)) @@ -764,7 +761,8 @@ class ReceiptsWorkerStore(SQLBaseStore): receipt.room_id, receipt.receipt_type, receipt.user_id, - linearized_event_id, + start_event_id, + end_event_id, receipt.data, stream_id=stream_id, # Read committed is actually beneficial here because we check for a receipt with @@ -780,7 +778,7 @@ class ReceiptsWorkerStore(SQLBaseStore): now = self._clock.time_msec() logger.debug( "RR for event %s in %s (%i ms old)", - linearized_event_id, + end_event_id, # XXX log start? receipt.room_id, now - event_ts, ) diff --git a/synapse/storage/schema/main/delta/70/02ranged_read_receipts.sql b/synapse/storage/schema/main/delta/70/02ranged_read_receipts.sql new file mode 100644 index 0000000000..8ae0865f14 --- /dev/null +++ b/synapse/storage/schema/main/delta/70/02ranged_read_receipts.sql @@ -0,0 +1,32 @@ +/* Copyright 2022 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE receipts_ranged ( + stream_id bigint NOT NULL, + room_id text NOT NULL, + receipt_type text NOT NULL, + user_id text NOT NULL, + -- A null start means "everything before this". + start_event_id text, + end_event_id text NOT NULL, + data text NOT NULL, + instance_name text +); + + +CREATE INDEX receipts_ranged_id ON receipts_ranged (stream_id); +CREATE INDEX receipts_ranged_room_type_user ON receipts_ranged (room_id, receipt_type, user_id); +CREATE INDEX receipts_ranged_room_stream ON receipts_ranged (room_id, stream_id); +CREATE INDEX receipts_ranged_user ON receipts_ranged (user_id);