mirror of
https://github.com/element-hq/synapse
synced 2024-10-06 04:12:41 +00:00
Fix errors related to ambiguous db_autocommit
This commit is contained in:
parent
9c34f6eaee
commit
d75da0e392
3 changed files with 41 additions and 28 deletions
|
@ -1213,15 +1213,16 @@ class DatabasePool:
|
||||||
# We can autocommit if it is safe to upsert
|
# We can autocommit if it is safe to upsert
|
||||||
autocommit = table not in self._unsafe_to_upsert_tables
|
autocommit = table not in self._unsafe_to_upsert_tables
|
||||||
|
|
||||||
return await self.runInteraction(
|
return await self.runInteraction_advanced(
|
||||||
desc,
|
desc,
|
||||||
|
autocommit,
|
||||||
|
None,
|
||||||
self.simple_upsert_txn,
|
self.simple_upsert_txn,
|
||||||
table,
|
table,
|
||||||
keyvalues,
|
keyvalues,
|
||||||
values,
|
values,
|
||||||
insertion_values,
|
insertion_values,
|
||||||
lock=lock,
|
lock=lock,
|
||||||
db_autocommit=autocommit,
|
|
||||||
)
|
)
|
||||||
except self.engine.module.IntegrityError as e:
|
except self.engine.module.IntegrityError as e:
|
||||||
attempts += 1
|
attempts += 1
|
||||||
|
@ -1436,8 +1437,10 @@ class DatabasePool:
|
||||||
# We can autocommit if it safe to upsert
|
# We can autocommit if it safe to upsert
|
||||||
autocommit = table not in self._unsafe_to_upsert_tables
|
autocommit = table not in self._unsafe_to_upsert_tables
|
||||||
|
|
||||||
await self.runInteraction(
|
await self.runInteraction_advanced(
|
||||||
desc,
|
desc,
|
||||||
|
autocommit,
|
||||||
|
None,
|
||||||
self.simple_upsert_many_txn,
|
self.simple_upsert_many_txn,
|
||||||
table,
|
table,
|
||||||
key_names,
|
key_names,
|
||||||
|
@ -1445,7 +1448,6 @@ class DatabasePool:
|
||||||
value_names,
|
value_names,
|
||||||
value_values,
|
value_values,
|
||||||
lock=lock,
|
lock=lock,
|
||||||
db_autocommit=autocommit,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
def simple_upsert_many_txn(
|
def simple_upsert_many_txn(
|
||||||
|
@ -1622,14 +1624,15 @@ class DatabasePool:
|
||||||
statement returns no rows
|
statement returns no rows
|
||||||
desc: description of the transaction, for logging and metrics
|
desc: description of the transaction, for logging and metrics
|
||||||
"""
|
"""
|
||||||
return await self.runInteraction(
|
return await self.runInteraction_advanced(
|
||||||
desc,
|
desc,
|
||||||
|
True,
|
||||||
|
None,
|
||||||
self.simple_select_one_txn,
|
self.simple_select_one_txn,
|
||||||
table,
|
table,
|
||||||
keyvalues,
|
keyvalues,
|
||||||
retcols,
|
retcols,
|
||||||
allow_none,
|
allow_none,
|
||||||
db_autocommit=True,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
@overload
|
@overload
|
||||||
|
@ -1673,14 +1676,15 @@ class DatabasePool:
|
||||||
statement returns no rows
|
statement returns no rows
|
||||||
desc: description of the transaction, for logging and metrics
|
desc: description of the transaction, for logging and metrics
|
||||||
"""
|
"""
|
||||||
return await self.runInteraction(
|
return await self.runInteraction_advanced(
|
||||||
desc,
|
desc,
|
||||||
|
True,
|
||||||
|
None,
|
||||||
self.simple_select_one_onecol_txn,
|
self.simple_select_one_onecol_txn,
|
||||||
table,
|
table,
|
||||||
keyvalues,
|
keyvalues,
|
||||||
retcol,
|
retcol,
|
||||||
allow_none=allow_none,
|
allow_none=allow_none,
|
||||||
db_autocommit=True,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
@overload
|
@overload
|
||||||
|
@ -1764,13 +1768,14 @@ class DatabasePool:
|
||||||
Returns:
|
Returns:
|
||||||
Results in a list
|
Results in a list
|
||||||
"""
|
"""
|
||||||
return await self.runInteraction(
|
return await self.runInteraction_advanced(
|
||||||
desc,
|
desc,
|
||||||
|
True,
|
||||||
|
None,
|
||||||
self.simple_select_onecol_txn,
|
self.simple_select_onecol_txn,
|
||||||
table,
|
table,
|
||||||
keyvalues,
|
keyvalues,
|
||||||
retcol,
|
retcol,
|
||||||
db_autocommit=True,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
async def simple_select_list(
|
async def simple_select_list(
|
||||||
|
@ -1794,13 +1799,14 @@ class DatabasePool:
|
||||||
Returns:
|
Returns:
|
||||||
A list of dictionaries.
|
A list of dictionaries.
|
||||||
"""
|
"""
|
||||||
return await self.runInteraction(
|
return await self.runInteraction_advanced(
|
||||||
desc,
|
desc,
|
||||||
|
True,
|
||||||
|
None,
|
||||||
self.simple_select_list_txn,
|
self.simple_select_list_txn,
|
||||||
table,
|
table,
|
||||||
keyvalues,
|
keyvalues,
|
||||||
retcols,
|
retcols,
|
||||||
db_autocommit=True,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
@ -1864,15 +1870,16 @@ class DatabasePool:
|
||||||
results: List[Dict[str, Any]] = []
|
results: List[Dict[str, Any]] = []
|
||||||
|
|
||||||
for chunk in batch_iter(iterable, batch_size):
|
for chunk in batch_iter(iterable, batch_size):
|
||||||
rows = await self.runInteraction(
|
rows = await self.runInteraction_advanced(
|
||||||
desc,
|
desc,
|
||||||
|
True,
|
||||||
|
None,
|
||||||
self.simple_select_many_txn,
|
self.simple_select_many_txn,
|
||||||
table,
|
table,
|
||||||
column,
|
column,
|
||||||
chunk,
|
chunk,
|
||||||
keyvalues,
|
keyvalues,
|
||||||
retcols,
|
retcols,
|
||||||
db_autocommit=True,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
results.extend(rows)
|
results.extend(rows)
|
||||||
|
@ -2050,13 +2057,14 @@ class DatabasePool:
|
||||||
updatevalues: dict giving column names and values to update
|
updatevalues: dict giving column names and values to update
|
||||||
desc: description of the transaction, for logging and metrics
|
desc: description of the transaction, for logging and metrics
|
||||||
"""
|
"""
|
||||||
await self.runInteraction(
|
await self.runInteraction_advanced(
|
||||||
desc,
|
desc,
|
||||||
|
True,
|
||||||
|
None,
|
||||||
self.simple_update_one_txn,
|
self.simple_update_one_txn,
|
||||||
table,
|
table,
|
||||||
keyvalues,
|
keyvalues,
|
||||||
updatevalues,
|
updatevalues,
|
||||||
db_autocommit=True,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
@ -2115,12 +2123,13 @@ class DatabasePool:
|
||||||
keyvalues: dict of column names and values to select the row with
|
keyvalues: dict of column names and values to select the row with
|
||||||
desc: description of the transaction, for logging and metrics
|
desc: description of the transaction, for logging and metrics
|
||||||
"""
|
"""
|
||||||
await self.runInteraction(
|
await self.runInteraction_advanced(
|
||||||
desc,
|
desc,
|
||||||
|
True,
|
||||||
|
None,
|
||||||
self.simple_delete_one_txn,
|
self.simple_delete_one_txn,
|
||||||
table,
|
table,
|
||||||
keyvalues,
|
keyvalues,
|
||||||
db_autocommit=True,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
@ -2160,8 +2169,8 @@ class DatabasePool:
|
||||||
Returns:
|
Returns:
|
||||||
The number of deleted rows.
|
The number of deleted rows.
|
||||||
"""
|
"""
|
||||||
return await self.runInteraction(
|
return await self.runInteraction_advanced(
|
||||||
desc, self.simple_delete_txn, table, keyvalues, db_autocommit=True
|
desc, True, None, self.simple_delete_txn, table, keyvalues
|
||||||
)
|
)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
@ -2210,14 +2219,15 @@ class DatabasePool:
|
||||||
Returns:
|
Returns:
|
||||||
Number rows deleted
|
Number rows deleted
|
||||||
"""
|
"""
|
||||||
return await self.runInteraction(
|
return await self.runInteraction_advanced(
|
||||||
desc,
|
desc,
|
||||||
|
True,
|
||||||
|
None,
|
||||||
self.simple_delete_many_txn,
|
self.simple_delete_many_txn,
|
||||||
table,
|
table,
|
||||||
column,
|
column,
|
||||||
iterable,
|
iterable,
|
||||||
keyvalues,
|
keyvalues,
|
||||||
db_autocommit=True,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
@ -2403,14 +2413,15 @@ class DatabasePool:
|
||||||
A list of dictionaries or None.
|
A list of dictionaries or None.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
return await self.runInteraction(
|
return await self.runInteraction_advanced(
|
||||||
desc,
|
desc,
|
||||||
|
True,
|
||||||
|
None,
|
||||||
self.simple_search_list_txn,
|
self.simple_search_list_txn,
|
||||||
table,
|
table,
|
||||||
term,
|
term,
|
||||||
col,
|
col,
|
||||||
retcols,
|
retcols,
|
||||||
db_autocommit=True,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
|
|
@ -1501,13 +1501,14 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
|
||||||
event_id: The event that failed to be fetched or processed
|
event_id: The event that failed to be fetched or processed
|
||||||
cause: The error message or reason that we failed to pull the event
|
cause: The error message or reason that we failed to pull the event
|
||||||
"""
|
"""
|
||||||
await self.db_pool.runInteraction(
|
await self.db_pool.runInteraction_advanced(
|
||||||
"record_event_failed_pull_attempt",
|
"record_event_failed_pull_attempt",
|
||||||
|
True, # Safe to autocommit as it's a single upsert
|
||||||
|
None,
|
||||||
self._record_event_failed_pull_attempt_upsert_txn,
|
self._record_event_failed_pull_attempt_upsert_txn,
|
||||||
room_id,
|
room_id,
|
||||||
event_id,
|
event_id,
|
||||||
cause,
|
cause,
|
||||||
db_autocommit=True, # Safe as it's a single upsert
|
|
||||||
)
|
)
|
||||||
|
|
||||||
def _record_event_failed_pull_attempt_upsert_txn(
|
def _record_event_failed_pull_attempt_upsert_txn(
|
||||||
|
|
|
@ -782,11 +782,12 @@ class _MultiWriterCtxManager:
|
||||||
async def __aenter__(self) -> Union[int, List[int]]:
|
async def __aenter__(self) -> Union[int, List[int]]:
|
||||||
# It's safe to run this in autocommit mode as fetching values from a
|
# It's safe to run this in autocommit mode as fetching values from a
|
||||||
# sequence ignores transaction semantics anyway.
|
# sequence ignores transaction semantics anyway.
|
||||||
self.stream_ids = await self.id_gen._db.runInteraction(
|
self.stream_ids = await self.id_gen._db.runInteraction_advanced(
|
||||||
"_load_next_mult_id",
|
"_load_next_mult_id",
|
||||||
|
True,
|
||||||
|
None,
|
||||||
self.id_gen._load_next_mult_id_txn,
|
self.id_gen._load_next_mult_id_txn,
|
||||||
self.multiple_ids or 1,
|
self.multiple_ids or 1,
|
||||||
db_autocommit=True,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
if self.multiple_ids is None:
|
if self.multiple_ids is None:
|
||||||
|
|
Loading…
Reference in a new issue