From 183cacac90ca237b448da244270d55920470389b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 22 Apr 2016 09:37:16 +0100 Subject: [PATCH] Simplify query and handle finishing correctly --- synapse/storage/background_updates.py | 3 ++- synapse/storage/search.py | 30 +++++++++++++-------------- 2 files changed, 16 insertions(+), 17 deletions(-) diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 49904046cf..66a995157d 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -173,11 +173,12 @@ class BackgroundUpdateStore(SQLBaseStore): logger.info( "Updating %r. Updated %r items in %rms." - " (total_rate=%r/ms, current_rate=%r/ms, total_updated=%r)", + " (total_rate=%r/ms, current_rate=%r/ms, total_updated=%r, batch_size=%r)", update_name, items_updated, duration_ms, performance.total_items_per_ms(), performance.average_items_per_ms(), performance.total_item_count, + batch_size, ) performance.update(items_updated, duration_ms) diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 2c71db8c96..0224299625 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -169,28 +169,26 @@ class SearchStore(BackgroundUpdateStore): yield self.runInteraction( self.EVENT_SEARCH_ORDER_UPDATE_NAME, self._background_update_progress_txn, - self.EVENT_SEARCH_ORDER_UPDATE_NAME, progress, + self.EVENT_SEARCH_ORDER_UPDATE_NAME, pg, ) def reindex_search_txn(txn): - events_sql = ( - "SELECT stream_ordering, origin_server_ts, event_id FROM events" - " WHERE ? <= stream_ordering AND stream_ordering < ?" - " ORDER BY stream_ordering DESC" - " LIMIT ?" - ) - sql = ( "UPDATE event_search AS es SET stream_ordering = e.stream_ordering," " origin_server_ts = e.origin_server_ts" - " FROM (%s) AS e" + " FROM events AS e" " WHERE e.event_id = es.event_id" + " AND ? <= e.stream_ordering AND e.stream_ordering < ?" " RETURNING es.stream_ordering" - ) % (events_sql,) + ) - txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size)) + min_stream_id = max_stream_id - batch_size + txn.execute(sql, (min_stream_id, max_stream_id)) rows = txn.fetchall() - min_stream_id = rows[-1][0] + + if min_stream_id < target_min_stream_id: + # We've recached the end. + return len(rows), False progress = { "target_min_stream_id_inclusive": target_min_stream_id, @@ -203,16 +201,16 @@ class SearchStore(BackgroundUpdateStore): txn, self.EVENT_SEARCH_ORDER_UPDATE_NAME, progress ) - return len(rows) + return len(rows), True - result = yield self.runInteraction( + num_rows, finished = yield self.runInteraction( self.EVENT_SEARCH_ORDER_UPDATE_NAME, reindex_search_txn ) - if not result: + if not finished: yield self._end_background_update(self.EVENT_SEARCH_ORDER_UPDATE_NAME) - defer.returnValue(result) + defer.returnValue(num_rows) @defer.inlineCallbacks def search_msgs(self, room_ids, search_term, keys):