This commit is contained in:
Oscar Beaumont 2023-06-23 10:30:00 +08:00
parent 996d274822
commit 08790fae93
9 changed files with 50 additions and 63 deletions

View file

@ -71,7 +71,7 @@ pub trait StatefulJob: Send + Sync + Sized {
) -> Result<(), JobError>;
/// is called after all steps have been executed
async fn finalize(&mut self, ctx: &mut WorkerContext, state: &mut JobState<Self>) -> JobResult;
async fn finalize(&mut self, ctx: &mut WorkerContext, data: &mut Self::Data) -> JobResult;
}
#[async_trait::async_trait]
@ -357,7 +357,16 @@ impl<SJob: StatefulJob> DynJob for Job<SJob> {
self.state.step_number += 1;
}
let metadata = self.stateful_job.finalize(ctx, &mut self.state).await?;
let metadata = self
.stateful_job
.finalize(
ctx,
&mut self
.data
.as_mut()
.expect("critical error: missing data on job state"),
)
.await?;
let mut next_jobs = mem::take(&mut self.next_jobs);

View file

@ -274,9 +274,9 @@ impl StatefulJob for IndexerJob {
Ok(())
}
async fn finalize(&mut self, ctx: &mut WorkerContext, state: &mut JobState<Self>) -> JobResult {
async fn finalize(&mut self, ctx: &mut WorkerContext, data: &mut Self::Data) -> JobResult {
let location_path =
maybe_missing(&state.init.location.path, "location.path").map(Path::new)?;
maybe_missing(&data.init.location.path, "location.path").map(Path::new)?;
finalize_indexer(location_path, state, ctx)
}

View file

@ -216,8 +216,8 @@ impl StatefulJob for FileIdentifierJob {
Ok(())
}
async fn finalize(&mut self, _: &mut WorkerContext, state: &mut JobState<Self>) -> JobResult {
let report = &extract_job_data!(state).report;
async fn finalize(&mut self, _: &mut WorkerContext, data: &mut Self::Data) -> JobResult {
let report = &data.report;
info!("Finalizing identifier job: {report:?}");

View file

@ -205,9 +205,9 @@ impl StatefulJob for FileCopierJob {
Ok(())
}
async fn finalize(&mut self, ctx: &mut WorkerContext, state: &mut JobState<Self>) -> JobResult {
async fn finalize(&mut self, ctx: &mut WorkerContext, data: &mut Self::Data) -> JobResult {
invalidate_query!(ctx.library, "search.paths");
Ok(Some(serde_json::to_value(&state.init)?))
Ok(Some(serde_json::to_value(&data.init)?))
}
}

View file

@ -21,7 +21,7 @@ use super::{fetch_source_and_target_location_paths, get_many_files_datas, FileDa
pub struct FileCutterJob {}
#[derive(Serialize, Deserialize, Hash, Type)]
#[derive(Debug, Clone, Serialize, Deserialize, Hash, Type)]
pub struct FileCutterJobInit {
pub source_location_id: location::id::Type,
pub target_location_id: location::id::Type,
@ -31,6 +31,7 @@ pub struct FileCutterJobInit {
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct FileCutterJobState {
init: FileCutterJobInit,
full_target_directory_path: PathBuf,
}
@ -71,6 +72,7 @@ impl StatefulJob for FileCutterJob {
);
let data = FileCutterJobState {
init: init.clone(),
full_target_directory_path,
};
@ -135,9 +137,9 @@ impl StatefulJob for FileCutterJob {
}
}
async fn finalize(&mut self, ctx: &mut WorkerContext, state: &mut JobState<Self>) -> JobResult {
async fn finalize(&mut self, ctx: &mut WorkerContext, data: &mut Self::Data) -> JobResult {
invalidate_query!(ctx.library, "search.paths");
Ok(Some(serde_json::to_value(&state.init)?))
Ok(Some(serde_json::to_value(&data.init)?))
}
}

View file

@ -32,7 +32,7 @@ impl JobInitData for FileDeleterJobInit {
#[async_trait::async_trait]
impl StatefulJob for FileDeleterJob {
type Init = FileDeleterJobInit;
type Data = ();
type Data = FileDeleterJobInit;
type Step = FileData;
const NAME: &'static str = "file_deleter";
@ -58,7 +58,7 @@ impl StatefulJob for FileDeleterJob {
ctx.progress(vec![JobReportUpdate::TaskCount(steps.len())]);
Ok(((), steps))
Ok((init, steps))
}
async fn execute_step(
@ -101,9 +101,9 @@ impl StatefulJob for FileDeleterJob {
Ok(())
}
async fn finalize(&mut self, ctx: &mut WorkerContext, state: &mut JobState<Self>) -> JobResult {
async fn finalize(&mut self, ctx: &mut WorkerContext, init: &mut Self::Data) -> JobResult {
invalidate_query!(ctx.library, "search.paths");
Ok(Some(serde_json::to_value(&state.init)?))
Ok(Some(serde_json::to_value(&init)?))
}
}

View file

@ -1,8 +1,6 @@
use crate::{
extract_job_data_mut, invalidate_query,
job::{
JobError, JobInitData, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext,
},
invalidate_query,
job::{JobError, JobInitData, JobReportUpdate, JobResult, StatefulJob, WorkerContext},
library::Library,
location::file_path_helper::IsolatedFilePathData,
prisma::{file_path, location},
@ -29,7 +27,7 @@ use super::{
pub struct FileEraserJob {}
#[serde_as]
#[derive(Serialize, Deserialize, Hash, Type)]
#[derive(Clone, Serialize, Deserialize, Hash, Type)]
pub struct FileEraserJobInit {
pub location_id: location::id::Type,
pub file_path_ids: Vec<file_path::id::Type>,
@ -44,10 +42,7 @@ impl JobInitData for FileEraserJobInit {
#[derive(Serialize, Deserialize)]
pub struct FileEraserJobData {
location_id: location::id::Type,
// #[specta(type = String)]
// #[serde_as(as = "DisplayFromStr")]
passes: usize,
init: FileEraserJobInit,
location_path: PathBuf,
diretories_to_remove: Vec<PathBuf>,
}
@ -78,8 +73,7 @@ impl StatefulJob for FileEraserJob {
.into();
let data = FileEraserJobData {
location_id: init.location_id.clone(),
passes: init.passes.clone(),
init: init.clone(),
location_path,
diretories_to_remove: vec![],
};
@ -174,21 +168,16 @@ impl StatefulJob for FileEraserJob {
Ok(())
}
async fn finalize(&mut self, ctx: &mut WorkerContext, state: &mut JobState<Self>) -> JobResult {
try_join_all(
extract_job_data_mut!(state)
.diretories_to_remove
.drain(..)
.map(|data| async {
fs::remove_dir_all(&data)
.await
.map_err(|e| FileIOError::from((data, e)))
}),
)
async fn finalize(&mut self, ctx: &mut WorkerContext, data: &mut Self::Data) -> JobResult {
try_join_all(data.diretories_to_remove.drain(..).map(|data| async {
fs::remove_dir_all(&data)
.await
.map_err(|e| FileIOError::from((data, e)))
}))
.await?;
invalidate_query!(ctx.library, "search.paths");
Ok(Some(serde_json::to_value(&state.init)?))
Ok(Some(serde_json::to_value(&data.init)?))
}
}

View file

@ -1,8 +1,5 @@
use crate::{
extract_job_data,
job::{
JobError, JobInitData, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext,
},
job::{JobError, JobInitData, JobReportUpdate, JobResult, StatefulJob, WorkerContext},
library::Library,
location::file_path_helper::{
ensure_file_path_exists, ensure_sub_path_is_directory, ensure_sub_path_is_in_location,
@ -177,9 +174,9 @@ impl StatefulJob for ThumbnailerJob {
}
}
async fn finalize(&mut self, ctx: &mut WorkerContext, state: &mut JobState<Self>) -> JobResult {
if let Some(state) = extract_job_data!(state) {
finalize_thumbnailer(state, ctx)
async fn finalize(&mut self, ctx: &mut WorkerContext, data: &mut Self::Data) -> JobResult {
if let Some(data) = data {
finalize_thumbnailer(data, ctx)
} else {
Ok(None)
}

View file

@ -1,8 +1,5 @@
use crate::{
extract_job_data,
job::{
JobError, JobInitData, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext,
},
job::{JobError, JobInitData, JobReportUpdate, JobResult, StatefulJob, WorkerContext},
library::Library,
location::file_path_helper::{
ensure_file_path_exists, ensure_sub_path_is_directory, ensure_sub_path_is_in_location,
@ -36,14 +33,13 @@ pub struct ObjectValidatorJob {}
#[derive(Serialize, Deserialize, Debug)]
pub struct ObjectValidatorJobState {
// TODO: Only store minimal set of data as this could change during job execution
pub location: location::Data,
pub init: ObjectValidatorJobInit,
pub location_path: PathBuf,
pub task_count: usize,
}
// The validator can
#[derive(Serialize, Deserialize, Debug)]
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct ObjectValidatorJobInit {
pub location: location::Data,
pub sub_path: Option<PathBuf>,
@ -132,7 +128,7 @@ impl StatefulJob for ObjectValidatorJob {
.into();
let data = ObjectValidatorJobState {
location: init.location.clone(),
init: init.clone(),
location_path,
task_count: steps.len(),
};
@ -159,7 +155,7 @@ impl StatefulJob for ObjectValidatorJob {
// This if is just to make sure, we already queried objects where integrity_checksum is null
if file_path.integrity_checksum.is_none() {
let full_path = data.location_path.join(IsolatedFilePathData::try_from((
data.location.id,
data.init.location.id,
file_path,
))?);
let checksum = file_checksum(&full_path)
@ -188,17 +184,11 @@ impl StatefulJob for ObjectValidatorJob {
Ok(())
}
async fn finalize(
&mut self,
_ctx: &mut WorkerContext,
state: &mut JobState<Self>,
) -> JobResult {
let data = extract_job_data!(state);
async fn finalize(&mut self, _ctx: &mut WorkerContext, data: &mut Self::Data) -> JobResult {
info!(
"finalizing validator job at {}{}: {} tasks",
data.location_path.display(),
state
.init
data.init
.sub_path
.as_ref()
.map(|p| format!("{}", p.display()))
@ -206,6 +196,6 @@ impl StatefulJob for ObjectValidatorJob {
data.task_count
);
Ok(Some(serde_json::to_value(&state.init)?))
Ok(Some(serde_json::to_value(&data.init)?))
}
}