Start splitting receipts on new events.

This commit is contained in:
Patrick Cloke 2022-06-02 08:31:36 -04:00
parent ad6b7cf5c6
commit 0c395fd8b9
3 changed files with 233 additions and 32 deletions

View file

@@ -98,7 +98,7 @@ def main():
     # Create a new room as user 2, add a bunch of messages.
     result = requests.post(
         f"{HOMESERVER}/_matrix/client/v3/createRoom",
-        json={"visibility": "public", "name": f"Road to Nowhere ({monotonic()})"},
+        json={"visibility": "public", "name": f"Ranged Read Receipts ({monotonic()})"},
        headers=USER_2_HEADERS,
     )
     _check_for_status(result)
@@ -115,29 +115,16 @@ def main():
     # User 2 sends some messages.
     event_ids = []
-    with open("road_to_no_where.txt", "r") as f:
-        count = 0
-        forks = 1
-        for line in f.readlines():
-            line = line.strip()
-            if not line:
-                if forks < 3:
-                    last_event_id = first_event_id
-                    forks += 1
-                else:
-                    # Let the server figure it out.
-                    last_event_id = None
-                continue
-
-            # Send a msg to the room.
-            last_event_id = _send_event(room_id, line, last_event_id)
-            event_ids.append(last_event_id)
-            sleep(1)
-
-            count += 1
-            if count == 20:  # End of second verse
-                break
+    def _send_and_append(body, prev_message_id=None):
+        event_id = _send_event(room_id, body, prev_message_id)
+        event_ids.append(event_id)
+        return event_id
+
+    prev_message_id = first_message_id = _send_and_append("Root")
+    for msg in range(3):
+        prev_message_id = _send_and_append(f"Fork 1 Message {msg}", prev_message_id)
+        sleep(1)

     # User 2 sends a read receipt.
     print("@second reads to end")
@@ -151,9 +138,9 @@ def main():
     _sync_and_show(room_id)

     # User 1 sends a read receipt.
-    print("@test reads from 3 -> 8")
+    print("@test reads from fork 1")
     result = requests.post(
-        f"{HOMESERVER}/_matrix/client/v3/rooms/{room_id}/receipt/m.read/{event_ids[8]}/{event_ids[3]}",
+        f"{HOMESERVER}/_matrix/client/v3/rooms/{room_id}/receipt/m.read/{event_ids[3]}/{event_ids[1]}",
         headers=USER_1_HEADERS,
         json={},
     )
@@ -161,16 +148,26 @@ def main():
     _sync_and_show(room_id)

+    # Create a fork in the DAG.
+    prev_message_id = first_message_id
+    for msg in range(1):
+        prev_message_id = _send_and_append(f"Fork 2 Message {msg}", prev_message_id)
+        sleep(1)
+
+    # # Join the forks.
+    _send_and_append("Tail")
+
+    _sync_and_show(room_id)
+
     # User 1 sends another read receipt.
     print("@test reads from 13 -> 14")
     result = requests.post(
         f"{HOMESERVER}/_matrix/client/v3/rooms/{room_id}/receipt/m.read/{event_ids[14]}/{event_ids[13]}",
         headers=USER_1_HEADERS,
         json={},
     )
     _check_for_status(result)

+    # print("@test reads everything")
+    # result = requests.post(
+    #     f"{HOMESERVER}/_matrix/client/v3/rooms/{room_id}/receipt/m.read/{event_ids[-1]}/{event_ids[0]}",
+    #     headers=USER_1_HEADERS,
+    #     json={},
+    # )
+    # _check_for_status(result)
+
     _sync_and_show(room_id)
+    # _sync_and_show(room_id)

 if __name__ == "__main__":
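
The script above drives a ranged variant of the receipts endpoint that takes both an end and a start event ID in the path; from the calls above ("reads from 3 -> 8" posting event_ids[8]/event_ids[3]) the path order is /{end}/{start}. A minimal sketch of that call as a reusable helper; the helper name is invented here, and the two-event-ID path is this prototype's experimental extension rather than the stable Matrix receipts API:

    import requests

    def send_ranged_receipt(homeserver, room_id, end_event_id, start_event_id, headers):
        # Ranged read receipt: marks everything from start_event_id through
        # end_event_id as read. The /{end}/{start} path order matches the
        # calls in the script above.
        result = requests.post(
            f"{homeserver}/_matrix/client/v3/rooms/{room_id}"
            f"/receipt/m.read/{end_event_id}/{start_event_id}",
            headers=headers,
            json={},
        )
        result.raise_for_status()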

View file

@@ -28,6 +28,7 @@ from typing import (
     Sequence,
     Set,
     Tuple,
+    cast,
 )

 import attr
@@ -195,6 +196,9 @@ class PersistEventsStore:
             )
             persist_event_counter.inc(len(events_and_contexts))

+            # Update any receipts for users in the rooms.
+            await self._update_receipts(events_and_contexts)
+
             if not use_negative_stream_ordering:
                 # we don't want to set the event_persisted_position to a negative
                 # stream_ordering.
@@ -2099,6 +2103,174 @@
             ),
         )

+    def _get_receipts_to_update(
+        self, txn: LoggingTransaction, event: EventBase
+    ) -> List[tuple]:
+        # Find any receipt ranges that would be "broken" by this event.
+        sql = """
+            SELECT
+                stream_id,
+                receipts_ranged.room_id,
+                receipt_type,
+                user_id,
+                start_event_id,
+                end_event_id,
+                data,
+                start_event.topological_ordering,
+                end_event.topological_ordering
+            FROM receipts_ranged
+            LEFT JOIN events AS end_event ON (end_event.event_id = end_event_id)
+            LEFT JOIN events AS start_event ON (start_event.event_id = start_event_id)
+            WHERE
+                receipts_ranged.room_id = ? AND
+                (start_event.topological_ordering <= ? OR start_event_id IS NULL) AND
+                ? <= end_event.topological_ordering;
+        """
+        txn.execute(
+            sql,
+            (event.room_id, event.depth, event.depth),
+        )
+        return list(txn.fetchall())
+
+    def _split_receipt(
+        self,
+        txn: LoggingTransaction,
+        event: EventBase,
+        stream_id: int,
+        room_id: str,
+        receipt_type: str,
+        user_id: str,
+        start_event_id: str,
+        end_event_id: str,
+        data: JsonDict,
+        start_topological_ordering: int,
+        end_topological_ordering: int,
+        stream_orderings: Tuple[int, ...],
+    ) -> None:
+        # Upsert the current receipt to give it a new endpoint as the
+        # latest event in the range before the new event.
+        sql = """
+            SELECT event_id FROM events
+            WHERE room_id = ? AND topological_ordering <= ? AND stream_ordering < ?
+            ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT 1;
+        """
+        txn.execute(
+            sql,
+            (
+                event.room_id,
+                event.depth,
+                event.internal_metadata.stream_ordering,
+            ),
+        )
+        new_end_event_id = cast(Tuple[str], txn.fetchone())[0]  # XXX Can this be None?
+
+        # TODO Upsert?
+        self.db_pool.simple_delete_one_txn(
+            txn, table="receipts_ranged", keyvalues={"stream_id": stream_id}
+        )
+        self.db_pool.simple_insert_txn(
+            txn,
+            table="receipts_ranged",
+            values={
+                "room_id": room_id,
+                "user_id": user_id,
+                "receipt_type": receipt_type,
+                "start_event_id": start_event_id,
+                "end_event_id": new_end_event_id,
+                "stream_id": stream_orderings[0],
+                "data": data,  # XXX Does it make sense to duplicate this?
+            },
+        )
+
+        # Insert a new receipt with a start point as the first event after
+        # the new event and re-using the old endpoint.
+        sql = """
+            SELECT event_id FROM events
+            WHERE room_id = ? AND topological_ordering > ? AND stream_ordering < ?
+            ORDER BY topological_ordering, stream_ordering LIMIT 1;
+        """
+        txn.execute(
+            sql,
+            (
+                event.room_id,
+                event.depth,
+                event.internal_metadata.stream_ordering,
+            ),
+        )
+        row = txn.fetchone()
+        # If there are no events topologically after the end event, the
+        # second range is just for the single event.
+        if row is not None:
+            new_start_event_id = row[0]
+        else:
+            new_start_event_id = end_event_id
+        self.db_pool.simple_insert_txn(
+            txn,
+            table="receipts_ranged",
+            values={
+                "room_id": room_id,
+                "user_id": user_id,
+                "receipt_type": receipt_type,
+                "start_event_id": new_start_event_id,
+                "end_event_id": end_event_id,
+                "stream_id": stream_orderings[1],
+                "data": data,  # XXX Does it make sense to duplicate this?
+            },
+        )
+
+        txn.call_after(
+            self.store.invalidate_caches_for_receipt,
+            room_id,
+            receipt_type,
+            user_id,
+        )
+
+    async def _update_receipts(
+        self, events_and_contexts: List[Tuple[EventBase, EventContext]]
+    ) -> None:
+        # Only non-outlier events can have a receipt associated with them.
+        # XXX Is this true?
+        non_outlier_events = [
+            event
+            for event, _ in events_and_contexts
+            if not event.internal_metadata.is_outlier()
+        ]
+
+        # XXX This is probably slow...
+        for event in non_outlier_events:
+            receipts = await self.db_pool.runInteraction(
+                "update_receipts", self._get_receipts_to_update, event=event
+            )
+
+            # Split each receipt in two by the new event.
+            for (
+                stream_id,
+                room_id,
+                receipt_type,
+                user_id,
+                start_event_id,
+                end_event_id,
+                data,
+                start_topological_ordering,
+                end_topological_ordering,
+            ) in receipts:
+                async with self.store._receipts_id_gen.get_next_mult(2) as stream_orderings:  # type: ignore[attr-defined]
+                    await self.db_pool.runInteraction(
+                        "split_receipts",
+                        self._split_receipt,
+                        event=event,
+                        stream_id=stream_id,
+                        room_id=room_id,
+                        receipt_type=receipt_type,
+                        user_id=user_id,
+                        start_event_id=start_event_id,
+                        end_event_id=end_event_id,
+                        data=data,
+                        start_topological_ordering=start_topological_ordering,
+                        end_topological_ordering=end_topological_ordering,
+                        stream_orderings=stream_orderings,
+                    )
+
     def _set_push_actions_for_event_and_users_txn(
         self,
         txn: LoggingTransaction,
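
Taken together: _get_receipts_to_update selects every receipts_ranged row whose topological span straddles the new event's depth, and _split_receipt rewrites each such row into two ranges, one ending at the latest pre-existing event ordered before the new event and one starting at the first event ordered after it. A self-contained sketch of the same split over an in-memory linearised room; split_range and its argument names are invented for illustration and are not part of the commit:

    from typing import List, Tuple

    def split_range(
        events: List[Tuple[str, int, int]],  # (event_id, topological_ordering, stream_ordering)
        start_id: str,
        end_id: str,
        new_depth: int,
        new_stream: int,
    ) -> List[Tuple[str, str]]:
        # Linearise the room the way Synapse does:
        # sort by (topological ordering, stream ordering).
        ordered = sorted(events, key=lambda e: (e[1], e[2]))
        # Latest pre-existing event at or before the new event's depth: the
        # new end of the first half (mirrors the first SELECT above).
        before = [e for e in ordered if e[1] <= new_depth and e[2] < new_stream]
        # First pre-existing event topologically after the new event: the new
        # start of the second half (mirrors the second SELECT), falling back
        # to the old end if the new event lands at the tail.
        after = [e for e in ordered if e[1] > new_depth and e[2] < new_stream]
        new_end = before[-1][0]
        new_start = after[0][0] if after else end_id
        return [(start_id, new_end), (new_start, end_id)]

For example, split_range([("A", 1, 1), ("B", 2, 2), ("C", 3, 3), ("D", 4, 4)], "A", "D", new_depth=3, new_stream=5) returns [("A", "C"), ("D", "D")]: only the new event is left uncovered. Like the "XXX Can this be None?" note in _split_receipt, this sketch assumes at least one event precedes the new one.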

View file

@@ -13,6 +13,38 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

+"""
+Receipts are stored as per-user ranges from a starting event to an ending event.
+If the starting event is missing then the range is considered to cover all events
+earlier in the room than the ending event.
+
+Since events in a room form a DAG, we need to linearise it before applying receipts.
+Synapse linearises the room by sorting events by (topological ordering, stream ordering).
+
+To ensure that receipts are non-overlapping and correct, the following operations
+need to occur:
+
+* When a new receipt is received from a client, we coalesce it with other receipts.
+* When new events are received, any receipt range which includes the event's
+  topological ordering must be split into two receipts.
+
+Given a simple linear room:
+
+    A--B--C--D
+
+This is covered by a single receipt [A, D].
+
+If a fork in the DAG occurs:
+
+    A--B--C--D      which linearises to: A--B--E--C--F--D
+        \   /
+        E---F
+
+The receipt from above must be split into component parts:
+
+    [A, B]
+    [C, C]
+    [D, D]
+"""
+
 import logging
 from typing import (
     TYPE_CHECKING,
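
The linearisation the docstring relies on can be checked directly. Assuming stream orderings in which E and F arrived before C and D respectively (invented values for this sketch; any orderings with that property work), sorting by (topological ordering, stream ordering) reproduces the example:

    # (event_id, topological_ordering, stream_ordering); the stream orderings
    # are assumed values chosen so E precedes C and F precedes D in stream
    # order, as the docstring's linearisation implies.
    events = [
        ("A", 1, 1), ("B", 2, 2), ("C", 3, 4), ("D", 4, 6),  # original chain
        ("E", 3, 3), ("F", 4, 5),                            # the fork
    ]
    linearised = [event_id for event_id, _, _ in sorted(events, key=lambda e: (e[1], e[2]))]
    assert linearised == ["A", "B", "E", "C", "F", "D"]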