From cdc9e50a5d4cba8a7dfec32d81756eaf45f54ca5 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 16 Nov 2017 15:29:10 +0000 Subject: [PATCH 1/4] Cleanup in _simple_upsert_txn Bail out early to reduce indentation --- synapse/storage/_base.py | 35 ++++++++++++++++++----------------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 7ebd4f189d..740400d58b 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -493,7 +493,7 @@ class SQLBaseStore(object): if lock: self.database_engine.lock_table(txn, table) - # Try to update + # First try to update. sql = "UPDATE %s SET %s WHERE %s" % ( table, ", ".join("%s = ?" % (k,) for k in values), @@ -502,24 +502,25 @@ class SQLBaseStore(object): sqlargs = values.values() + keyvalues.values() txn.execute(sql, sqlargs) - if txn.rowcount == 0: - # We didn't update and rows so insert a new one - allvalues = {} - allvalues.update(keyvalues) - allvalues.update(values) - allvalues.update(insertion_values) - - sql = "INSERT INTO %s (%s) VALUES (%s)" % ( - table, - ", ".join(k for k in allvalues), - ", ".join("?" for _ in allvalues) - ) - txn.execute(sql, allvalues.values()) - - return True - else: + if txn.rowcount > 0: + # successfully updated at least one row. return False + # We didn't update any rows so insert a new one + allvalues = {} + allvalues.update(keyvalues) + allvalues.update(values) + allvalues.update(insertion_values) + + sql = "INSERT INTO %s (%s) VALUES (%s)" % ( + table, + ", ".join(k for k in allvalues), + ", ".join("?" for _ in allvalues) + ) + txn.execute(sql, allvalues.values()) + # successfully inserted + return True + def _simple_select_one(self, table, keyvalues, retcols, allow_none=False, desc="_simple_select_one"): """Executes a SELECT query on the named table, which is expected to From 10aaa1bc15775a228ce22ab45efbb55b5099289b Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 16 Nov 2017 15:30:15 +0000 Subject: [PATCH 2/4] _simple_upsert: retry on IntegrityError wrap the call to _simple_upsert_txn in a loop so that we retry on an integrityerror: this means we can avoid locking the table provided there is an unique index. --- synapse/storage/_base.py | 35 +++++++++++++++++++++++++++++------ 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 740400d58b..1582a58966 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -469,23 +469,46 @@ class SQLBaseStore(object): txn.executemany(sql, vals) + @defer.inlineCallbacks def _simple_upsert(self, table, keyvalues, values, insertion_values={}, desc="_simple_upsert", lock=True): """ + + `lock` should generally be set to True (the default), but can be set + to False if either of the following are true: + + * there is a UNIQUE INDEX on the key columns. In this case a conflict + will cause an IntegrityError in which case this function will retry + the update. + + * we somehow know that we are the only thread which will be updating + this table. + Args: table (str): The table to upsert into keyvalues (dict): The unique key tables and their new values values (dict): The nonunique columns and their new values - insertion_values (dict): key/values to use when inserting + insertion_values (dict): additional key/values to use only when + inserting + lock (bool): True to lock the table when doing the upsert. Returns: Deferred(bool): True if a new entry was created, False if an existing one was updated. """ - return self.runInteraction( - desc, - self._simple_upsert_txn, table, keyvalues, values, insertion_values, - lock - ) + while True: + try: + result = yield self.runInteraction( + desc, + self._simple_upsert_txn, table, keyvalues, values, insertion_values, + lock=lock + ) + defer.returnValue(result) + except self.database_engine.IntegrityError as e: + # presumably we raced with another transaction: let's retry. + logger.warn( + "IntegrityError when upserting into %s; retrying: %s", + table, e + ) def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={}, lock=True): From 7ab2b69e188e84a1360f0381f36af4b8445395bf Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 16 Nov 2017 15:32:01 +0000 Subject: [PATCH 3/4] Avoid locking `pushers` table on upsert Now that _simple_upsert will retry on IntegrityError, we don't need to lock the table. --- synapse/storage/pusher.py | 55 ++++++++++++++++++++------------------- 1 file changed, 28 insertions(+), 27 deletions(-) diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 34d2f82b7f..19ce41fde9 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -204,34 +204,35 @@ class PusherStore(SQLBaseStore): pushkey, pushkey_ts, lang, data, last_stream_ordering, profile_tag=""): with self._pushers_id_gen.get_next() as stream_id: - def f(txn): - newly_inserted = self._simple_upsert_txn( - txn, - "pushers", - { - "app_id": app_id, - "pushkey": pushkey, - "user_name": user_id, - }, - { - "access_token": access_token, - "kind": kind, - "app_display_name": app_display_name, - "device_display_name": device_display_name, - "ts": pushkey_ts, - "lang": lang, - "data": encode_canonical_json(data), - "last_stream_ordering": last_stream_ordering, - "profile_tag": profile_tag, - "id": stream_id, - }, - ) - if newly_inserted: - # get_if_user_has_pusher only cares if the user has - # at least *one* pusher. - txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,)) + # no need to lock because `pushers` has a unique key on + # (app_id, pushkey, user_name) so _simple_upsert will retry + newly_inserted = yield self._simple_upsert( + table="pushers", + keyvalues={ + "app_id": app_id, + "pushkey": pushkey, + "user_name": user_id, + }, + values={ + "access_token": access_token, + "kind": kind, + "app_display_name": app_display_name, + "device_display_name": device_display_name, + "ts": pushkey_ts, + "lang": lang, + "data": encode_canonical_json(data), + "last_stream_ordering": last_stream_ordering, + "profile_tag": profile_tag, + "id": stream_id, + }, + desc="add_pusher", + lock=False, + ) - yield self.runInteraction("add_pusher", f) + if newly_inserted: + # get_if_user_has_pusher only cares if the user has + # at least *one* pusher. + self.get_if_user_has_pusher.invalidate(user_id,) @defer.inlineCallbacks def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id): From 77a122787022c302f0b2abcb63bd2f7d8514d692 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 16 Nov 2017 16:03:38 +0000 Subject: [PATCH 4/4] Fix broken ref to IntegrityError --- synapse/storage/_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 1582a58966..e6eefdd6fe 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -503,7 +503,7 @@ class SQLBaseStore(object): lock=lock ) defer.returnValue(result) - except self.database_engine.IntegrityError as e: + except self.database_engine.module.IntegrityError as e: # presumably we raced with another transaction: let's retry. logger.warn( "IntegrityError when upserting into %s; retrying: %s",