diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 62fbd05534..949840e63f 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -768,8 +768,9 @@ class BackgroundUpdater: # override the global statement timeout to avoid accidentally squashing # a long-running index creation process - timeout_sql = "SET SESSION statement_timeout = 0" - c.execute(timeout_sql) + self.db_pool.engine.attempt_to_set_statement_timeout( + c, 0, for_transaction=True + ) sql = ( "CREATE %(unique)s INDEX CONCURRENTLY %(name)s" @@ -791,12 +792,6 @@ class BackgroundUpdater: logger.debug("[SQL] %s", sql) c.execute(sql) finally: - # mypy ignore - `statement_timeout` is defined on PostgresEngine - # reset the global timeout to the default - default_timeout = self.db_pool.engine.statement_timeout # type: ignore[attr-defined] - undo_timeout_sql = f"SET statement_timeout = {default_timeout}" - conn.cursor().execute(undo_timeout_sql) - conn.engine.attempt_to_set_autocommit(conn.conn, False) def create_index_sqlite(conn: "LoggingDatabaseConnection") -> None: diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index 1a5b5731bb..56c8198149 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -89,10 +89,11 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): # furthermore, we might already have the table from a previous (failed) # purge attempt, so let's drop the table first. - if isinstance(self.database_engine, PostgresEngine): - # Disable statement timeouts for this transaction; purging rooms can - # take a while! - txn.execute("SET LOCAL statement_timeout = 0") + # Disable statement timeouts for this transaction; purging rooms can + # take a while! + self.database_engine.attempt_to_set_statement_timeout( + txn, 0, for_transaction=True + ) txn.execute("DROP TABLE IF EXISTS events_to_purge") diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py index b1a2418cbd..888b4a5660 100644 --- a/synapse/storage/engines/_base.py +++ b/synapse/storage/engines/_base.py @@ -36,6 +36,9 @@ CursorType = TypeVar("CursorType", bound=Cursor) class BaseDatabaseEngine(Generic[ConnectionType, CursorType], metaclass=abc.ABCMeta): + # The default statement timeout to use for transactions. + statement_timeout: Optional[int] = None + def __init__(self, module: DBAPI2Module, config: Mapping[str, Any]): self.module = module @@ -132,6 +135,16 @@ class BaseDatabaseEngine(Generic[ConnectionType, CursorType], metaclass=abc.ABCM """ ... + @abc.abstractmethod + def attempt_to_set_statement_timeout( + self, cursor: CursorType, statement_timeout: int, for_transaction: bool + ) -> None: + """Attempt to set the cursor's statement timeout. + + Note this has no effect on SQLite3. + """ + ... + @staticmethod @abc.abstractmethod def executescript(cursor: CursorType, script: str) -> None: diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index ec4c4041b7..6ce9ef5fcd 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -52,7 +52,7 @@ class PostgresEngine( # some degenerate query plan has been created and the client has probably # timed out/walked off anyway. # This is in milliseconds. - self.statement_timeout: Optional[int] = database_config.get( + self.statement_timeout = database_config.get( "statement_timeout", 60 * 60 * 1000 ) self._version: Optional[int] = None # unknown as yet @@ -169,7 +169,11 @@ class PostgresEngine( # Abort really long-running statements and turn them into errors. if self.statement_timeout is not None: - cursor.execute("SET statement_timeout TO ?", (self.statement_timeout,)) + self.attempt_to_set_statement_timeout( + cast(psycopg2.extensions.cursor, cursor.txn), + self.statement_timeout, + for_transaction=False, + ) cursor.close() db_conn.commit() @@ -233,6 +237,18 @@ class PostgresEngine( isolation_level = self.isolation_level_map[isolation_level] return conn.set_isolation_level(isolation_level) + def attempt_to_set_statement_timeout( + self, + cursor: psycopg2.extensions.cursor, + statement_timeout: int, + for_transaction: bool, + ) -> None: + if for_transaction: + sql = "SET LOCAL statement_timeout TO ?" + else: + sql = "SET statement_timeout TO ?" + cursor.execute(sql, (statement_timeout,)) + @staticmethod def executescript(cursor: psycopg2.extensions.cursor, script: str) -> None: """Execute a chunk of SQL containing multiple semicolon-delimited statements. diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index 802069e1e1..64d2a72ed5 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -143,6 +143,12 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection, sqlite3.Cursor]): # All transactions are SERIALIZABLE by default in sqlite pass + def attempt_to_set_statement_timeout( + self, cursor: sqlite3.Cursor, statement_timeout: int, for_transaction: bool + ) -> None: + # Not supported. + pass + @staticmethod def executescript(cursor: sqlite3.Cursor, script: str) -> None: """Execute a chunk of SQL containing multiple semicolon-delimited statements.