From bc7e8a5e60179e77f2aabc80979ad52dc2e5dab6 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 26 May 2022 15:48:45 -0400 Subject: [PATCH] Pass a Receipt into insert_receipt. --- synapse/handlers/receipts.py | 8 +---- synapse/storage/databases/main/receipts.py | 42 ++++++++++------------ 2 files changed, 20 insertions(+), 30 deletions(-) diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 43d2882b0a..5588545850 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -109,13 +109,7 @@ class ReceiptsHandler: max_batch_id: Optional[int] = None for receipt in receipts: - res = await self.store.insert_receipt( - receipt.room_id, - receipt.receipt_type, - receipt.user_id, - receipt.event_ids, - receipt.data, - ) + res = await self.store.insert_receipt(receipt) if not res: # res will be None if this receipt is 'old' diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index b6106affa6..2252dd8608 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -42,7 +42,7 @@ from synapse.storage.util.id_generators import ( MultiWriterIdGenerator, StreamIdGenerator, ) -from synapse.types import JsonDict +from synapse.types import JsonDict, ReadReceipt from synapse.util import json_encoder from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.stream_change_cache import StreamChangeCache @@ -725,14 +725,7 @@ class ReceiptsWorkerStore(SQLBaseStore): else: raise RuntimeError("Unrecognized event_ids: %r" % (event_ids,)) - async def insert_receipt( - self, - room_id: str, - receipt_type: str, - user_id: str, - event_ids: List[str], - data: dict, - ) -> Optional[Tuple[int, int]]: + async def insert_receipt(self, receipt: ReadReceipt) -> Optional[Tuple[int, int]]: """Insert a receipt, either from local client or remote server. Automatically does conversion between linearized and graph @@ -744,26 +737,29 @@ class ReceiptsWorkerStore(SQLBaseStore): """ assert self._can_write_to_receipts - if not event_ids: + if not receipt.event_ids: return None - if len(event_ids) == 1: - linearized_event_id = event_ids[0] + if len(receipt.event_ids) == 1: + linearized_event_id = receipt.event_ids[0] else: # we need to points in graph -> linearized form. linearized_event_id = await self.db_pool.runInteraction( - "insert_receipt_conv", self._graph_to_linear, room_id, event_ids + "insert_receipt_conv", + self._graph_to_linear, + receipt.room_id, + receipt.event_ids, ) async with self._receipts_id_gen.get_next() as stream_id: # type: ignore[attr-defined] event_ts = await self.db_pool.runInteraction( "insert_linearized_receipt", self._insert_linearized_receipt_txn, - room_id, - receipt_type, - user_id, + receipt.room_id, + receipt.receipt_type, + receipt.user_id, linearized_event_id, - data, + receipt.data, stream_id=stream_id, # Read committed is actually beneficial here because we check for a receipt with # greater stream order, and checking the very latest data at select time is better @@ -779,18 +775,18 @@ class ReceiptsWorkerStore(SQLBaseStore): logger.debug( "RR for event %s in %s (%i ms old)", linearized_event_id, - room_id, + receipt.room_id, now - event_ts, ) await self.db_pool.runInteraction( "insert_graph_receipt", self._insert_graph_receipt_txn, - room_id, - receipt_type, - user_id, - event_ids, - data, + receipt.room_id, + receipt.receipt_type, + receipt.user_id, + receipt.event_ids, + receipt.data, ) max_persisted_id = self._receipts_id_gen.get_current_token()