Merge pull request #2684 from matrix-org/rav/unlock_upsert

Start work on avoiding table locks for upserts
This commit is contained in:
Richard van der Hoff 2017-11-16 16:27:29 +00:00 committed by GitHub
commit ba05f28ae7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 75 additions and 50 deletions

View file

@ -469,23 +469,46 @@ class SQLBaseStore(object):
txn.executemany(sql, vals) txn.executemany(sql, vals)
@defer.inlineCallbacks
def _simple_upsert(self, table, keyvalues, values, def _simple_upsert(self, table, keyvalues, values,
insertion_values={}, desc="_simple_upsert", lock=True): insertion_values={}, desc="_simple_upsert", lock=True):
""" """
`lock` should generally be set to True (the default), but can be set
to False if either of the following are true:
* there is a UNIQUE INDEX on the key columns. In this case a conflict
will cause an IntegrityError in which case this function will retry
the update.
* we somehow know that we are the only thread which will be updating
this table.
Args: Args:
table (str): The table to upsert into table (str): The table to upsert into
keyvalues (dict): The unique key tables and their new values keyvalues (dict): The unique key tables and their new values
values (dict): The nonunique columns and their new values values (dict): The nonunique columns and their new values
insertion_values (dict): key/values to use when inserting insertion_values (dict): additional key/values to use only when
inserting
lock (bool): True to lock the table when doing the upsert.
Returns: Returns:
Deferred(bool): True if a new entry was created, False if an Deferred(bool): True if a new entry was created, False if an
existing one was updated. existing one was updated.
""" """
return self.runInteraction( while True:
desc, try:
self._simple_upsert_txn, table, keyvalues, values, insertion_values, result = yield self.runInteraction(
lock desc,
) self._simple_upsert_txn, table, keyvalues, values, insertion_values,
lock=lock
)
defer.returnValue(result)
except self.database_engine.module.IntegrityError as e:
# presumably we raced with another transaction: let's retry.
logger.warn(
"IntegrityError when upserting into %s; retrying: %s",
table, e
)
def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={}, def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={},
lock=True): lock=True):
@ -493,7 +516,7 @@ class SQLBaseStore(object):
if lock: if lock:
self.database_engine.lock_table(txn, table) self.database_engine.lock_table(txn, table)
# Try to update # First try to update.
sql = "UPDATE %s SET %s WHERE %s" % ( sql = "UPDATE %s SET %s WHERE %s" % (
table, table,
", ".join("%s = ?" % (k,) for k in values), ", ".join("%s = ?" % (k,) for k in values),
@ -502,24 +525,25 @@ class SQLBaseStore(object):
sqlargs = values.values() + keyvalues.values() sqlargs = values.values() + keyvalues.values()
txn.execute(sql, sqlargs) txn.execute(sql, sqlargs)
if txn.rowcount == 0: if txn.rowcount > 0:
# We didn't update and rows so insert a new one # successfully updated at least one row.
allvalues = {}
allvalues.update(keyvalues)
allvalues.update(values)
allvalues.update(insertion_values)
sql = "INSERT INTO %s (%s) VALUES (%s)" % (
table,
", ".join(k for k in allvalues),
", ".join("?" for _ in allvalues)
)
txn.execute(sql, allvalues.values())
return True
else:
return False return False
# We didn't update any rows so insert a new one
allvalues = {}
allvalues.update(keyvalues)
allvalues.update(values)
allvalues.update(insertion_values)
sql = "INSERT INTO %s (%s) VALUES (%s)" % (
table,
", ".join(k for k in allvalues),
", ".join("?" for _ in allvalues)
)
txn.execute(sql, allvalues.values())
# successfully inserted
return True
def _simple_select_one(self, table, keyvalues, retcols, def _simple_select_one(self, table, keyvalues, retcols,
allow_none=False, desc="_simple_select_one"): allow_none=False, desc="_simple_select_one"):
"""Executes a SELECT query on the named table, which is expected to """Executes a SELECT query on the named table, which is expected to

View file

@ -204,34 +204,35 @@ class PusherStore(SQLBaseStore):
pushkey, pushkey_ts, lang, data, last_stream_ordering, pushkey, pushkey_ts, lang, data, last_stream_ordering,
profile_tag=""): profile_tag=""):
with self._pushers_id_gen.get_next() as stream_id: with self._pushers_id_gen.get_next() as stream_id:
def f(txn): # no need to lock because `pushers` has a unique key on
newly_inserted = self._simple_upsert_txn( # (app_id, pushkey, user_name) so _simple_upsert will retry
txn, newly_inserted = yield self._simple_upsert(
"pushers", table="pushers",
{ keyvalues={
"app_id": app_id, "app_id": app_id,
"pushkey": pushkey, "pushkey": pushkey,
"user_name": user_id, "user_name": user_id,
}, },
{ values={
"access_token": access_token, "access_token": access_token,
"kind": kind, "kind": kind,
"app_display_name": app_display_name, "app_display_name": app_display_name,
"device_display_name": device_display_name, "device_display_name": device_display_name,
"ts": pushkey_ts, "ts": pushkey_ts,
"lang": lang, "lang": lang,
"data": encode_canonical_json(data), "data": encode_canonical_json(data),
"last_stream_ordering": last_stream_ordering, "last_stream_ordering": last_stream_ordering,
"profile_tag": profile_tag, "profile_tag": profile_tag,
"id": stream_id, "id": stream_id,
}, },
) desc="add_pusher",
if newly_inserted: lock=False,
# get_if_user_has_pusher only cares if the user has )
# at least *one* pusher.
txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,))
yield self.runInteraction("add_pusher", f) if newly_inserted:
# get_if_user_has_pusher only cares if the user has
# at least *one* pusher.
self.get_if_user_has_pusher.invalidate(user_id,)
@defer.inlineCallbacks @defer.inlineCallbacks
def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id): def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):