From f237303c32f866820df12d87d4d5579cc72cf7c6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Jun 2024 16:58:58 +0100 Subject: [PATCH] Handle previously persisted events properly --- synapse/storage/databases/main/events.py | 25 +++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 7c32d1884c..0ece4b1b40 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -254,8 +254,15 @@ class PersistEventsStore: async def calculate_chain_cover_index_for_events( self, room_id: str, events: Collection[EventBase] ) -> Dict[str, NewEventChainLinks]: + # Filter to state events, and ensure there are no duplicates. + state_events = [] + seen_events = set() + for event in events: + if not event.is_state() or event.event_id in seen_events: + continue - state_events = [event for event in events if event.is_state()] + state_events.append(event) + seen_events.add(event.event_id) if not state_events: return {} @@ -289,6 +296,22 @@ class PersistEventsStore: if row is None: return {} + # Filter out already persisted events. + rows = self.db_pool.simple_select_many_txn( + txn, + table="events", + column="event_id", + iterable=[e.event_id for e in state_events], + keyvalues={}, + retcols=("event_id",), + ) + already_persisted_events = {event_id for event_id, in rows} + state_events = [ + event + for event in state_events + if event.event_id in already_persisted_events + ] + if not state_events: return {}