From 996361f0810f30205a9f9f7332db7c35d8e484d9 Mon Sep 17 00:00:00 2001 From: "Ericson \"Fogo\" Soares" Date: Thu, 12 Oct 2023 23:47:13 -0300 Subject: [PATCH] [ENG-1160] Opening a location while indexing causes a lot of jobs to be spawned (#1554) Solved, but missing a frontend error message --- core/src/api/locations.rs | 44 ++++++++--- core/src/job/manager.rs | 12 ++- core/src/job/mod.rs | 74 ++++++++++++++++++- core/src/job/worker.rs | 21 +++++- core/src/location/indexer/indexer_job.rs | 4 + core/src/location/indexer/shallow.rs | 9 ++- .../file_identifier/file_identifier_job.rs | 4 + core/src/object/fs/copy.rs | 4 + core/src/object/fs/cut.rs | 4 + core/src/object/fs/delete.rs | 4 + core/src/object/fs/erase.rs | 4 + core/src/object/media/media_processor/job.rs | 4 + core/src/object/media/thumbnail/actor.rs | 2 +- core/src/object/validation/validator_job.rs | 4 + 14 files changed, 175 insertions(+), 19 deletions(-) diff --git a/core/src/api/locations.rs b/core/src/api/locations.rs index 781a1638d..8fa5b297b 100644 --- a/core/src/api/locations.rs +++ b/core/src/api/locations.rs @@ -1,11 +1,15 @@ use crate::{ invalidate_query, + job::StatefulJob, location::{ - delete_location, find_location, indexer::rules::IndexerRuleCreateArgs, light_scan_location, - location_with_indexer_rules, non_indexed::NonIndexedPathItem, relink_location, - scan_location, scan_location_sub_path, LocationCreateArgs, LocationError, + delete_location, find_location, + indexer::{rules::IndexerRuleCreateArgs, IndexerJobInit}, + light_scan_location, location_with_indexer_rules, + non_indexed::NonIndexedPathItem, + relink_location, scan_location, scan_location_sub_path, LocationCreateArgs, LocationError, LocationUpdateArgs, }, + object::file_identifier::file_identifier_job::FileIdentifierJobInit, p2p::PeerMetadata, prisma::{file_path, indexer_rule, indexer_rules_in_location, location, object, SortOrder}, util::AbortOnDrop, @@ -307,24 +311,44 @@ pub(crate) fn mount() -> AlphaRouter { pub sub_path: 
String, } - R.with2(library()) .subscription(|(node, library), args: LightScanArgs| async move { let location = find_location(&library, args.location_id) + R.with2(library()).subscription( + |(node, library), + LightScanArgs { + location_id, + sub_path, + }: LightScanArgs| async move { + if node + .jobs + .has_job_running(|job_identity| { + job_identity.target_location == location_id + && (job_identity.name == <IndexerJobInit as StatefulJob>::NAME + || job_identity.name == <FileIdentifierJobInit as StatefulJob>::NAME) + }) + .await + { + return Err(rspc::Error::new( + ErrorCode::Conflict, + "We're still indexing this location, please wait a bit...".to_string(), + )); + } + + let location = find_location(&library, location_id) .include(location_with_indexer_rules::include()) .exec() .await? - .ok_or(LocationError::IdNotFound(args.location_id))?; + .ok_or(LocationError::IdNotFound(location_id))?; let handle = tokio::spawn(async move { - if let Err(e) = - light_scan_location(node, library, location, args.sub_path).await + if let Err(e) = light_scan_location(node, library, location, sub_path).await { error!("light scan error: {e:#?}"); } }); Ok(AbortOnDrop(handle)) - }) + }, + ) }) .procedure( "online", diff --git a/core/src/job/manager.rs b/core/src/job/manager.rs index adf16d8d0..63bc8c2b3 100644 --- a/core/src/job/manager.rs +++ b/core/src/job/manager.rs @@ -26,9 +26,8 @@ use tokio::sync::{mpsc, oneshot, RwLock}; use tracing::{debug, error, info, warn}; use uuid::Uuid; -use super::{JobManagerError, JobReport, JobStatus, StatefulJob}; +use super::{JobIdentity, JobManagerError, JobReport, JobStatus, StatefulJob}; -// db is single threaded, nerd const MAX_WORKERS: usize = 5; pub enum JobManagerEvent { @@ -355,6 +354,15 @@ impl Jobs { false } + + pub async fn has_job_running(&self, predicate: impl Fn(JobIdentity) -> bool) -> bool { + for worker in self.running_workers.read().await.values() { + if worker.who_am_i().await.map(&predicate).unwrap_or(false) { + return true; + } + } + false + } } #[macro_use] diff --git
a/core/src/job/mod.rs b/core/src/job/mod.rs index 97e67eddd..7a1692325 100644 --- a/core/src/job/mod.rs +++ b/core/src/job/mod.rs @@ -9,6 +9,8 @@ use std::{ time::Instant, }; +use sd_prisma::prisma::location; + use serde::{de::DeserializeOwned, Deserialize, Serialize}; use tokio::{select, sync::mpsc}; use tracing::{debug, info, trace, warn}; @@ -27,6 +29,14 @@ pub use worker::*; pub type JobResult = Result; pub type JobMetadata = Option; +#[derive(Debug)] +pub struct JobIdentity { + pub id: Uuid, + pub name: &'static str, + pub target_location: location::id::Type, + pub status: JobStatus, +} + #[derive(Debug, Default)] pub struct JobRunErrors(pub Vec); @@ -84,6 +94,9 @@ pub trait StatefulJob: data: &mut Option, ) -> Result, JobError>; + /// The location id where this job will act upon + fn target_location(&self) -> location::id::Type; + /// is called for each step in the job. These steps are created in the `Self::init` method. async fn execute_step( &self, @@ -182,7 +195,6 @@ pub struct Job { hash: u64, report: Option, state: Option>, - // stateful_job: Option, next_jobs: VecDeque>, } @@ -462,6 +474,8 @@ impl DynJob for Job { .take() .expect("critical error: missing job state"); + let target_location = init.target_location(); + let stateful_job = Arc::new(init); let ctx = Arc::new(ctx); @@ -469,6 +483,9 @@ impl DynJob for Job { let mut job_should_run = true; let job_time = Instant::now(); + // Just for self identification purposes + let mut inner_status = JobStatus::Running; + // Checking if we have a brand new job, or if we are resuming an old one. let working_data = if let Some(data) = data { Some(data) @@ -496,16 +513,42 @@ impl DynJob for Job { select! 
{ Some(command) = commands_rx.recv() => { match command { + WorkerCommand::IdentifyYourself(tx) => { + if tx.send( + JobIdentity { + id: job_id, + name: job_name, + target_location, + status: inner_status + } + ).is_err() { + warn!("Failed to send IdentifyYourself event reply"); + } + } WorkerCommand::Pause(when) => { debug!( "Pausing Job at init phase took {:?}", when.elapsed() ); + inner_status = JobStatus::Paused; + // In case of a Pause command, we keep waiting for the next command let paused_time = Instant::now(); while let Some(command) = commands_rx.recv().await { match command { + WorkerCommand::IdentifyYourself(tx) => { + if tx.send( + JobIdentity { + id: job_id, + name: job_name, + target_location, + status: inner_status + } + ).is_err() { + warn!("Failed to send IdentifyYourself event reply"); + } + } WorkerCommand::Resume(when) => { debug!( "Resuming Job at init phase took {:?}", @@ -515,6 +558,8 @@ impl DynJob for Job { "Total paused time {:?} Job ", paused_time.elapsed() ); + inner_status = JobStatus::Running; + break; } // The job can also be shutdown or canceled while paused @@ -654,16 +699,42 @@ impl DynJob for Job { // Here we have a channel that we use to receive commands from the worker Some(command) = commands_rx.recv() => { match command { + WorkerCommand::IdentifyYourself(tx) => { + if tx.send( + JobIdentity { + id: job_id, + name: job_name, + target_location, + status: inner_status + } + ).is_err() { + warn!("Failed to send IdentifyYourself event reply"); + } + } WorkerCommand::Pause(when) => { debug!( "Pausing Job took {:?}", when.elapsed() ); + inner_status = JobStatus::Paused; + // In case of a Pause command, we keep waiting for the next command let paused_time = Instant::now(); while let Some(command) = commands_rx.recv().await { match command { + WorkerCommand::IdentifyYourself(tx) => { + if tx.send( + JobIdentity { + id: job_id, + name: job_name, + target_location, + status: inner_status + } + ).is_err() { + warn!("Failed to send 
IdentifyYourself event reply"); + } + } WorkerCommand::Resume(when) => { debug!( "Resuming Job took {:?}", @@ -673,6 +744,7 @@ impl DynJob for Job { "Total paused time {:?} Job ", paused_time.elapsed(), ); + inner_status = JobStatus::Running; break; } // The job can also be shutdown or canceled while paused diff --git a/core/src/job/worker.rs b/core/src/job/worker.rs index 651915139..3068000dc 100644 --- a/core/src/job/worker.rs +++ b/core/src/job/worker.rs @@ -22,7 +22,8 @@ use tracing::{debug, error, info, trace, warn}; use uuid::Uuid; use super::{ - DynJob, JobError, JobReport, JobReportUpdate, JobRunErrors, JobRunOutput, JobStatus, Jobs, + DynJob, JobError, JobIdentity, JobReport, JobReportUpdate, JobRunErrors, JobRunOutput, + JobStatus, Jobs, }; #[derive(Debug, Clone, Serialize, Type)] @@ -47,6 +48,7 @@ pub enum WorkerEvent { pub enum WorkerCommand { Pause(Instant), Resume(Instant), + IdentifyYourself(oneshot::Sender), Cancel(Instant, oneshot::Sender<()>), Shutdown(Instant, oneshot::Sender<()>), } @@ -174,6 +176,23 @@ impl Worker { } } + pub async fn who_am_i(&self) -> Option { + let (tx, rx) = oneshot::channel(); + if self + .commands_tx + .send(WorkerCommand::IdentifyYourself(tx)) + .await + .is_err() + { + warn!("Failed to send identify yourself command to a job worker"); + return None; + } + + rx.await + .map_err(|_| warn!("Failed to receive identify yourself answer from a job worker")) + .ok() + } + pub async fn resume(&self) { if self.report_watch_rx.borrow().status == JobStatus::Paused { self.paused.store(false, Ordering::Relaxed); diff --git a/core/src/location/indexer/indexer_job.rs b/core/src/location/indexer/indexer_job.rs index 9be3a9653..2cd310442 100644 --- a/core/src/location/indexer/indexer_job.rs +++ b/core/src/location/indexer/indexer_job.rs @@ -150,6 +150,10 @@ impl StatefulJob for IndexerJobInit { const NAME: &'static str = "indexer"; const IS_BATCHED: bool = true; + fn target_location(&self) -> location::id::Type { + self.location.id + } + 
/// Creates a vector of valid path buffers from a directory, chunked into batches of `BATCH_SIZE`. async fn init( &self, diff --git a/core/src/location/indexer/shallow.rs b/core/src/location/indexer/shallow.rs index 98ef8b14b..048b41bac 100644 --- a/core/src/location/indexer/shallow.rs +++ b/core/src/location/indexer/shallow.rs @@ -90,8 +90,6 @@ pub async fn shallow( let to_remove_count = to_remove.len(); - debug!("Walker at shallow indexer found {to_remove_count} file_paths to be removed"); - node.thumbnailer .remove_cas_ids( to_remove @@ -167,12 +165,15 @@ pub async fn shallow( }) .collect::>(); - debug!("Walker at shallow indexer found {to_update_count} file_paths to be updated"); - for step in update_steps { execute_indexer_update_step(&step, library).await?; } + debug!( + "Walker at shallow indexer found: \ + To create: {to_create_count}; To update: {to_update_count}; To remove: {to_remove_count};" + ); + if to_create_count > 0 || to_update_count > 0 || to_remove_count > 0 { if to_walk_path != location_path { reverse_update_directories_sizes(to_walk_path, location_id, location_path, library) diff --git a/core/src/object/file_identifier/file_identifier_job.rs b/core/src/object/file_identifier/file_identifier_job.rs index 5e267736b..257a4f83f 100644 --- a/core/src/object/file_identifier/file_identifier_job.rs +++ b/core/src/object/file_identifier/file_identifier_job.rs @@ -77,6 +77,10 @@ impl StatefulJob for FileIdentifierJobInit { const NAME: &'static str = "file_identifier"; const IS_BATCHED: bool = true; + fn target_location(&self) -> location::id::Type { + self.location.id + } + async fn init( &self, ctx: &WorkerContext, diff --git a/core/src/object/fs/copy.rs b/core/src/object/fs/copy.rs index 1d5df86b9..b42d66c7a 100644 --- a/core/src/object/fs/copy.rs +++ b/core/src/object/fs/copy.rs @@ -54,6 +54,10 @@ impl StatefulJob for FileCopierJobInit { const NAME: &'static str = "file_copier"; + fn target_location(&self) -> location::id::Type { + 
self.target_location_id + } + async fn init( &self, ctx: &WorkerContext, diff --git a/core/src/object/fs/cut.rs b/core/src/object/fs/cut.rs index 88660e91a..30221ecf2 100644 --- a/core/src/object/fs/cut.rs +++ b/core/src/object/fs/cut.rs @@ -42,6 +42,10 @@ impl StatefulJob for FileCutterJobInit { const NAME: &'static str = "file_cutter"; + fn target_location(&self) -> location::id::Type { + self.target_location_id + } + async fn init( &self, ctx: &WorkerContext, diff --git a/core/src/object/fs/delete.rs b/core/src/object/fs/delete.rs index 59904ab54..ea2d880ae 100644 --- a/core/src/object/fs/delete.rs +++ b/core/src/object/fs/delete.rs @@ -33,6 +33,10 @@ impl StatefulJob for FileDeleterJobInit { const NAME: &'static str = "file_deleter"; + fn target_location(&self) -> location::id::Type { + self.location_id + } + async fn init( &self, ctx: &WorkerContext, diff --git a/core/src/object/fs/erase.rs b/core/src/object/fs/erase.rs index 62ae114bb..5e186b11a 100644 --- a/core/src/object/fs/erase.rs +++ b/core/src/object/fs/erase.rs @@ -62,6 +62,10 @@ impl StatefulJob for FileEraserJobInit { const NAME: &'static str = "file_eraser"; + fn target_location(&self) -> location::id::Type { + self.location_id + } + async fn init( &self, ctx: &WorkerContext, diff --git a/core/src/object/media/media_processor/job.rs b/core/src/object/media/media_processor/job.rs index 51f53f6b9..2c3815c50 100644 --- a/core/src/object/media/media_processor/job.rs +++ b/core/src/object/media/media_processor/job.rs @@ -67,6 +67,10 @@ impl StatefulJob for MediaProcessorJobInit { const NAME: &'static str = "media_processor"; const IS_BATCHED: bool = true; + fn target_location(&self) -> location::id::Type { + self.location.id + } + async fn init( &self, ctx: &WorkerContext, diff --git a/core/src/object/media/thumbnail/actor.rs b/core/src/object/media/thumbnail/actor.rs index 1ae88e472..cca9cb6bb 100644 --- a/core/src/object/media/thumbnail/actor.rs +++ b/core/src/object/media/thumbnail/actor.rs @@ -59,7 
+59,7 @@ enum DatabaseMessage { // Thumbnails directory have the following structure: // thumbnails/ // ├── version.txt -//└── [0..2]/ # sharding +// └── [0..2]/ # sharding // └── .webp pub struct Thumbnailer { cas_ids_to_delete_tx: chan::Sender>, diff --git a/core/src/object/validation/validator_job.rs b/core/src/object/validation/validator_job.rs index c77038c59..2e692c909 100644 --- a/core/src/object/validation/validator_job.rs +++ b/core/src/object/validation/validator_job.rs @@ -58,6 +58,10 @@ impl StatefulJob for ObjectValidatorJobInit { const NAME: &'static str = "object_validator"; + fn target_location(&self) -> location::id::Type { + self.location.id + } + async fn init( &self, ctx: &WorkerContext,