Add filters to task retrieval + clean less often

This commit is contained in:
Mathieu Velten 2023-07-24 21:50:02 +02:00
parent 470f385419
commit 0961f52c57
2 changed files with 83 additions and 39 deletions

View file

@ -16,7 +16,12 @@ import json
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
LoggingTransaction,
make_in_list_sql_clause,
)
from synapse.types import JsonDict, JsonMapping, ScheduledTask, TaskStatus
if TYPE_CHECKING:
@ -42,40 +47,56 @@ class TaskSchedulerWorkerStore(SQLBaseStore):
return ScheduledTask(**row)
async def get_scheduled_tasks(
self, action: Optional[str] = None, resource_id: Optional[str] = None
self,
actions: Optional[List[str]] = None,
resource_ids: Optional[List[str]] = None,
statuses: Optional[List[TaskStatus]] = None,
) -> List[ScheduledTask]:
"""Get a list of scheduled tasks from the DB.
If the parameters are `None` all the tasks are returned.
If an arg is `None` all tasks matching the other args will be selected.
If an arg is an empty list, the value needs to be NULL in DB to be selected.
Args:
action: Limit the returned tasks to this specific action name
resource_id: Limit the returned tasks to this specific resource id
actions: Limit the returned tasks to those specific action names
resource_ids: Limit the returned tasks to thoe specific resource ids
statuses: Limit the returned tasks to thoe specific statuses
Returns: a list of `ScheduledTask`
"""
keyvalues = {}
if action:
keyvalues["action"] = action
if resource_id:
keyvalues["resource_id"] = resource_id
rows = await self.db_pool.simple_select_list(
table="scheduled_tasks",
keyvalues=keyvalues,
retcols=(
"id",
"action",
"status",
"timestamp",
"resource_id",
"params",
"result",
"error",
),
desc="get_scheduled_tasks",
def get_scheduled_tasks_txn(txn: LoggingTransaction) -> List[Dict[str, Any]]:
clauses = []
args = []
if actions is not None:
clause, temp_args = make_in_list_sql_clause(
txn.database_engine, "action", actions
)
clauses.append(clause)
args.extend(temp_args)
if resource_ids is not None:
clause, temp_args = make_in_list_sql_clause(
txn.database_engine, "resource_id", resource_ids
)
clauses.append(clause)
args.extend(temp_args)
if statuses is not None:
clause, temp_args = make_in_list_sql_clause(
txn.database_engine, "status", statuses
)
clauses.append(clause)
args.extend(temp_args)
sql = "SELECT * FROM scheduled_tasks"
if clauses:
sql = sql + " WHERE " + " AND ".join(clauses)
txn.execute(sql, args)
return self.db_pool.cursor_to_dict(txn)
rows = await self.db_pool.runInteraction(
"get_scheduled_tasks", get_scheduled_tasks_txn
)
return [TaskSchedulerWorkerStore._convert_row_to_task(row) for row in rows]
async def upsert_scheduled_task(self, task: ScheduledTask) -> None:
@ -107,7 +128,7 @@ class TaskSchedulerWorkerStore(SQLBaseStore):
status: Optional[TaskStatus] = None,
result: Optional[JsonMapping] = None,
error: Optional[str] = None,
) -> None:
) -> bool:
"""Update a scheduled task in the DB with some new value(s).
Args:
@ -126,12 +147,13 @@ class TaskSchedulerWorkerStore(SQLBaseStore):
updatevalues["result"] = json.dumps(result)
if error is not None:
updatevalues["error"] = error
await self.db_pool.simple_update(
nb_rows = await self.db_pool.simple_update(
"scheduled_tasks",
{"id": id},
updatevalues,
desc="update_scheduled_task",
)
return nb_rows > 0
async def get_scheduled_task(self, id: str) -> Optional[ScheduledTask]:
"""Get a specific `ScheduledTask` from its id.

View file

@ -31,6 +31,7 @@ class TaskScheduler:
# Precision of the scheduler, evaluation of tasks to run will only happen
# every `SCHEDULE_INTERVAL_MS` ms
SCHEDULE_INTERVAL_MS = 5 * 60 * 1000 # 5mn
CLEAN_INTERVAL_MS = 60 * 60 * 1000 # 1hr
# Time before a complete or failed task is deleted from the DB
KEEP_TASKS_FOR_MS = 7 * 24 * 60 * 60 * 1000 # 1 week
@ -51,8 +52,14 @@ class TaskScheduler:
self.clock.looping_call(
run_as_background_process,
TaskScheduler.SCHEDULE_INTERVAL_MS,
"scheduled_tasks_loop",
self._scheduled_tasks_loop,
"run_scheduled_tasks",
self._run_scheduled_tasks,
)
self.clock.looping_call(
run_as_background_process,
TaskScheduler.CLEAN_INTERVAL_MS,
"clean_scheduled_tasks",
self._clean_scheduled_tasks,
)
def register_action(
@ -135,10 +142,11 @@ class TaskScheduler:
self,
id: str,
*,
timestamp: Optional[int] = None,
status: Optional[TaskStatus] = None,
result: Optional[JsonMapping] = None,
error: Optional[str] = None,
) -> None:
) -> bool:
"""Update some task associated values.
This is used internally, and also exposed publically so it can be used inside task functions.
@ -150,9 +158,11 @@ class TaskScheduler:
result: the new result of the task
error: the new error of the task
"""
await self.store.update_scheduled_task(
if timestamp is None:
timestamp = self.clock.time_msec()
return await self.store.update_scheduled_task(
id,
timestamp=self.clock.time_msec(),
timestamp=timestamp,
status=status,
result=result,
error=error,
@ -170,10 +180,13 @@ class TaskScheduler:
return await self.store.get_scheduled_task(id)
async def get_tasks(
self, action: str, resource_id: Optional[str]
self,
actions: Optional[List[str]] = None,
resource_ids: Optional[List[str]] = None,
statuses: Optional[List[TaskStatus]] = None,
) -> List[ScheduledTask]:
"""Get a list of tasks associated with an action name, and
optionally with a resource id.
"""Get a list of tasks associated with some action name(s) and/or
with some resource id(s).
Args:
action: the action name of the tasks to retrieve
@ -182,11 +195,13 @@ class TaskScheduler:
Returns: a list of `ScheduledTask`
"""
return await self.store.get_scheduled_tasks(action, resource_id)
return await self.store.get_scheduled_tasks(actions, resource_ids, statuses)
async def _scheduled_tasks_loop(self) -> None:
async def _run_scheduled_tasks(self) -> None:
"""Main loop taking care of launching the scheduled tasks when needed."""
for task in await self.store.get_scheduled_tasks():
for task in await self.store.get_scheduled_tasks(
statuses=[TaskStatus.SCHEDULED, TaskStatus.ACTIVE]
):
if task.id not in self.running_tasks:
if (
task.status == TaskStatus.SCHEDULED
@ -195,7 +210,14 @@ class TaskScheduler:
await self._launch_task(task, True)
elif task.status == TaskStatus.ACTIVE:
await self._launch_task(task, False)
elif (
async def _clean_scheduled_tasks(self) -> None:
"""Clean loop taking care of removing old complete or failed jobs to avoid clutter the DB."""
for task in await self.store.get_scheduled_tasks(
statuses=[TaskStatus.FAILED, TaskStatus.COMPLETE]
):
if task.id not in self.running_tasks:
if (
task.status == TaskStatus.COMPLETE
or task.status == TaskStatus.FAILED
) and self.clock.time_msec() > task.timestamp + TaskScheduler.KEEP_TASKS_FOR_MS: