diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index dda3027b61..ca1f3977c9 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -303,74 +303,81 @@ class EventFederationStore(SQLBaseStore): ], ) + @defer.inlineCallbacks + def _update_extremeties(self, events): events_by_room = {} for ev in events: events_by_room.setdefault(ev.room_id, []).append(ev) - for room_id, room_events in events_by_room.items(): - prevs = [ - e_id for ev in room_events for e_id, _ in ev.prev_events + def _update_forwards_txn(txn): + for room_id, room_events in events_by_room.items(): + prevs = [ + e_id for ev in room_events for e_id, _ in ev.prev_events + if not ev.internal_metadata.is_outlier() + ] + if prevs: + txn.execute( + "DELETE FROM event_forward_extremities" + " WHERE room_id = ?" + " AND event_id in (%s)" % ( + ",".join(["?"] * len(prevs)), + ), + [room_id] + prevs, + ) + + query = ( + "INSERT INTO event_forward_extremities (event_id, room_id)" + " SELECT ?, ? WHERE NOT EXISTS (" + " SELECT 1 FROM event_edges WHERE prev_event_id = ?" + " )" + ) + + txn.executemany( + query, + [ + (ev.event_id, ev.room_id, ev.event_id) for ev in events + if not ev.internal_metadata.is_outlier() + ] + ) + + def _update_backwards_txn(txn): + query = ( + "INSERT INTO event_backward_extremities (event_id, room_id)" + " SELECT ?, ? WHERE NOT EXISTS (" + " SELECT 1 FROM event_backward_extremities" + " WHERE event_id = ? AND room_id = ?" + " )" + " AND NOT EXISTS (" + " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? " + " AND outlier = ?" + " )" + ) + + txn.executemany(query, [ + (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False) + for ev in events for e_id, _ in ev.prev_events if not ev.internal_metadata.is_outlier() - ] - if prevs: - txn.execute( - "DELETE FROM event_forward_extremities" - " WHERE room_id = ?" - " AND event_id in (%s)" % ( - ",".join(["?"] * len(prevs)), - ), - [room_id] + prevs, + ]) + + query = ( + "DELETE FROM event_backward_extremities" + " WHERE event_id = ? AND room_id = ?" + ) + txn.executemany( + query, + [ + (ev.event_id, ev.room_id) for ev in events + if not ev.internal_metadata.is_outlier() + ] + ) + + for room_id in events_by_room: + txn.call_after( + self.get_latest_event_ids_in_room.invalidate, (room_id,) ) - query = ( - "INSERT INTO event_forward_extremities (event_id, room_id)" - " SELECT ?, ? WHERE NOT EXISTS (" - " SELECT 1 FROM event_edges WHERE prev_event_id = ?" - " )" - ) - - txn.executemany( - query, - [ - (ev.event_id, ev.room_id, ev.event_id) for ev in events - if not ev.internal_metadata.is_outlier() - ] - ) - - query = ( - "INSERT INTO event_backward_extremities (event_id, room_id)" - " SELECT ?, ? WHERE NOT EXISTS (" - " SELECT 1 FROM event_backward_extremities" - " WHERE event_id = ? AND room_id = ?" - " )" - " AND NOT EXISTS (" - " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? " - " AND outlier = ?" - " )" - ) - - txn.executemany(query, [ - (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False) - for ev in events for e_id, _ in ev.prev_events - if not ev.internal_metadata.is_outlier() - ]) - - query = ( - "DELETE FROM event_backward_extremities" - " WHERE event_id = ? AND room_id = ?" - ) - txn.executemany( - query, - [ - (ev.event_id, ev.room_id) for ev in events - if not ev.internal_metadata.is_outlier() - ] - ) - - for room_id in events_by_room: - txn.call_after( - self.get_latest_event_ids_in_room.invalidate, (room_id,) - ) + yield self.runInteraction("_update_forwards_txn", _update_forwards_txn) + yield self.runInteraction("_update_backwards_txn", _update_backwards_txn) def get_backfill_events(self, room_id, event_list, limit): """Get a list of Events for a given topic that occurred before (and diff --git a/synapse/storage/events.py b/synapse/storage/events.py index e3eabab13d..0501eeb6e0 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -86,6 +86,10 @@ class EventsStore(SQLBaseStore): is_new_state=is_new_state, ) + yield self._update_extremeties([ + ev for ev, _ in chunk + ]) + @defer.inlineCallbacks @log_function def persist_event(self, event, context, backfilled=False, @@ -120,6 +124,8 @@ class EventsStore(SQLBaseStore): except _RollbackButIsFineException: pass + yield self._update_extremeties([event]) + max_persisted_id = yield self._stream_id_gen.get_max_token(self) defer.returnValue((stream_ordering, max_persisted_id))