Coalesce receipts.

This commit is contained in:
Patrick Cloke 2022-06-02 12:46:42 -04:00
parent 0c395fd8b9
commit a8a45921fb
2 changed files with 99 additions and 20 deletions

View file

@ -150,7 +150,7 @@ def main():
# Create a fork in the DAG. # Create a fork in the DAG.
prev_message_id = first_message_id prev_message_id = first_message_id
for msg in range(1): for msg in range(3):
prev_message_id = _send_and_append(f"Fork 2 Message {msg}", prev_message_id) prev_message_id = _send_and_append(f"Fork 2 Message {msg}", prev_message_id)
sleep(1) sleep(1)
# # Join the forks. # # Join the forks.
@ -159,15 +159,15 @@ def main():
_sync_and_show(room_id) _sync_and_show(room_id)
# User 1 sends another read receipt. # User 1 sends another read receipt.
# print("@test reads everything") print("@test reads everything")
# result = requests.post( result = requests.post(
# f"{HOMESERVER}/_matrix/client/v3/rooms/{room_id}/receipt/m.read/{event_ids[-1]}/{event_ids[0]}", f"{HOMESERVER}/_matrix/client/v3/rooms/{room_id}/receipt/m.read/{event_ids[-1]}/{event_ids[0]}",
# headers=USER_1_HEADERS, headers=USER_1_HEADERS,
# json={}, json={},
# ) )
# _check_for_status(result) _check_for_status(result)
# _sync_and_show(room_id) _sync_and_show(room_id)
if __name__ == "__main__": if __name__ == "__main__":

View file

@ -651,28 +651,34 @@ class ReceiptsWorkerStore(SQLBaseStore):
""" """
assert self._can_write_to_receipts assert self._can_write_to_receipts
start_topo_ordering = None
start_stream_ordering = None
if start_event_id is not None: if start_event_id is not None:
res = self.db_pool.simple_select_one_onecol_txn( res = self.db_pool.simple_select_one_txn(
txn, txn,
table="events", table="events",
# XXX Use topo ordering retcols=("topological_ordering", "stream_ordering"),
retcol="stream_ordering",
keyvalues={"event_id": start_event_id}, keyvalues={"event_id": start_event_id},
allow_none=True, allow_none=True,
) )
start_stream_ordering = int(res) if res else None if res is not None:
else: start_topo_ordering = int(res["topological_ordering"])
start_stream_ordering = None start_stream_ordering = int(res["stream_ordering"])
res = self.db_pool.simple_select_one_txn( res = self.db_pool.simple_select_one_txn(
txn, txn,
table="events", table="events",
# XXX Use topo ordering retcols=("topological_ordering", "stream_ordering", "received_ts"),
retcols=["stream_ordering", "received_ts"],
keyvalues={"event_id": end_event_id}, keyvalues={"event_id": end_event_id},
allow_none=True, allow_none=True,
) )
end_stream_ordering = int(res["stream_ordering"]) if res else None end_topo_ordering = (
None # XXX When is it valid to not find this event? Federation?
)
end_stream_ordering = None
if res is not None:
end_topo_ordering = int(res["topological_ordering"])
end_stream_ordering = int(res["stream_ordering"])
# XXX This is just for logging in the caller, can it be removed. # XXX This is just for logging in the caller, can it be removed.
rx_ts = res["received_ts"] if res else 0 rx_ts = res["received_ts"] if res else 0
@ -684,8 +690,80 @@ class ReceiptsWorkerStore(SQLBaseStore):
self._receipts_stream_cache.entity_has_changed, room_id, stream_id self._receipts_stream_cache.entity_has_changed, room_id, stream_id
) )
# Splat the receipt into the table. # Find all overlapping or adjacent receipts. These receipts are found by
# XXX This might overlap other ranges, should coalesce. # searching for any receipts which:
#
# * Have an end topological ordering directly before or after the new
# receipt's start topological ordering.
# * Have a start topological ordering directly after or before the new
# receipt's end topological ordering.
#
# E.g. the following would be found:
#
# * [1, 7] and [8, 10] should be combined.
# * [1, 7] and [5, 10] should be combined.
# * [None, 7] and [5, 10] should be combined.
#
# XXX Do we care about stream ordering here?
#
# XXX This doesn't handle a start_topo_ordering of None.
sql = """
SELECT
stream_id,
start_event_id,
start_event.topological_ordering,
end_event_id,
end_event.topological_ordering
FROM receipts_ranged
LEFT JOIN events AS end_event ON (end_event.event_id = end_event_id)
LEFT JOIN events AS start_event ON (start_event.event_id = start_event_id)
WHERE
receipts_ranged.room_id = ? AND
user_id = ? AND
receipt_type = ? AND
end_event.topological_ordering >= ? AND
start_event.topological_ordering <= ?;
"""
txn.execute(
sql,
(
room_id,
user_id,
receipt_type,
start_topo_ordering - 1 if start_topo_ordering is not None else None,
end_topo_ordering + 1,
),
)
overlapping_receipts = txn.fetchall()
# Delete the overlapping receipts by stream ID.
self.db_pool.simple_delete_many_txn(
txn,
table="receipts_ranged",
column="stream_id",
values=[receipt[0] for receipt in overlapping_receipts],
keyvalues={},
)
# Potentially expand the start/end event based on overlapping receipts.
for (
_,
overlapping_start_event_id,
overlapping_start_topo_ordering,
overlapping_end_event_id,
overlapping_end_topo_ordering,
) in overlapping_receipts:
if (
start_topo_ordering is not None
and overlapping_start_topo_ordering < start_topo_ordering
):
start_topo_ordering = overlapping_start_topo_ordering
start_event_id = overlapping_start_event_id
if end_topo_ordering < overlapping_end_topo_ordering:
end_topo_ordering = overlapping_end_topo_ordering
end_event_id = overlapping_end_event_id
# Insert the new receipt into the table.
self.db_pool.simple_insert_txn( self.db_pool.simple_insert_txn(
txn, txn,
table="receipts_ranged", table="receipts_ranged",
@ -716,6 +794,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
and receipt_type in (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE) and receipt_type in (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE)
and (start_stream_ordering is not None or end_stream_ordering is not None) and (start_stream_ordering is not None or end_stream_ordering is not None)
): ):
# XXX Topo ordering?
self._remove_old_push_actions_txn( # type: ignore[attr-defined] self._remove_old_push_actions_txn( # type: ignore[attr-defined]
txn, room_id, user_id, end_stream_ordering, start_stream_ordering txn, room_id, user_id, end_stream_ordering, start_stream_ordering
) )