Pass a Receipt into insert_receipt.

This commit is contained in:
Patrick Cloke 2022-05-26 15:48:45 -04:00
parent f68b5e5773
commit bc7e8a5e60
2 changed files with 20 additions and 30 deletions

View file

@ -109,13 +109,7 @@ class ReceiptsHandler:
max_batch_id: Optional[int] = None
for receipt in receipts:
res = await self.store.insert_receipt(
receipt.room_id,
receipt.receipt_type,
receipt.user_id,
receipt.event_ids,
receipt.data,
)
res = await self.store.insert_receipt(receipt)
if not res:
# res will be None if this receipt is 'old'

View file

@ -42,7 +42,7 @@ from synapse.storage.util.id_generators import (
MultiWriterIdGenerator,
StreamIdGenerator,
)
from synapse.types import JsonDict
from synapse.types import JsonDict, ReadReceipt
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache
@ -725,14 +725,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
else:
raise RuntimeError("Unrecognized event_ids: %r" % (event_ids,))
async def insert_receipt(
self,
room_id: str,
receipt_type: str,
user_id: str,
event_ids: List[str],
data: dict,
) -> Optional[Tuple[int, int]]:
async def insert_receipt(self, receipt: ReadReceipt) -> Optional[Tuple[int, int]]:
"""Insert a receipt, either from local client or remote server.
Automatically does conversion between linearized and graph
@ -744,26 +737,29 @@ class ReceiptsWorkerStore(SQLBaseStore):
"""
assert self._can_write_to_receipts
if not event_ids:
if not receipt.event_ids:
return None
if len(event_ids) == 1:
linearized_event_id = event_ids[0]
if len(receipt.event_ids) == 1:
linearized_event_id = receipt.event_ids[0]
else:
# we need to points in graph -> linearized form.
linearized_event_id = await self.db_pool.runInteraction(
"insert_receipt_conv", self._graph_to_linear, room_id, event_ids
"insert_receipt_conv",
self._graph_to_linear,
receipt.room_id,
receipt.event_ids,
)
async with self._receipts_id_gen.get_next() as stream_id: # type: ignore[attr-defined]
event_ts = await self.db_pool.runInteraction(
"insert_linearized_receipt",
self._insert_linearized_receipt_txn,
room_id,
receipt_type,
user_id,
receipt.room_id,
receipt.receipt_type,
receipt.user_id,
linearized_event_id,
data,
receipt.data,
stream_id=stream_id,
# Read committed is actually beneficial here because we check for a receipt with
# greater stream order, and checking the very latest data at select time is better
@ -779,18 +775,18 @@ class ReceiptsWorkerStore(SQLBaseStore):
logger.debug(
"RR for event %s in %s (%i ms old)",
linearized_event_id,
room_id,
receipt.room_id,
now - event_ts,
)
await self.db_pool.runInteraction(
"insert_graph_receipt",
self._insert_graph_receipt_txn,
room_id,
receipt_type,
user_id,
event_ids,
data,
receipt.room_id,
receipt.receipt_type,
receipt.user_id,
receipt.event_ids,
receipt.data,
)
max_persisted_id = self._receipts_id_gen.get_current_token()