mirror of
https://github.com/spacedriveapp/spacedrive
synced 2024-07-14 01:54:04 +00:00
[Eng 495] Better job report fields (#726)
* Changing action column on job table * Introducing action and is_background info * Requested changes
This commit is contained in:
parent
705f39aa2d
commit
fa5671f614
|
@ -0,0 +1,24 @@
|
|||
-- RedefineTables
|
||||
PRAGMA foreign_keys=OFF;
|
||||
CREATE TABLE "new_job" (
|
||||
"id" BLOB NOT NULL PRIMARY KEY,
|
||||
"name" TEXT NOT NULL,
|
||||
"node_id" INTEGER NOT NULL,
|
||||
"action" TEXT,
|
||||
"status" INTEGER NOT NULL DEFAULT 0,
|
||||
"data" BLOB,
|
||||
"metadata" BLOB,
|
||||
"parent_id" BLOB,
|
||||
"task_count" INTEGER NOT NULL DEFAULT 1,
|
||||
"completed_task_count" INTEGER NOT NULL DEFAULT 0,
|
||||
"date_created" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
"date_started" DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||||
"date_completed" DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||||
CONSTRAINT "job_node_id_fkey" FOREIGN KEY ("node_id") REFERENCES "node" ("id") ON DELETE CASCADE ON UPDATE CASCADE,
|
||||
CONSTRAINT "job_parent_id_fkey" FOREIGN KEY ("parent_id") REFERENCES "job" ("id") ON DELETE CASCADE ON UPDATE CASCADE
|
||||
);
|
||||
INSERT INTO "new_job" ("action", "completed_task_count", "data", "date_completed", "date_created", "date_started", "id", "metadata", "name", "node_id", "parent_id", "status", "task_count") SELECT "action", "completed_task_count", "data", "date_completed", "date_created", "date_started", "id", "metadata", "name", "node_id", "parent_id", "status", "task_count" FROM "job";
|
||||
DROP TABLE "job";
|
||||
ALTER TABLE "new_job" RENAME TO "job";
|
||||
PRAGMA foreign_key_check;
|
||||
PRAGMA foreign_keys=ON;
|
|
@ -368,14 +368,16 @@ model ObjectInSpace {
|
|||
//// Job ////
|
||||
|
||||
model Job {
|
||||
id Bytes @id
|
||||
name String
|
||||
node_id Int
|
||||
action Int
|
||||
id Bytes @id
|
||||
name String
|
||||
node_id Int
|
||||
action String? // Will be composed of "{action_description}(-{children_order})*"
|
||||
|
||||
// Enum: sd_core::job::job_manager:JobStatus
|
||||
status Int @default(0)
|
||||
data Bytes?
|
||||
metadata Bytes?
|
||||
status Int @default(0) // 0 = Queued
|
||||
|
||||
data Bytes? // Serialized data to be used on pause/resume
|
||||
metadata Bytes? // Serialized metadata field with info about the job after completion
|
||||
|
||||
parent_id Bytes?
|
||||
|
||||
|
|
|
@ -9,7 +9,8 @@ use crate::{
|
|||
shallow_file_identifier_job::ShallowFileIdentifierJob,
|
||||
},
|
||||
fs::{
|
||||
copy::FileCopierJob, cut::FileCutterJob, delete::FileDeleterJob, erase::FileEraserJob,
|
||||
copy::FileCopierJob, cut::FileCutterJob, decrypt::FileDecryptorJob,
|
||||
delete::FileDeleterJob, encrypt::FileEncryptorJob, erase::FileEraserJob,
|
||||
},
|
||||
preview::{
|
||||
shallow_thumbnailer_job::ShallowThumbnailerJob, thumbnailer_job::ThumbnailerJob,
|
||||
|
@ -29,7 +30,6 @@ use std::{
|
|||
};
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use futures::future::BoxFuture;
|
||||
use prisma_client_rust::Direction;
|
||||
use rspc::Type;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
@ -167,8 +167,10 @@ impl JobManager {
|
|||
let mut ret = vec![];
|
||||
|
||||
for worker in self.running_workers.read().await.values() {
|
||||
let worker = worker.lock().await;
|
||||
ret.push(worker.report());
|
||||
let report = worker.lock().await.report();
|
||||
if !report.is_background {
|
||||
ret.push(report);
|
||||
}
|
||||
}
|
||||
ret
|
||||
}
|
||||
|
@ -183,7 +185,8 @@ impl JobManager {
|
|||
.exec()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(Into::into)
|
||||
.map(JobReport::from)
|
||||
.filter(|report| !report.is_background)
|
||||
.collect())
|
||||
}
|
||||
|
||||
|
@ -240,10 +243,23 @@ impl JobManager {
|
|||
.into_iter()
|
||||
.map(JobReport::from)
|
||||
{
|
||||
let children_jobs = library
|
||||
.db
|
||||
.job()
|
||||
.find_many(vec![job::parent_id::equals(Some(
|
||||
root_paused_job_report.id.as_bytes().to_vec(),
|
||||
))])
|
||||
.order_by(job::action::order(Direction::Asc))
|
||||
.exec()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|job_data| get_resumable_job(JobReport::from(job_data), VecDeque::new()))
|
||||
.collect::<Result<_, _>>()?;
|
||||
|
||||
Arc::clone(&self)
|
||||
.dispatch_job(
|
||||
library,
|
||||
Self::recursive_resume_job(root_paused_job_report, library).await?,
|
||||
get_resumable_job(root_paused_job_report, children_jobs)?,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
@ -251,83 +267,6 @@ impl JobManager {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn recursive_resume_job(
|
||||
parent: JobReport,
|
||||
library: &Library,
|
||||
) -> BoxFuture<Result<Box<dyn DynJob>, JobManagerError>> {
|
||||
// Recursive async functions must return boxed futures
|
||||
Box::pin(async move {
|
||||
info!(
|
||||
"Trying to resume Job <id='{}', name='{}'>",
|
||||
parent.name, parent.id
|
||||
);
|
||||
|
||||
let maybe_children_job = if let Some(children_job_report) = library
|
||||
.db
|
||||
.job()
|
||||
.find_first(vec![job::parent_id::equals(Some(
|
||||
parent.id.as_bytes().to_vec(),
|
||||
))])
|
||||
.exec()
|
||||
.await?
|
||||
.map(JobReport::from)
|
||||
{
|
||||
Some(Self::recursive_resume_job(children_job_report, library).await?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
Self::get_resumable_job(parent, maybe_children_job)
|
||||
})
|
||||
}
|
||||
|
||||
fn get_resumable_job(
|
||||
job_report: JobReport,
|
||||
next_job: Option<Box<dyn DynJob>>,
|
||||
) -> Result<Box<dyn DynJob>, JobManagerError> {
|
||||
match job_report.name.as_str() {
|
||||
<ThumbnailerJob as StatefulJob>::NAME => {
|
||||
Job::resume(job_report, ThumbnailerJob {}, next_job)
|
||||
}
|
||||
<ShallowThumbnailerJob as StatefulJob>::NAME => {
|
||||
Job::resume(job_report, ShallowThumbnailerJob {}, next_job)
|
||||
}
|
||||
<IndexerJob as StatefulJob>::NAME => Job::resume(job_report, IndexerJob {}, next_job),
|
||||
<ShallowIndexerJob as StatefulJob>::NAME => {
|
||||
Job::resume(job_report, ShallowIndexerJob {}, next_job)
|
||||
}
|
||||
<FileIdentifierJob as StatefulJob>::NAME => {
|
||||
Job::resume(job_report, FileIdentifierJob {}, next_job)
|
||||
}
|
||||
<ShallowFileIdentifierJob as StatefulJob>::NAME => {
|
||||
Job::resume(job_report, ShallowFileIdentifierJob {}, next_job)
|
||||
}
|
||||
<ObjectValidatorJob as StatefulJob>::NAME => {
|
||||
Job::resume(job_report, ObjectValidatorJob {}, next_job)
|
||||
}
|
||||
<FileCutterJob as StatefulJob>::NAME => {
|
||||
Job::resume(job_report, FileCutterJob {}, next_job)
|
||||
}
|
||||
<FileCopierJob as StatefulJob>::NAME => {
|
||||
Job::resume(job_report, FileCopierJob {}, next_job)
|
||||
}
|
||||
<FileDeleterJob as StatefulJob>::NAME => {
|
||||
Job::resume(job_report, FileDeleterJob {}, next_job)
|
||||
}
|
||||
<FileEraserJob as StatefulJob>::NAME => {
|
||||
Job::resume(job_report, FileEraserJob {}, next_job)
|
||||
}
|
||||
_ => {
|
||||
error!(
|
||||
"Unknown job type: {}, id: {}",
|
||||
job_report.name, job_report.id
|
||||
);
|
||||
Err(JobError::UnknownJobName(job_report.id, job_report.name))
|
||||
}
|
||||
}
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
||||
async fn dispatch_job(self: Arc<Self>, library: &Library, mut job: Box<dyn DynJob>) {
|
||||
// create worker to process job
|
||||
let mut running_workers = self.running_workers.write().await;
|
||||
|
@ -378,8 +317,11 @@ pub enum JobReportUpdate {
|
|||
pub struct JobReport {
|
||||
pub id: Uuid,
|
||||
pub name: String,
|
||||
pub action: Option<String>,
|
||||
pub data: Option<Vec<u8>>,
|
||||
pub metadata: Option<serde_json::Value>,
|
||||
pub is_background: bool,
|
||||
|
||||
pub created_at: Option<DateTime<Utc>>,
|
||||
pub started_at: Option<DateTime<Utc>>,
|
||||
pub completed_at: Option<DateTime<Utc>>,
|
||||
|
@ -407,15 +349,11 @@ impl Display for JobReport {
|
|||
// convert database struct into a resource struct
|
||||
impl From<job::Data> for JobReport {
|
||||
fn from(data: job::Data) -> Self {
|
||||
JobReport {
|
||||
id: Uuid::from_slice(&data.id).unwrap(),
|
||||
Self {
|
||||
id: Uuid::from_slice(&data.id).expect("corrupted database"),
|
||||
is_background: get_background_info_by_job_name(&data.name),
|
||||
name: data.name,
|
||||
status: JobStatus::try_from(data.status).unwrap(),
|
||||
task_count: data.task_count,
|
||||
completed_task_count: data.completed_task_count,
|
||||
created_at: Some(data.date_created.into()),
|
||||
started_at: data.date_started.map(|d| d.into()),
|
||||
completed_at: data.date_completed.map(|d| d.into()),
|
||||
action: data.action,
|
||||
data: data.data,
|
||||
metadata: data.metadata.and_then(|m| {
|
||||
serde_json::from_slice(&m).unwrap_or_else(|e| -> Option<serde_json::Value> {
|
||||
|
@ -423,9 +361,16 @@ impl From<job::Data> for JobReport {
|
|||
None
|
||||
})
|
||||
}),
|
||||
created_at: Some(data.date_created.into()),
|
||||
started_at: data.date_started.map(|d| d.into()),
|
||||
completed_at: data.date_completed.map(|d| d.into()),
|
||||
parent_id: data
|
||||
.parent_id
|
||||
.map(|id| Uuid::from_slice(&id).expect("corrupted database")),
|
||||
status: JobStatus::try_from(data.status).expect("corrupted database"),
|
||||
task_count: data.task_count,
|
||||
completed_task_count: data.completed_task_count,
|
||||
message: String::new(),
|
||||
// SAFETY: We created this uuid before
|
||||
parent_id: data.parent_id.map(|id| Uuid::from_slice(&id).unwrap()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -434,7 +379,9 @@ impl JobReport {
|
|||
pub fn new(uuid: Uuid, name: String) -> Self {
|
||||
Self {
|
||||
id: uuid,
|
||||
is_background: get_background_info_by_job_name(&name),
|
||||
name,
|
||||
action: None,
|
||||
created_at: None,
|
||||
started_at: None,
|
||||
completed_at: None,
|
||||
|
@ -448,9 +395,21 @@ impl JobReport {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn new_with_parent(uuid: Uuid, name: String, parent_id: Uuid) -> Self {
|
||||
pub fn new_with_action(uuid: Uuid, name: String, action: impl AsRef<str>) -> Self {
|
||||
let mut report = Self::new(uuid, name);
|
||||
report.action = Some(action.as_ref().to_string());
|
||||
report
|
||||
}
|
||||
|
||||
pub fn new_with_parent(
|
||||
uuid: Uuid,
|
||||
name: String,
|
||||
parent_id: Uuid,
|
||||
action: Option<String>,
|
||||
) -> Self {
|
||||
let mut report = Self::new(uuid, name);
|
||||
report.parent_id = Some(parent_id);
|
||||
report.action = action;
|
||||
report
|
||||
}
|
||||
|
||||
|
@ -464,10 +423,10 @@ impl JobReport {
|
|||
.create(
|
||||
self.id.as_bytes().to_vec(),
|
||||
self.name.clone(),
|
||||
JobStatus::Running as i32,
|
||||
node::id::equals(library.node_local_id),
|
||||
util::db::chain_optional_iter(
|
||||
[
|
||||
job::action::set(self.action.clone()),
|
||||
job::data::set(self.data.clone()),
|
||||
job::date_created::set(now.into()),
|
||||
job::date_started::set(self.started_at.map(|d| d.into())),
|
||||
|
@ -532,3 +491,77 @@ impl TryFrom<i32> for JobStatus {
|
|||
Ok(s)
|
||||
}
|
||||
}
|
||||
|
||||
#[macro_use]
|
||||
mod macros {
|
||||
macro_rules! dispatch_call_to_job_by_name {
|
||||
($job_name:expr, T -> $call:expr, default = $default:block, jobs = [ $($job:ty),+ $(,)?]) => {{
|
||||
match $job_name {
|
||||
$(<$job as $crate::job::StatefulJob>::NAME => {
|
||||
type T = $job;
|
||||
$call
|
||||
},)+
|
||||
_ => $default
|
||||
}
|
||||
}};
|
||||
}
|
||||
}
|
||||
|
||||
fn get_background_info_by_job_name(name: &str) -> bool {
|
||||
dispatch_call_to_job_by_name!(
|
||||
name,
|
||||
T -> <T as StatefulJob>::IS_BACKGROUND,
|
||||
default = {
|
||||
error!(
|
||||
"Unknown job name '{name}' at `is_background` check, will use `false` as a safe default"
|
||||
);
|
||||
false
|
||||
},
|
||||
jobs = [
|
||||
ThumbnailerJob,
|
||||
ShallowThumbnailerJob,
|
||||
IndexerJob,
|
||||
ShallowIndexerJob,
|
||||
FileIdentifierJob,
|
||||
ShallowFileIdentifierJob,
|
||||
ObjectValidatorJob,
|
||||
FileCutterJob,
|
||||
FileCopierJob,
|
||||
FileDeleterJob,
|
||||
FileEraserJob,
|
||||
FileEncryptorJob,
|
||||
FileDecryptorJob,
|
||||
]
|
||||
)
|
||||
}
|
||||
|
||||
fn get_resumable_job(
|
||||
job_report: JobReport,
|
||||
next_jobs: VecDeque<Box<dyn DynJob>>,
|
||||
) -> Result<Box<dyn DynJob>, JobManagerError> {
|
||||
dispatch_call_to_job_by_name!(
|
||||
job_report.name.as_str(),
|
||||
T -> Job::resume(job_report, T {}, next_jobs),
|
||||
default = {
|
||||
error!(
|
||||
"Unknown job type: {}, id: {}",
|
||||
job_report.name, job_report.id
|
||||
);
|
||||
Err(JobError::UnknownJobName(job_report.id, job_report.name))
|
||||
},
|
||||
jobs = [
|
||||
ThumbnailerJob,
|
||||
ShallowThumbnailerJob,
|
||||
IndexerJob,
|
||||
ShallowIndexerJob,
|
||||
FileIdentifierJob,
|
||||
ShallowFileIdentifierJob,
|
||||
ObjectValidatorJob,
|
||||
FileCutterJob,
|
||||
FileCopierJob,
|
||||
FileDeleterJob,
|
||||
FileEraserJob,
|
||||
]
|
||||
)
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ use std::{
|
|||
collections::{hash_map::DefaultHasher, VecDeque},
|
||||
fmt::Debug,
|
||||
hash::{Hash, Hasher},
|
||||
mem,
|
||||
path::PathBuf,
|
||||
sync::Arc,
|
||||
};
|
||||
|
@ -46,6 +47,8 @@ pub enum JobError {
|
|||
"Tried to resume a job that doesn't have saved state data: job <name='{1}', uuid='{0}'>"
|
||||
)]
|
||||
MissingJobDataState(Uuid, String),
|
||||
#[error("missing report field: job <uuid='{id}', name='{name}'>")]
|
||||
MissingReport { id: Uuid, name: String },
|
||||
#[error("missing some job data: '{value}'")]
|
||||
MissingData { value: String },
|
||||
#[error("error converting/handling OS strings")]
|
||||
|
@ -101,6 +104,7 @@ pub trait StatefulJob: Send + Sync + Sized {
|
|||
|
||||
/// The name of the job is a unique human readable identifier for the job.
|
||||
const NAME: &'static str;
|
||||
const IS_BACKGROUND: bool = false;
|
||||
|
||||
/// Construct a new instance of the job. This is used so the user can pass `Self::Init` into the `spawn_job` function and we can still run the job.
|
||||
/// This does remove the flexibility of being able to pass arguments into the job's struct but with resumable jobs I view that as an anti-pattern anyway.
|
||||
|
@ -129,7 +133,7 @@ pub trait DynJob: Send + Sync {
|
|||
fn name(&self) -> &'static str;
|
||||
async fn run(&mut self, job_manager: Arc<JobManager>, ctx: WorkerContext) -> JobResult;
|
||||
fn hash(&self) -> u64;
|
||||
fn queue_next(&mut self, next_job: Box<dyn DynJob>);
|
||||
fn set_next_jobs(&mut self, next_jobs: VecDeque<Box<dyn DynJob>>);
|
||||
fn serialize_state(&self) -> Result<Vec<u8>, JobError>;
|
||||
async fn register_children(&mut self, library: &Library) -> Result<(), JobError>;
|
||||
async fn pause_children(&mut self, library: &Library) -> Result<(), JobError>;
|
||||
|
@ -137,10 +141,11 @@ pub trait DynJob: Send + Sync {
|
|||
}
|
||||
|
||||
pub struct Job<SJob: StatefulJob> {
|
||||
id: Uuid,
|
||||
report: Option<JobReport>,
|
||||
state: JobState<SJob>,
|
||||
stateful_job: SJob,
|
||||
next_job: Option<Box<dyn DynJob>>,
|
||||
next_jobs: VecDeque<Box<dyn DynJob>>,
|
||||
}
|
||||
|
||||
pub trait IntoJob<SJob: StatefulJob + 'static> {
|
||||
|
@ -172,9 +177,11 @@ where
|
|||
SJob: StatefulJob<Init = Init> + 'static,
|
||||
Init: JobInitData<Job = SJob>,
|
||||
{
|
||||
pub fn new(init: Init) -> Box<Self> {
|
||||
fn new(init: Init) -> Box<Self> {
|
||||
let id = Uuid::new_v4();
|
||||
Box::new(Self {
|
||||
report: Some(JobReport::new(Uuid::new_v4(), SJob::NAME.to_string())),
|
||||
id,
|
||||
report: Some(JobReport::new(id, SJob::NAME.to_string())),
|
||||
state: JobState {
|
||||
init,
|
||||
data: None,
|
||||
|
@ -182,58 +189,18 @@ where
|
|||
step_number: 0,
|
||||
},
|
||||
stateful_job: SJob::new(),
|
||||
next_job: None,
|
||||
next_jobs: VecDeque::new(),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn queue_next<NextSJob, NextInit>(mut self: Box<Self>, init: NextInit) -> Box<Self>
|
||||
where
|
||||
NextSJob: StatefulJob<Init = NextInit> + 'static,
|
||||
NextInit: JobInitData<Job = NextSJob>,
|
||||
{
|
||||
let last_job = Job::new_dependent(
|
||||
init,
|
||||
self.next_job
|
||||
.as_ref()
|
||||
.map(|job| job.id())
|
||||
// SAFETY: If we're queueing a next job then we should have a report yet
|
||||
.unwrap_or(self.report.as_ref().unwrap().id),
|
||||
);
|
||||
|
||||
if let Some(ref mut next) = self.next_job {
|
||||
next.queue_next(last_job);
|
||||
} else {
|
||||
self.next_job = Some(last_job);
|
||||
}
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
pub fn resume(
|
||||
mut report: JobReport,
|
||||
stateful_job: SJob,
|
||||
next_job: Option<Box<dyn DynJob>>,
|
||||
) -> Result<Box<dyn DynJob>, JobError> {
|
||||
let job_state_data = if let Some(data) = report.data.take() {
|
||||
data
|
||||
} else {
|
||||
return Err(JobError::MissingJobDataState(report.id, report.name));
|
||||
};
|
||||
|
||||
Ok(Box::new(Self {
|
||||
report: Some(report),
|
||||
state: rmp_serde::from_slice(&job_state_data)?,
|
||||
stateful_job,
|
||||
next_job,
|
||||
}))
|
||||
}
|
||||
|
||||
fn new_dependent(init: Init, parent_id: Uuid) -> Box<Self> {
|
||||
pub fn new_with_action(init: Init, action: impl AsRef<str>) -> Box<Self> {
|
||||
let id = Uuid::new_v4();
|
||||
Box::new(Self {
|
||||
report: Some(JobReport::new_with_parent(
|
||||
Uuid::new_v4(),
|
||||
id,
|
||||
report: Some(JobReport::new_with_action(
|
||||
id,
|
||||
SJob::NAME.to_string(),
|
||||
parent_id,
|
||||
action,
|
||||
)),
|
||||
state: JobState {
|
||||
init,
|
||||
|
@ -242,7 +209,68 @@ where
|
|||
step_number: 0,
|
||||
},
|
||||
stateful_job: SJob::new(),
|
||||
next_job: None,
|
||||
next_jobs: VecDeque::new(),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn queue_next<NextSJob, NextInit>(mut self: Box<Self>, init: NextInit) -> Box<Self>
|
||||
where
|
||||
NextSJob: StatefulJob<Init = NextInit> + 'static,
|
||||
NextInit: JobInitData<Job = NextSJob>,
|
||||
{
|
||||
let next_job_order = self.next_jobs.len() + 1;
|
||||
self.next_jobs.push_back(Job::new_dependent(
|
||||
init,
|
||||
self.id,
|
||||
// SAFETY: If we're queueing a next job then we should still have a report
|
||||
self.report().as_ref().and_then(|parent_report| {
|
||||
parent_report
|
||||
.action
|
||||
.as_ref()
|
||||
.map(|parent_action| format!("{parent_action}-{next_job_order}"))
|
||||
}),
|
||||
));
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
pub fn resume(
|
||||
mut report: JobReport,
|
||||
stateful_job: SJob,
|
||||
next_jobs: VecDeque<Box<dyn DynJob>>,
|
||||
) -> Result<Box<dyn DynJob>, JobError> {
|
||||
Ok(Box::new(Self {
|
||||
id: report.id,
|
||||
state: rmp_serde::from_slice(
|
||||
&report
|
||||
.data
|
||||
.take()
|
||||
.ok_or_else(|| JobError::MissingJobDataState(report.id, report.name.clone()))?,
|
||||
)?,
|
||||
report: Some(report),
|
||||
stateful_job,
|
||||
next_jobs,
|
||||
}))
|
||||
}
|
||||
|
||||
fn new_dependent(init: Init, parent_id: Uuid, parent_action: Option<String>) -> Box<Self> {
|
||||
let id = Uuid::new_v4();
|
||||
Box::new(Self {
|
||||
id,
|
||||
report: Some(JobReport::new_with_parent(
|
||||
id,
|
||||
SJob::NAME.to_string(),
|
||||
parent_id,
|
||||
parent_action,
|
||||
)),
|
||||
state: JobState {
|
||||
init,
|
||||
data: None,
|
||||
steps: VecDeque::new(),
|
||||
step_number: 0,
|
||||
},
|
||||
stateful_job: SJob::new(),
|
||||
next_jobs: VecDeque::new(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -294,8 +322,6 @@ impl<SJob: StatefulJob> DynJob for Job<SJob> {
|
|||
}
|
||||
|
||||
let mut shutdown_rx = ctx.shutdown_rx();
|
||||
let shutdown_rx_fut = shutdown_rx.recv();
|
||||
tokio::pin!(shutdown_rx_fut);
|
||||
|
||||
while job_should_run && !self.state.steps.is_empty() {
|
||||
tokio::select! {
|
||||
|
@ -311,7 +337,7 @@ impl<SJob: StatefulJob> DynJob for Job<SJob> {
|
|||
};
|
||||
self.state.steps.pop_front();
|
||||
}
|
||||
_ = &mut shutdown_rx_fut => {
|
||||
_ = shutdown_rx.recv() => {
|
||||
return Err(
|
||||
JobError::Paused(
|
||||
rmp_serde::to_vec_named(&self.state)?
|
||||
|
@ -327,12 +353,15 @@ impl<SJob: StatefulJob> DynJob for Job<SJob> {
|
|||
.finalize(ctx.clone(), &mut self.state)
|
||||
.await?;
|
||||
|
||||
if let Some(next_job) = self.next_job.take() {
|
||||
let mut next_jobs = mem::take(&mut self.next_jobs);
|
||||
|
||||
if let Some(mut next_job) = next_jobs.pop_front() {
|
||||
debug!(
|
||||
"Job '{}' requested to spawn '{}' now that it's complete!",
|
||||
self.name(),
|
||||
next_job.name()
|
||||
);
|
||||
next_job.set_next_jobs(next_jobs);
|
||||
|
||||
if let Err(e) = job_manager.clone().ingest(&ctx.library, next_job).await {
|
||||
error!("Failed to ingest next job: {e}");
|
||||
|
@ -346,12 +375,8 @@ impl<SJob: StatefulJob> DynJob for Job<SJob> {
|
|||
<SJob::Init as JobInitData>::hash(&self.state.init)
|
||||
}
|
||||
|
||||
fn queue_next(&mut self, next_job: Box<dyn DynJob>) {
|
||||
if let Some(ref mut next) = self.next_job {
|
||||
next.queue_next(next_job);
|
||||
} else {
|
||||
self.next_job = Some(next_job);
|
||||
}
|
||||
fn set_next_jobs(&mut self, next_jobs: VecDeque<Box<dyn DynJob>>) {
|
||||
self.next_jobs = next_jobs;
|
||||
}
|
||||
|
||||
fn serialize_state(&self) -> Result<Vec<u8>, JobError> {
|
||||
|
@ -359,44 +384,53 @@ impl<SJob: StatefulJob> DynJob for Job<SJob> {
|
|||
}
|
||||
|
||||
async fn register_children(&mut self, library: &Library) -> Result<(), JobError> {
|
||||
if let Some(ref mut next_job) = self.next_job {
|
||||
// SAFETY: As these children jobs haven't been run yet, they still have their report field
|
||||
let next_job_report = next_job.report_mut().as_mut().unwrap();
|
||||
if next_job_report.created_at.is_none() {
|
||||
next_job_report.create(library).await?
|
||||
for next_job in self.next_jobs.iter_mut() {
|
||||
if let Some(next_job_report) = next_job.report_mut() {
|
||||
if next_job_report.created_at.is_none() {
|
||||
next_job_report.create(library).await?
|
||||
}
|
||||
} else {
|
||||
return Err(JobError::MissingReport {
|
||||
id: next_job.id(),
|
||||
name: next_job.name().to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
next_job.register_children(library).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn pause_children(&mut self, library: &Library) -> Result<(), JobError> {
|
||||
if let Some(ref mut next_job) = self.next_job {
|
||||
for next_job in self.next_jobs.iter_mut() {
|
||||
let state = next_job.serialize_state()?;
|
||||
|
||||
// SAFETY: As these children jobs haven't been run yet, they still have their report field
|
||||
let mut report = next_job.report_mut().as_mut().unwrap();
|
||||
report.status = JobStatus::Paused;
|
||||
report.data = Some(state);
|
||||
report.update(library).await?;
|
||||
next_job.pause_children(library).await?;
|
||||
if let Some(next_job_report) = next_job.report_mut() {
|
||||
next_job_report.status = JobStatus::Paused;
|
||||
next_job_report.data = Some(state);
|
||||
next_job_report.update(library).await?;
|
||||
} else {
|
||||
return Err(JobError::MissingReport {
|
||||
id: next_job.id(),
|
||||
name: next_job.name().to_string(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn cancel_children(&mut self, library: &Library) -> Result<(), JobError> {
|
||||
if let Some(ref mut next_job) = self.next_job {
|
||||
for next_job in self.next_jobs.iter_mut() {
|
||||
let state = next_job.serialize_state()?;
|
||||
|
||||
// SAFETY: As these children jobs haven't been run yet, they still have their report field
|
||||
let mut report = next_job.report_mut().as_mut().unwrap();
|
||||
report.status = JobStatus::Canceled;
|
||||
report.data = Some(state);
|
||||
report.update(library).await?;
|
||||
next_job.cancel_children(library).await?;
|
||||
if let Some(next_job_report) = next_job.report_mut() {
|
||||
next_job_report.status = JobStatus::Canceled;
|
||||
next_job_report.data = Some(state);
|
||||
next_job_report.update(library).await?;
|
||||
} else {
|
||||
return Err(JobError::MissingReport {
|
||||
id: next_job.id(),
|
||||
name: next_job.name().to_string(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
|
|
@ -63,6 +63,7 @@ impl StatefulJob for ShallowIndexerJob {
|
|||
type Step = IndexerJobStep;
|
||||
|
||||
const NAME: &'static str = "shallow_indexer";
|
||||
const IS_BACKGROUND: bool = true;
|
||||
|
||||
fn new() -> Self {
|
||||
Self {}
|
||||
|
|
|
@ -330,10 +330,13 @@ pub async fn scan_location(
|
|||
|
||||
library
|
||||
.spawn_job(
|
||||
Job::new(IndexerJobInit {
|
||||
location,
|
||||
sub_path: None,
|
||||
})
|
||||
Job::new_with_action(
|
||||
IndexerJobInit {
|
||||
location,
|
||||
sub_path: None,
|
||||
},
|
||||
"scan_location",
|
||||
)
|
||||
.queue_next(FileIdentifierJobInit {
|
||||
location: location_base_data.clone(),
|
||||
sub_path: None,
|
||||
|
@ -362,10 +365,13 @@ pub async fn scan_location_sub_path(
|
|||
|
||||
library
|
||||
.spawn_job(
|
||||
Job::new(IndexerJobInit {
|
||||
location,
|
||||
sub_path: Some(sub_path.clone()),
|
||||
})
|
||||
Job::new_with_action(
|
||||
IndexerJobInit {
|
||||
location,
|
||||
sub_path: Some(sub_path.clone()),
|
||||
},
|
||||
"scan_location_sub_path",
|
||||
)
|
||||
.queue_next(FileIdentifierJobInit {
|
||||
location: location_base_data.clone(),
|
||||
sub_path: Some(sub_path.clone()),
|
||||
|
@ -393,10 +399,13 @@ pub async fn light_scan_location(
|
|||
|
||||
library
|
||||
.spawn_job(
|
||||
Job::new(ShallowIndexerJobInit {
|
||||
location,
|
||||
sub_path: sub_path.clone(),
|
||||
})
|
||||
Job::new_with_action(
|
||||
ShallowIndexerJobInit {
|
||||
location,
|
||||
sub_path: sub_path.clone(),
|
||||
},
|
||||
"light_scan_location",
|
||||
)
|
||||
.queue_next(ShallowFileIdentifierJobInit {
|
||||
location: location_base_data.clone(),
|
||||
sub_path: sub_path.clone(),
|
||||
|
|
|
@ -61,6 +61,7 @@ impl StatefulJob for ShallowFileIdentifierJob {
|
|||
type Step = ();
|
||||
|
||||
const NAME: &'static str = "shallow_file_identifier";
|
||||
const IS_BACKGROUND: bool = true;
|
||||
|
||||
fn new() -> Self {
|
||||
Self {}
|
||||
|
|
|
@ -60,6 +60,7 @@ impl StatefulJob for ShallowThumbnailerJob {
|
|||
type Step = ThumbnailerJobStep;
|
||||
|
||||
const NAME: &'static str = "shallow_thumbnailer";
|
||||
const IS_BACKGROUND: bool = true;
|
||||
|
||||
fn new() -> Self {
|
||||
Self {}
|
||||
|
|
|
@ -159,7 +159,7 @@ export type IndexerRuleCreateArgs = { kind: RuleKind, name: string, parameters:
|
|||
|
||||
export type InvalidateOperationEvent = { key: string, arg: any, result: any | null }
|
||||
|
||||
export type JobReport = { id: string, name: string, data: number[] | null, metadata: any | null, created_at: string | null, started_at: string | null, completed_at: string | null, parent_id: string | null, status: JobStatus, task_count: number, completed_task_count: number, message: string }
|
||||
export type JobReport = { id: string, name: string, action: string | null, data: number[] | null, metadata: any | null, is_background: boolean, created_at: string | null, started_at: string | null, completed_at: string | null, parent_id: string | null, status: JobStatus, task_count: number, completed_task_count: number, message: string }
|
||||
|
||||
export type JobStatus = "Queued" | "Running" | "Completed" | "Canceled" | "Failed" | "Paused"
|
||||
|
||||
|
|
Loading…
Reference in a new issue