A wrong premise about report on a running job

This commit is contained in:
Ericson Fogo Soares 2022-09-07 23:43:04 -03:00
parent 752c4ad45c
commit 0f92dd7af1
5 changed files with 33 additions and 40 deletions

View file

@ -1,9 +1,7 @@
use crate::{
api::{locations::LocationExplorerArgs, CoreEvent, LibraryArgs},
invalidate_query,
job::{
JobError, JobMetadata, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext,
},
job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext},
library::LibraryContext,
prisma::{file_path, location},
};
@ -95,7 +93,7 @@ impl StatefulJob for ThumbnailJob {
});
state.steps = image_files.into_iter().collect();
Ok(())
Ok(None)
}
async fn execute_step(
@ -128,12 +126,12 @@ impl StatefulJob for ThumbnailJob {
"skipping thumbnail generation for {}",
step.materialized_path
);
return Ok(());
return Ok(None);
}
}
Err(_) => {
error!("Error getting cas_id {:?}", step.materialized_path);
return Ok(());
return Ok(None);
}
};
@ -175,14 +173,14 @@ impl StatefulJob for ThumbnailJob {
state.step_number + 1,
)]);
Ok(())
Ok(None)
}
async fn finalize(
&self,
_ctx: WorkerContext,
state: &mut JobState<Self::Init, Self::Data, Self::Step>,
) -> Result<JobMetadata, JobError> {
) -> JobResult {
let data = state
.data
.as_ref()

View file

@ -1,7 +1,5 @@
use crate::{
job::{
JobError, JobMetadata, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext,
},
job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext},
library::LibraryContext,
prisma::{file, file_path, location},
};
@ -126,7 +124,7 @@ impl StatefulJob for FileIdentifierJob {
});
state.steps = (0..task_count).map(|_| ()).collect();
Ok(())
Ok(None)
}
async fn execute_step(
@ -290,14 +288,14 @@ impl StatefulJob for FileIdentifierJob {
]);
// let _remaining = count_orphan_file_paths(&ctx.core_ctx, location.id.into()).await?;
Ok(())
Ok(None)
}
async fn finalize(
&self,
_ctx: WorkerContext,
state: &mut JobState<Self::Init, Self::Data, Self::Step>,
) -> Result<JobMetadata, JobError> {
) -> JobResult {
let data = state
.data
.as_ref()

View file

@ -42,7 +42,7 @@ pub enum JobError {
Paused(Vec<u8>),
}
pub type JobResult = Result<(), JobError>;
pub type JobResult = Result<JobMetadata, JobError>;
pub type JobMetadata = Option<Vec<u8>>;
#[async_trait::async_trait]
@ -68,7 +68,7 @@ pub trait StatefulJob: Send + Sync {
&self,
ctx: WorkerContext,
state: &mut JobState<Self::Init, Self::Data, Self::Step>,
) -> Result<JobMetadata, JobError>;
) -> JobResult;
}
#[async_trait::async_trait]
@ -184,12 +184,8 @@ where
self.state.step_number += 1;
}
// It is ok to unwrap here, a running job will always have a report.
self.report.as_mut().unwrap().metadata = self
.stateful_job
self.stateful_job
.finalize(ctx.clone(), &mut self.state)
.await?;
Ok(())
.await
}
}

View file

@ -1,4 +1,4 @@
use crate::job::{DynJob, JobError, JobManager, JobReportUpdate, JobStatus};
use crate::job::{DynJob, JobError, JobManager, JobMetadata, JobReportUpdate, JobStatus};
use crate::library::LibraryContext;
use crate::{api::LibraryArgs, invalidate_query};
use std::{sync::Arc, time::Duration};
@ -22,7 +22,7 @@ pub enum WorkerEvent {
updates: Vec<JobReportUpdate>,
debounce: bool,
},
Completed(oneshot::Sender<()>),
Completed(oneshot::Sender<()>, JobMetadata),
Failed(oneshot::Sender<()>),
Paused(Vec<u8>, oneshot::Sender<()>),
}
@ -151,25 +151,27 @@ impl Worker {
let (done_tx, done_rx) = oneshot::channel();
if let Err(e) = job.run(worker_ctx.clone()).await {
if let JobError::Paused(state) = e {
match job.run(worker_ctx.clone()).await {
Ok(metadata) => {
// handle completion
worker_ctx
.events_tx
.send(WorkerEvent::Completed(done_tx, metadata))
.expect("critical error: failed to send worker complete event");
}
Err(JobError::Paused(state)) => {
worker_ctx
.events_tx
.send(WorkerEvent::Paused(state, done_tx))
.expect("critical error: failed to send worker pause event");
} else {
}
Err(e) => {
error!("job '{}' failed with error: {:#?}", job_id, e);
worker_ctx
.events_tx
.send(WorkerEvent::Failed(done_tx))
.expect("critical error: failed to send worker fail event");
}
} else {
// handle completion
worker_ctx
.events_tx
.send(WorkerEvent::Completed(done_tx))
.expect("critical error: failed to send worker complete event");
}
if let Err(e) = done_rx.await {
@ -228,9 +230,10 @@ impl Worker {
LibraryArgs::new(library.id, ())
);
}
WorkerEvent::Completed(done_tx) => {
WorkerEvent::Completed(done_tx, metadata) => {
worker.report.status = JobStatus::Completed;
worker.report.data = None;
worker.report.metadata = metadata;
if let Err(e) = worker.report.update(&library).await {
error!("failed to update job report: {:#?}", e);
}

View file

@ -1,7 +1,5 @@
use crate::{
job::{
JobError, JobMetadata, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext,
},
job::{JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext},
prisma::{file_path, location},
};
@ -221,7 +219,7 @@ impl StatefulJob for IndexerJob {
})
.collect();
Ok(())
Ok(None)
}
/// Process each chunk of entries in the indexer job, writing to the `file_path` table
@ -284,7 +282,7 @@ impl StatefulJob for IndexerJob {
info!("Inserted {count} records");
Ok(())
Ok(None)
}
/// Logs some metadata about the indexer job
@ -292,7 +290,7 @@ impl StatefulJob for IndexerJob {
&self,
_ctx: WorkerContext,
state: &mut JobState<Self::Init, Self::Data, Self::Step>,
) -> Result<JobMetadata, JobError> {
) -> JobResult {
let data = state
.data
.as_ref()