diff --git a/rrr_test.py b/rrr_test.py index e37a84c602..4df7fbf81b 100644 --- a/rrr_test.py +++ b/rrr_test.py @@ -150,7 +150,7 @@ def main(): # Create a fork in the DAG. prev_message_id = first_message_id - for msg in range(1): + for msg in range(3): prev_message_id = _send_and_append(f"Fork 2 Message {msg}", prev_message_id) sleep(1) # # Join the forks. @@ -159,15 +159,15 @@ def main(): _sync_and_show(room_id) # User 1 sends another read receipt. - # print("@test reads everything") - # result = requests.post( - # f"{HOMESERVER}/_matrix/client/v3/rooms/{room_id}/receipt/m.read/{event_ids[-1]}/{event_ids[0]}", - # headers=USER_1_HEADERS, - # json={}, - # ) - # _check_for_status(result) + print("@test reads everything") + result = requests.post( + f"{HOMESERVER}/_matrix/client/v3/rooms/{room_id}/receipt/m.read/{event_ids[-1]}/{event_ids[0]}", + headers=USER_1_HEADERS, + json={}, + ) + _check_for_status(result) - # _sync_and_show(room_id) + _sync_and_show(room_id) if __name__ == "__main__": diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 269a7521c6..4622e8910e 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -651,28 +651,34 @@ class ReceiptsWorkerStore(SQLBaseStore): """ assert self._can_write_to_receipts + start_topo_ordering = None + start_stream_ordering = None if start_event_id is not None: - res = self.db_pool.simple_select_one_onecol_txn( + res = self.db_pool.simple_select_one_txn( txn, table="events", - # XXX Use topo ordering - retcol="stream_ordering", + retcols=("topological_ordering", "stream_ordering"), keyvalues={"event_id": start_event_id}, allow_none=True, ) - start_stream_ordering = int(res) if res else None - else: - start_stream_ordering = None + if res is not None: + start_topo_ordering = int(res["topological_ordering"]) + start_stream_ordering = int(res["stream_ordering"]) res = self.db_pool.simple_select_one_txn( txn, table="events", - # XXX Use topo ordering - retcols=["stream_ordering", "received_ts"], + retcols=("topological_ordering", "stream_ordering", "received_ts"), keyvalues={"event_id": end_event_id}, allow_none=True, ) - end_stream_ordering = int(res["stream_ordering"]) if res else None + end_topo_ordering = ( + None # XXX When is it valid to not find this event? Federation? + ) + end_stream_ordering = None + if res is not None: + end_topo_ordering = int(res["topological_ordering"]) + end_stream_ordering = int(res["stream_ordering"]) # XXX This is just for logging in the caller, can it be removed. rx_ts = res["received_ts"] if res else 0 @@ -684,8 +690,80 @@ class ReceiptsWorkerStore(SQLBaseStore): self._receipts_stream_cache.entity_has_changed, room_id, stream_id ) - # Splat the receipt into the table. - # XXX This might overlap other ranges, should coalesce. + # Find all overlapping or adjacent receipts. These receipts are found by + # searching for any receipts which: + # + # * Have an end topological ordering directly before or after the new + # receipt's start topological ordering. + # * Have a start topological ordering directly after or before the new + # receipt's end topological ordering. + # + # E.g. the following would be found: + # + # * [1, 7] and [8, 10] should be combined. + # * [1, 7] and [5, 10] should be combined. + # * [None, 7] and [5, 10] should be combined. + # + # XXX Do we care about stream ordering here? + # + # XXX This doesn't handle a start_topo_ordering of None. + sql = """ + SELECT + stream_id, + start_event_id, + start_event.topological_ordering, + end_event_id, + end_event.topological_ordering + FROM receipts_ranged + LEFT JOIN events AS end_event ON (end_event.event_id = end_event_id) + LEFT JOIN events AS start_event ON (start_event.event_id = start_event_id) + WHERE + receipts_ranged.room_id = ? AND + user_id = ? AND + receipt_type = ? AND + end_event.topological_ordering >= ? AND + start_event.topological_ordering <= ?; + """ + txn.execute( + sql, + ( + room_id, + user_id, + receipt_type, + start_topo_ordering - 1 if start_topo_ordering is not None else None, + end_topo_ordering + 1, + ), + ) + overlapping_receipts = txn.fetchall() + # Delete the overlapping receipts by stream ID. + self.db_pool.simple_delete_many_txn( + txn, + table="receipts_ranged", + column="stream_id", + values=[receipt[0] for receipt in overlapping_receipts], + keyvalues={}, + ) + + # Potentially expand the start/end event based on overlapping receipts. + for ( + _, + overlapping_start_event_id, + overlapping_start_topo_ordering, + overlapping_end_event_id, + overlapping_end_topo_ordering, + ) in overlapping_receipts: + if ( + start_topo_ordering is not None + and overlapping_start_topo_ordering < start_topo_ordering + ): + start_topo_ordering = overlapping_start_topo_ordering + start_event_id = overlapping_start_event_id + + if end_topo_ordering < overlapping_end_topo_ordering: + end_topo_ordering = overlapping_end_topo_ordering + end_event_id = overlapping_end_event_id + + # Insert the new receipt into the table. self.db_pool.simple_insert_txn( txn, table="receipts_ranged", @@ -716,6 +794,7 @@ class ReceiptsWorkerStore(SQLBaseStore): and receipt_type in (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE) and (start_stream_ordering is not None or end_stream_ordering is not None) ): + # XXX Topo ordering? self._remove_old_push_actions_txn( # type: ignore[attr-defined] txn, room_id, user_id, end_stream_ordering, start_stream_ordering )