Add a ranged receipts table and insert into it.

Patrick Cloke 2022-05-26 16:06:30 -04:00
parent 82166cfa51
commit f43e0b4b1a
3 changed files with 73 additions and 42 deletions
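For context, the diff below consumes a receipt object that carries a start and an end event rather than a single event ID. The definition of RangedReadReceipt is not part of this commit; a minimal sketch of the shape the code below assumes, with field names taken from the usages in the diff (the attrs decorator style and import path are assumptions), might look like:

    import attr
    from typing import Optional

    from synapse.types import JsonDict  # assumed import path

    @attr.s(slots=True, frozen=True, auto_attribs=True)
    class RangedReadReceipt:
        """A read receipt covering every event between two points in a room.

        A None start_event_id means "everything before end_event_id".
        """

        room_id: str
        receipt_type: str
        user_id: str
        start_event_id: Optional[str]
        end_event_id: str
        data: JsonDict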


@@ -417,6 +417,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
# "rooms" happens last, to keep the foreign keys in the other tables
# happy
"rooms",
"receipts_ranged",
):
logger.info("[purge] removing %s from %s", room_id, table)
txn.execute("DELETE FROM %s WHERE room_id=?" % (table,), (room_id,))


@@ -604,7 +604,8 @@ class ReceiptsWorkerStore(SQLBaseStore):
room_id: str,
receipt_type: str,
user_id: str,
event_id: str,
start_event_id: Optional[str],
end_event_id: str,
data: JsonDict,
stream_id: int,
) -> Optional[int]:
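Read as a whole, the txn helper's new parameter list would look roughly like the sketch below. The method name and the self/txn parameters fall outside this hunk, so they are hypothetical here; the remaining names are taken from the diff.

    def _insert_ranged_receipt_txn(  # hypothetical name, not shown in the hunk
        self,
        txn: LoggingTransaction,  # assumed type, as used elsewhere in Synapse storage
        room_id: str,
        receipt_type: str,
        user_id: str,
        start_event_id: Optional[str],  # None means "everything before end_event_id"
        end_event_id: str,
        data: JsonDict,
        stream_id: int,
    ) -> Optional[int]:
        ...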
@@ -617,37 +618,31 @@ class ReceiptsWorkerStore(SQLBaseStore):
"""
assert self._can_write_to_receipts
if start_event_id is not None:
res = self.db_pool.simple_select_one_onecol_txn(
txn,
table="events",
# XXX Use topo ordering
retcol="stream_ordering",
keyvalues={"event_id": start_event_id},
allow_none=True,
)
start_stream_ordering = int(res) if res else None
else:
start_stream_ordering = None
res = self.db_pool.simple_select_one_txn(
txn,
table="events",
# XXX Use topo ordering
retcols=["stream_ordering", "received_ts"],
keyvalues={"event_id": event_id},
keyvalues={"event_id": end_event_id},
allow_none=True,
)
stream_ordering = int(res["stream_ordering"]) if res else None
end_stream_ordering = int(res["stream_ordering"]) if res else None
# XXX This is just for logging in the caller; can it be removed?
rx_ts = res["received_ts"] if res else 0
# We don't want to clobber receipts for more recent events, so we
# have to compare orderings of existing receipts
if stream_ordering is not None:
sql = (
"SELECT stream_ordering, event_id FROM events"
" INNER JOIN receipts_linearized AS r USING (event_id, room_id)"
" WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?"
)
txn.execute(sql, (room_id, receipt_type, user_id))
for so, eid in txn:
if int(so) >= stream_ordering:
logger.debug(
"Ignoring new receipt for %s in favour of existing "
"one for later event %s",
event_id,
eid,
)
return None
txn.call_after(
self.invalidate_caches_for_receipt, room_id, receipt_type, user_id
)
@@ -656,33 +651,33 @@ class ReceiptsWorkerStore(SQLBaseStore):
self._receipts_stream_cache.entity_has_changed, room_id, stream_id
)
self.db_pool.simple_upsert_txn(
# Splat the receipt into the table.
# XXX This might overlap other ranges; they should be coalesced.
self.db_pool.simple_insert_txn(
txn,
table="receipts_linearized",
keyvalues={
"room_id": room_id,
"receipt_type": receipt_type,
"user_id": user_id,
},
table="receipts_ranged",
values={
"room_id": room_id,
"user_id": user_id,
"receipt_type": receipt_type,
"start_event_id": start_event_id,
"end_event_id": end_event_id,
"stream_id": stream_id,
"event_id": event_id,
"data": json_encoder.encode(data),
},
# receipts_linearized has a unique constraint on
# (user_id, room_id, receipt_type), so no need to lock
lock=False,
)
# XXX How do we migrate receipts_linearized, or do we keep using it for non-ranged receipts?
# When updating a local users read receipt, remove any push actions
# which resulted from the receipt's event and all earlier events.
if (
self.hs.is_mine_id(user_id)
and receipt_type in (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE)
and stream_ordering is not None
and end_stream_ordering is not None
):
self._remove_old_push_actions_before_txn( # type: ignore[attr-defined]
txn, room_id=room_id, user_id=user_id, stream_ordering=stream_ordering
txn, room_id=room_id, user_id=user_id, stream_ordering=end_stream_ordering
)
return rx_ts
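The "should coalesce" XXX above is left open by this commit. One possible approach, purely illustrative and not what the commit implements, is to merge an incoming range with any existing overlapping ranges for the same (room_id, receipt_type, user_id) before inserting, comparing ranges by the events' stream orderings:

    from typing import List, Optional, Tuple

    def coalesce_ranges(
        ranges: List[Tuple[Optional[int], int]]
    ) -> List[Tuple[Optional[int], int]]:
        """Merge overlapping stream-ordering ranges into a disjoint set.

        Each range is (start_stream_ordering, end_stream_ordering); a None
        start means "everything before the end".
        """
        # Treat a None start as negative infinity so it sorts first and
        # swallows any earlier range it overlaps.
        def key(r: Tuple[Optional[int], int]) -> int:
            return r[0] if r[0] is not None else -(2**63)

        merged: List[Tuple[Optional[int], int]] = []
        for start, end in sorted(ranges, key=key):
            if merged and key((start, end)) <= merged[-1][1]:
                # Overlaps the previous range: extend its end instead of
                # keeping two rows.
                prev_start, prev_end = merged[-1]
                merged[-1] = (prev_start, max(prev_end, end))
            else:
                merged.append((start, end))
        return merged

    # Example: coalesce_ranges([(1, 5), (4, 9), (None, 2), (20, 25)])
    # -> [(None, 9), (20, 25)]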
@@ -742,18 +737,20 @@ class ReceiptsWorkerStore(SQLBaseStore):
if not event_ids:
return None
start_event_id = None
if len(event_ids) == 1:
linearized_event_id = event_ids[0]
end_event_id = event_ids[0]
else:
# we need to map points in the graph -> linearized form.
linearized_event_id = await self.db_pool.runInteraction(
end_event_id = await self.db_pool.runInteraction(
"insert_receipt_conv",
self._graph_to_linear,
receipt.room_id,
event_ids,
)
elif isinstance(receipt, RangedReadReceipt):
linearized_event_id = receipt.end_event_id
start_event_id = receipt.start_event_id
end_event_id = receipt.end_event_id
else:
raise ValueError("Unexpected receipt type: %s" % type(receipt))
@@ -764,7 +761,8 @@ class ReceiptsWorkerStore(SQLBaseStore):
receipt.room_id,
receipt.receipt_type,
receipt.user_id,
linearized_event_id,
start_event_id,
end_event_id,
receipt.data,
stream_id=stream_id,
# Read committed is actually beneficial here because we check for a receipt with
@@ -780,7 +778,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
now = self._clock.time_msec()
logger.debug(
"RR for event %s in %s (%i ms old)",
linearized_event_id,
end_event_id, # XXX log start?
receipt.room_id,
now - event_ts,
)


@@ -0,0 +1,32 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
CREATE TABLE receipts_ranged (
stream_id bigint NOT NULL,
room_id text NOT NULL,
receipt_type text NOT NULL,
user_id text NOT NULL,
-- A null start means "everything before this".
start_event_id text,
end_event_id text NOT NULL,
data text NOT NULL,
instance_name text
);
CREATE INDEX receipts_ranged_id ON receipts_ranged (stream_id);
CREATE INDEX receipts_ranged_room_type_user ON receipts_ranged (room_id, receipt_type, user_id);
CREATE INDEX receipts_ranged_room_stream ON receipts_ranged (room_id, stream_id);
CREATE INDEX receipts_ranged_user ON receipts_ranged (user_id);
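This commit only writes to the new table; a read path is not included. A sketch of how a storage method might read ranged receipts back out for a room since a given stream position, leaning on the (room_id, stream_id) index created above, could look like the following (the helper name is hypothetical, and plain json.loads stands in for Synapse's own JSON decoding helper to keep the sketch self-contained):

    import json
    from typing import List, Optional, Tuple

    def _get_ranged_receipts_for_room_txn(
        txn, room_id: str, from_stream_id: int
    ) -> List[Tuple[str, str, Optional[str], str, dict]]:
        """Return all ranged receipts in a room newer than the given stream id.

        Yields (user_id, receipt_type, start_event_id, end_event_id, data)
        tuples, ordered by stream_id.
        """
        sql = """
            SELECT user_id, receipt_type, start_event_id, end_event_id, data
            FROM receipts_ranged
            WHERE room_id = ? AND stream_id > ?
            ORDER BY stream_id ASC
        """
        txn.execute(sql, (room_id, from_stream_id))
        return [
            (user_id, receipt_type, start, end, json.loads(data))
            for user_id, receipt_type, start, end, data in txn
        ]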