From 73f521a3b8bab4ade684878e9faaf7119145f6f9 Mon Sep 17 00:00:00 2001 From: "Ericson \"Fogo\" Soares" Date: Thu, 25 Apr 2024 01:06:11 -0300 Subject: [PATCH] [ENG-1629] Write new file identifier with the task system (#2334) * Introduce deep vs shallow for indexer tasks with different priorities * Make job wait to dispatch if it's paused * Extract file metadata task on file identifier job * Some initial drafts on object processor task * Object processor task for file identifier * File Identifier job and shallow --- Cargo.lock | 3 + Cargo.toml | 1 + core/Cargo.toml | 2 +- .../src/isolated_file_path_data.rs | 12 + core/crates/heavy-lifting/Cargo.toml | 3 + .../src/file_identifier/cas_id.rs | 68 +++ .../heavy-lifting/src/file_identifier/job.rs | 566 ++++++++++++++++++ .../heavy-lifting/src/file_identifier/mod.rs | 120 ++++ .../src/file_identifier/shallow.rs | 207 +++++++ .../tasks/extract_file_metadata.rs | 280 +++++++++ .../src/file_identifier/tasks/mod.rs | 18 + .../file_identifier/tasks/object_processor.rs | 473 +++++++++++++++ core/crates/heavy-lifting/src/indexer/job.rs | 160 ++--- core/crates/heavy-lifting/src/indexer/mod.rs | 45 +- .../heavy-lifting/src/indexer/shallow.rs | 15 +- .../heavy-lifting/src/indexer/tasks/saver.rs | 33 +- .../src/indexer/tasks/updater.rs | 43 +- .../heavy-lifting/src/indexer/tasks/walker.rs | 88 ++- .../heavy-lifting/src/job_system/job.rs | 53 +- .../heavy-lifting/src/job_system/store.rs | 3 +- core/crates/heavy-lifting/src/lib.rs | 17 + core/crates/heavy-lifting/src/utils/mod.rs | 1 + .../heavy-lifting/src/utils/sub_path.rs | 93 +++ core/crates/prisma-helpers/src/lib.rs | 1 + crates/task-system/src/task.rs | 31 +- 25 files changed, 2171 insertions(+), 165 deletions(-) create mode 100644 core/crates/heavy-lifting/src/file_identifier/cas_id.rs create mode 100644 core/crates/heavy-lifting/src/file_identifier/job.rs create mode 100644 core/crates/heavy-lifting/src/file_identifier/mod.rs create mode 100644 core/crates/heavy-lifting/src/file_identifier/shallow.rs create mode 100644 core/crates/heavy-lifting/src/file_identifier/tasks/extract_file_metadata.rs create mode 100644 core/crates/heavy-lifting/src/file_identifier/tasks/mod.rs create mode 100644 core/crates/heavy-lifting/src/file_identifier/tasks/object_processor.rs create mode 100644 core/crates/heavy-lifting/src/utils/mod.rs create mode 100644 core/crates/heavy-lifting/src/utils/sub_path.rs diff --git a/Cargo.lock b/Cargo.lock index 546521aa1..3a38b0ab6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8634,6 +8634,7 @@ version = "0.1.0" dependencies = [ "async-channel", "async-trait", + "blake3", "chrono", "futures", "futures-concurrency", @@ -8648,6 +8649,7 @@ dependencies = [ "sd-core-indexer-rules", "sd-core-prisma-helpers", "sd-core-sync", + "sd-file-ext", "sd-prisma", "sd-sync", "sd-task-system", @@ -8655,6 +8657,7 @@ dependencies = [ "serde", "serde_json", "specta", + "static_assertions", "strum", "tempfile", "thiserror", diff --git a/Cargo.toml b/Cargo.toml index 437a2dbf2..309aa5a98 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -76,6 +76,7 @@ rmp-serde = "1.1.2" rmpv = { version = "^1.0.1", features = ["with-serde"] } serde = "1.0" serde_json = "1.0" +static_assertions = "1.1.0" strum = "0.25" strum_macros = "0.25" tempfile = "3.8.1" diff --git a/core/Cargo.toml b/core/Cargo.toml index a903678af..2d18499a3 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -82,6 +82,7 @@ rspc = { workspace = true, features = [ serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } specta 
= { workspace = true } +static_assertions = { workspace = true } strum = { workspace = true, features = ["derive"] } strum_macros = { workspace = true } tempfile = { workspace = true } @@ -128,7 +129,6 @@ serde-hashkey = "0.4.5" serde_repr = "0.1" serde_with = "3.4.0" slotmap = "1.0.6" -static_assertions = "1.1.0" sysinfo = "0.29.10" tar = "0.4.40" tower-service = "0.3.2" diff --git a/core/crates/file-path-helper/src/isolated_file_path_data.rs b/core/crates/file-path-helper/src/isolated_file_path_data.rs index 21852fe18..ba321dbbc 100644 --- a/core/crates/file-path-helper/src/isolated_file_path_data.rs +++ b/core/crates/file-path-helper/src/isolated_file_path_data.rs @@ -100,6 +100,18 @@ impl<'a> IsolatedFilePathData<'a> { self.extension.as_ref() } + #[must_use] + pub fn to_owned(self) -> IsolatedFilePathData<'static> { + IsolatedFilePathData { + location_id: self.location_id, + materialized_path: Cow::Owned(self.materialized_path.to_string()), + is_dir: self.is_dir, + name: Cow::Owned(self.name.to_string()), + extension: Cow::Owned(self.extension.to_string()), + relative_path: Cow::Owned(self.relative_path.to_string()), + } + } + #[must_use] pub const fn is_dir(&self) -> bool { self.is_dir diff --git a/core/crates/heavy-lifting/Cargo.toml b/core/crates/heavy-lifting/Cargo.toml index a1bd037e1..fb73a63fd 100644 --- a/core/crates/heavy-lifting/Cargo.toml +++ b/core/crates/heavy-lifting/Cargo.toml @@ -16,6 +16,7 @@ sd-core-prisma-helpers = { path = "../prisma-helpers" } sd-core-sync = { path = "../sync" } # Sub-crates +sd-file-ext = { path = "../../../crates/file-ext" } sd-prisma = { path = "../../../crates/prisma" } sd-sync = { path = "../../../crates/sync" } sd-task-system = { path = "../../../crates/task-system" } @@ -24,6 +25,7 @@ sd-utils = { path = "../../../crates/utils" } async-channel = { workspace = true } async-trait = { workspace = true } +blake3 = { workspace = true } chrono = { workspace = true, features = ["serde"] } futures = { workspace = true } futures-concurrency = { workspace = true } @@ -37,6 +39,7 @@ rspc = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } specta = { workspace = true } +static_assertions = { workspace = true } strum = { workspace = true, features = ["derive", "phf"] } thiserror = { workspace = true } tokio = { workspace = true, features = ["fs", "sync", "parking_lot"] } diff --git a/core/crates/heavy-lifting/src/file_identifier/cas_id.rs b/core/crates/heavy-lifting/src/file_identifier/cas_id.rs new file mode 100644 index 000000000..5ad5a9456 --- /dev/null +++ b/core/crates/heavy-lifting/src/file_identifier/cas_id.rs @@ -0,0 +1,68 @@ +use std::path::Path; + +use blake3::Hasher; +use static_assertions::const_assert; +use tokio::{ + fs::{self, File}, + io::{self, AsyncReadExt, AsyncSeekExt, SeekFrom}, +}; + +const SAMPLE_COUNT: u64 = 4; +const SAMPLE_SIZE: u64 = 1024 * 10; +const HEADER_OR_FOOTER_SIZE: u64 = 1024 * 8; + +// minimum file size of 100KiB, to avoid sample hashing for small files as they can be smaller than the total sample size +const MINIMUM_FILE_SIZE: u64 = 1024 * 100; + +// Asserting that nobody messed up our consts +const_assert!((HEADER_OR_FOOTER_SIZE * 2 + SAMPLE_COUNT * SAMPLE_SIZE) < MINIMUM_FILE_SIZE); + +// Asserting that the sample size is larger than header/footer size, as the same buffer is used for both +const_assert!(SAMPLE_SIZE > HEADER_OR_FOOTER_SIZE); + +// SAFETY: Casts here are safe, they're hardcoded values we have some const assertions above to make sure they're 
correct +#[allow(clippy::cast_possible_truncation)] +#[allow(clippy::cast_possible_wrap)] +pub async fn generate_cas_id( + path: impl AsRef + Send, + size: u64, +) -> Result { + let mut hasher = Hasher::new(); + hasher.update(&size.to_le_bytes()); + + if size <= MINIMUM_FILE_SIZE { + // For small files, we hash the whole file + hasher.update(&fs::read(path).await?); + } else { + let mut file = File::open(path).await?; + let mut buf = vec![0; SAMPLE_SIZE as usize].into_boxed_slice(); + + // Hashing the header + let mut current_pos = file + .read_exact(&mut buf[..HEADER_OR_FOOTER_SIZE as usize]) + .await? as u64; + hasher.update(&buf[..HEADER_OR_FOOTER_SIZE as usize]); + + // Sample hashing the inner content of the file + let seek_jump = (size - HEADER_OR_FOOTER_SIZE * 2) / SAMPLE_COUNT; + loop { + file.read_exact(&mut buf).await?; + hasher.update(&buf); + + if current_pos >= (HEADER_OR_FOOTER_SIZE + seek_jump * (SAMPLE_COUNT - 1)) { + break; + } + + current_pos = file.seek(SeekFrom::Start(current_pos + seek_jump)).await?; + } + + // Hashing the footer + file.seek(SeekFrom::End(-(HEADER_OR_FOOTER_SIZE as i64))) + .await?; + file.read_exact(&mut buf[..HEADER_OR_FOOTER_SIZE as usize]) + .await?; + hasher.update(&buf[..HEADER_OR_FOOTER_SIZE as usize]); + } + + Ok(hasher.finalize().to_hex()[..16].to_string()) +} diff --git a/core/crates/heavy-lifting/src/file_identifier/job.rs b/core/crates/heavy-lifting/src/file_identifier/job.rs new file mode 100644 index 000000000..d01a55f50 --- /dev/null +++ b/core/crates/heavy-lifting/src/file_identifier/job.rs @@ -0,0 +1,566 @@ +use crate::{ + job_system::{ + job::{Job, JobReturn, JobTaskDispatcher, ReturnStatus}, + report::ReportOutputMetadata, + utils::cancel_pending_tasks, + SerializableJob, SerializedTasks, + }, + utils::sub_path::maybe_get_iso_file_path_from_sub_path, + Error, JobContext, JobName, LocationScanState, NonCriticalJobError, ProgressUpdate, +}; + +use sd_core_file_path_helper::IsolatedFilePathData; +use sd_core_prisma_helpers::file_path_for_file_identifier; + +use sd_prisma::prisma::{file_path, location, SortOrder}; +use sd_task_system::{ + AnyTaskOutput, IntoTask, SerializableTask, Task, TaskDispatcher, TaskHandle, TaskId, + TaskOutput, TaskStatus, +}; +use sd_utils::db::maybe_missing; + +use std::{ + collections::HashMap, + hash::{Hash, Hasher}, + mem, + path::PathBuf, + sync::Arc, + time::Duration, +}; + +use futures::{stream::FuturesUnordered, StreamExt}; +use futures_concurrency::future::TryJoin; +use prisma_client_rust::or; +use serde::{Deserialize, Serialize}; +use serde_json::json; +use tokio::time::Instant; +use tracing::warn; + +use super::{ + tasks::{ + ExtractFileMetadataTask, ExtractFileMetadataTaskOutput, ObjectProcessorTask, + ObjectProcessorTaskMetrics, + }, + FileIdentifierError, CHUNK_SIZE, +}; + +#[derive(Debug)] +pub struct FileIdentifierJob { + location: Arc, + location_path: Arc, + sub_path: Option, + + metadata: Metadata, + + errors: Vec, + + pending_tasks_on_resume: Vec>, + tasks_for_shutdown: Vec>>, +} + +impl Hash for FileIdentifierJob { + fn hash(&self, state: &mut H) { + self.location.id.hash(state); + if let Some(ref sub_path) = self.sub_path { + sub_path.hash(state); + } + } +} + +impl Job for FileIdentifierJob { + const NAME: JobName = JobName::FileIdentifier; + + async fn resume_tasks( + &mut self, + dispatcher: &JobTaskDispatcher, + ctx: &impl JobContext, + SerializedTasks(serialized_tasks): SerializedTasks, + ) -> Result<(), Error> { + self.pending_tasks_on_resume = dispatcher + .dispatch_many_boxed( + 
rmp_serde::from_slice::)>>(&serialized_tasks) + .map_err(FileIdentifierError::from)? + .into_iter() + .map(|(task_kind, task_bytes)| async move { + match task_kind { + TaskKind::ExtractFileMetadata => { + >::deserialize( + &task_bytes, + (), + ) + .await + .map(IntoTask::into_task) + } + + TaskKind::ObjectProcessor => ObjectProcessorTask::deserialize( + &task_bytes, + (Arc::clone(ctx.db()), Arc::clone(ctx.sync())), + ) + .await + .map(IntoTask::into_task), + } + }) + .collect::>() + .try_join() + .await + .map_err(FileIdentifierError::from)?, + ) + .await; + + Ok(()) + } + + async fn run( + mut self, + dispatcher: JobTaskDispatcher, + ctx: impl JobContext, + ) -> Result { + let mut pending_running_tasks = FuturesUnordered::new(); + + self.init_or_resume(&mut pending_running_tasks, &ctx, &dispatcher) + .await?; + + while let Some(task) = pending_running_tasks.next().await { + match task { + Ok(TaskStatus::Done((task_id, TaskOutput::Out(out)))) => { + if let Some(new_object_processor_task) = self + .process_task_output(task_id, out, &ctx, &dispatcher) + .await + { + pending_running_tasks.push(new_object_processor_task); + }; + } + + Ok(TaskStatus::Done((task_id, TaskOutput::Empty))) => { + warn!("Task returned an empty output"); + } + + Ok(TaskStatus::Shutdown(task)) => { + self.tasks_for_shutdown.push(task); + } + + Ok(TaskStatus::Error(e)) => { + cancel_pending_tasks(&pending_running_tasks).await; + + return Err(e); + } + + Ok(TaskStatus::Canceled | TaskStatus::ForcedAbortion) => { + cancel_pending_tasks(&pending_running_tasks).await; + + return Ok(ReturnStatus::Canceled); + } + + Err(e) => { + cancel_pending_tasks(&pending_running_tasks).await; + + return Err(e.into()); + } + } + } + + if !self.tasks_for_shutdown.is_empty() { + return Ok(ReturnStatus::Shutdown(self.serialize().await)); + } + + // From this point onward, we are done with the job and it can't be interrupted anymore + let Self { + location, + metadata, + errors, + .. 
+ } = self; + + ctx.db() + .location() + .update( + location::id::equals(location.id), + vec![location::scan_state::set( + LocationScanState::FilesIdentified as i32, + )], + ) + .exec() + .await + .map_err(FileIdentifierError::from)?; + + Ok(ReturnStatus::Completed( + JobReturn::builder() + .with_metadata(metadata) + .with_non_critical_errors(errors) + .build(), + )) + } +} + +impl FileIdentifierJob { + pub fn new( + location: location::Data, + sub_path: Option, + ) -> Result { + Ok(Self { + location_path: maybe_missing(&location.path, "location.path") + .map(PathBuf::from) + .map(Arc::new)?, + location: Arc::new(location), + sub_path, + metadata: Metadata::default(), + errors: Vec::new(), + pending_tasks_on_resume: Vec::new(), + tasks_for_shutdown: Vec::new(), + }) + } + + async fn init_or_resume( + &mut self, + pending_running_tasks: &mut FuturesUnordered>, + job_ctx: &impl JobContext, + dispatcher: &JobTaskDispatcher, + ) -> Result<(), FileIdentifierError> { + // if we don't have any pending task, then this is a fresh job + if self.pending_tasks_on_resume.is_empty() { + let db = job_ctx.db(); + let maybe_sub_iso_file_path = maybe_get_iso_file_path_from_sub_path( + self.location.id, + &self.sub_path, + &*self.location_path, + db, + ) + .await?; + + let mut orphans_count = 0; + let mut last_orphan_file_path_id = None; + + let start = Instant::now(); + + loop { + #[allow(clippy::cast_possible_wrap)] + // SAFETY: we know that CHUNK_SIZE is a valid i64 + let orphan_paths = db + .file_path() + .find_many(orphan_path_filters( + self.location.id, + last_orphan_file_path_id, + &maybe_sub_iso_file_path, + )) + .order_by(file_path::id::order(SortOrder::Asc)) + .take(CHUNK_SIZE as i64) + .select(file_path_for_file_identifier::select()) + .exec() + .await?; + + if orphan_paths.is_empty() { + break; + } + + orphans_count += orphan_paths.len() as u64; + last_orphan_file_path_id = + Some(orphan_paths.last().expect("orphan_paths is not empty").id); + + job_ctx.progress(vec![ + ProgressUpdate::TaskCount(orphans_count), + ProgressUpdate::Message(format!("{orphans_count} files to be identified")), + ]); + + pending_running_tasks.push( + dispatcher + .dispatch(ExtractFileMetadataTask::new_deep( + Arc::clone(&self.location), + Arc::clone(&self.location_path), + orphan_paths, + )) + .await, + ); + } + + self.metadata.seeking_orphans_time = start.elapsed(); + self.metadata.total_found_orphans = orphans_count; + } else { + pending_running_tasks.extend(mem::take(&mut self.pending_tasks_on_resume)); + } + + Ok(()) + } + + /// Process output of tasks, according to the downcasted output type + /// + /// # Panics + /// Will panic if another task type is added in the job, but this function wasn't updated to handle it + /// + async fn process_task_output( + &mut self, + task_id: TaskId, + any_task_output: Box, + job_ctx: &impl JobContext, + dispatcher: &JobTaskDispatcher, + ) -> Option> { + if any_task_output.is::() { + return self + .process_extract_file_metadata_output( + *any_task_output + .downcast::() + .expect("just checked"), + job_ctx, + dispatcher, + ) + .await; + } else if any_task_output.is::() { + self.process_object_processor_output( + *any_task_output + .downcast::() + .expect("just checked"), + job_ctx, + ); + } else { + unreachable!("Unexpected task output type: "); + } + + None + } + + async fn process_extract_file_metadata_output( + &mut self, + ExtractFileMetadataTaskOutput { + identified_files, + extract_metadata_time, + errors, + }: ExtractFileMetadataTaskOutput, + job_ctx: &impl JobContext, + 
dispatcher: &JobTaskDispatcher, + ) -> Option> { + self.metadata.extract_metadata_time += extract_metadata_time; + self.errors.extend(errors); + + if identified_files.is_empty() { + self.metadata.completed_tasks += 1; + + job_ctx.progress(vec![ProgressUpdate::CompletedTaskCount( + self.metadata.completed_tasks, + )]); + + None + } else { + job_ctx.progress_msg(format!("Identified {} files", identified_files.len())); + + Some( + dispatcher + .dispatch(ObjectProcessorTask::new_deep( + identified_files, + Arc::clone(job_ctx.db()), + Arc::clone(job_ctx.sync()), + )) + .await, + ) + } + } + + fn process_object_processor_output( + &mut self, + ObjectProcessorTaskMetrics { + assign_cas_ids_time, + fetch_existing_objects_time, + assign_to_existing_object_time, + create_object_time, + created_objects_count, + linked_objects_count, + }: ObjectProcessorTaskMetrics, + job_ctx: &impl JobContext, + ) { + self.metadata.assign_cas_ids_time += assign_cas_ids_time; + self.metadata.fetch_existing_objects_time += fetch_existing_objects_time; + self.metadata.assign_to_existing_object_time += assign_to_existing_object_time; + self.metadata.create_object_time += create_object_time; + self.metadata.created_objects_count += created_objects_count; + self.metadata.linked_objects_count += linked_objects_count; + + self.metadata.completed_tasks += 1; + + job_ctx.progress(vec![ + ProgressUpdate::CompletedTaskCount(self.metadata.completed_tasks), + ProgressUpdate::Message(format!( + "Processed {} of {} objects", + self.metadata.created_objects_count + self.metadata.linked_objects_count, + self.metadata.total_found_orphans + )), + ]); + } +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +enum TaskKind { + ExtractFileMetadata, + ObjectProcessor, +} + +#[derive(Serialize, Deserialize)] +struct SaveState { + location: Arc, + location_path: Arc, + sub_path: Option, + + metadata: Metadata, + + errors: Vec, + + tasks_for_shutdown_bytes: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct Metadata { + extract_metadata_time: Duration, + assign_cas_ids_time: Duration, + fetch_existing_objects_time: Duration, + assign_to_existing_object_time: Duration, + create_object_time: Duration, + seeking_orphans_time: Duration, + total_found_orphans: u64, + created_objects_count: u64, + linked_objects_count: u64, + completed_tasks: u64, +} + +impl From for ReportOutputMetadata { + fn from(value: Metadata) -> Self { + Self::Metrics(HashMap::from([ + ( + "extract_metadata_time".into(), + json!(value.extract_metadata_time), + ), + ( + "assign_cas_ids_time".into(), + json!(value.assign_cas_ids_time), + ), + ( + "fetch_existing_objects_time".into(), + json!(value.fetch_existing_objects_time), + ), + ( + "assign_to_existing_object_time".into(), + json!(value.assign_to_existing_object_time), + ), + ("create_object_time".into(), json!(value.create_object_time)), + ( + "seeking_orphans_time".into(), + json!(value.seeking_orphans_time), + ), + ( + "total_found_orphans".into(), + json!(value.total_found_orphans), + ), + ( + "created_objects_count".into(), + json!(value.created_objects_count), + ), + ( + "linked_objects_count".into(), + json!(value.linked_objects_count), + ), + ("total_tasks".into(), json!(value.completed_tasks)), + ])) + } +} + +impl SerializableJob for FileIdentifierJob { + async fn serialize(self) -> Result>, rmp_serde::encode::Error> { + let Self { + location, + location_path, + sub_path, + metadata, + errors, + tasks_for_shutdown, + .. 
+ } = self; + + rmp_serde::to_vec_named(&SaveState { + location, + location_path, + sub_path, + metadata, + tasks_for_shutdown_bytes: Some(SerializedTasks(rmp_serde::to_vec_named( + &tasks_for_shutdown + .into_iter() + .map(|task| async move { + if task.is::() { + SerializableTask::serialize( + *task + .downcast::() + .expect("just checked"), + ) + .await + .map(|bytes| (TaskKind::ExtractFileMetadata, bytes)) + } else if task.is::() { + task.downcast::() + .expect("just checked") + .serialize() + .await + .map(|bytes| (TaskKind::ObjectProcessor, bytes)) + } else { + unreachable!("Unexpected task type") + } + }) + .collect::>() + .try_join() + .await?, + )?)), + errors, + }) + .map(Some) + } + + async fn deserialize( + serialized_job: &[u8], + _: &impl JobContext, + ) -> Result)>, rmp_serde::decode::Error> { + let SaveState { + location, + location_path, + sub_path, + metadata, + + errors, + tasks_for_shutdown_bytes, + } = rmp_serde::from_slice::(serialized_job)?; + + Ok(Some(( + Self { + location, + location_path, + sub_path, + metadata, + errors, + pending_tasks_on_resume: Vec::new(), + tasks_for_shutdown: Vec::new(), + }, + tasks_for_shutdown_bytes, + ))) + } +} + +fn orphan_path_filters( + location_id: location::id::Type, + file_path_id: Option, + maybe_sub_iso_file_path: &Option>, +) -> Vec { + sd_utils::chain_optional_iter( + [ + or!( + file_path::object_id::equals(None), + file_path::cas_id::equals(None) + ), + file_path::is_dir::equals(Some(false)), + file_path::location_id::equals(Some(location_id)), + file_path::size_in_bytes_bytes::not(Some(0u64.to_be_bytes().to_vec())), + ], + [ + // this is a workaround for the cursor not working properly + file_path_id.map(file_path::id::gte), + maybe_sub_iso_file_path.as_ref().map(|sub_iso_file_path| { + file_path::materialized_path::starts_with( + sub_iso_file_path + .materialized_path_for_children() + .expect("sub path iso_file_path must be a directory"), + ) + }), + ], + ) +} diff --git a/core/crates/heavy-lifting/src/file_identifier/mod.rs b/core/crates/heavy-lifting/src/file_identifier/mod.rs new file mode 100644 index 000000000..6659ef375 --- /dev/null +++ b/core/crates/heavy-lifting/src/file_identifier/mod.rs @@ -0,0 +1,120 @@ +use crate::utils::sub_path::SubPathError; + +use sd_core_file_path_helper::{FilePathError, IsolatedFilePathData}; + +use sd_file_ext::{extensions::Extension, kind::ObjectKind}; +use sd_utils::{db::MissingFieldError, error::FileIOError}; + +use std::{fs::Metadata, path::Path}; + +use prisma_client_rust::QueryError; +use rspc::ErrorCode; +use serde::{Deserialize, Serialize}; +use specta::Type; +use tokio::fs; +use tracing::trace; + +mod cas_id; +mod job; +mod shallow; +mod tasks; + +use cas_id::generate_cas_id; + +pub use job::FileIdentifierJob; +pub use shallow::shallow; + +// we break these tasks into chunks of 100 to improve performance +const CHUNK_SIZE: usize = 100; + +#[derive(thiserror::Error, Debug)] +pub enum FileIdentifierError { + #[error("missing field on database: {0}")] + MissingField(#[from] MissingFieldError), + #[error("failed to deserialized stored tasks for job resume: {0}")] + DeserializeTasks(#[from] rmp_serde::decode::Error), + #[error("database error: {0}")] + Database(#[from] QueryError), + + #[error(transparent)] + FilePathError(#[from] FilePathError), + #[error(transparent)] + SubPath(#[from] SubPathError), +} + +impl From for rspc::Error { + fn from(err: FileIdentifierError) -> Self { + match err { + FileIdentifierError::SubPath(sub_path_err) => sub_path_err.into(), + + _ => 
Self::with_cause(ErrorCode::InternalServerError, err.to_string(), err), + } + } +} + +#[derive(thiserror::Error, Debug, Serialize, Deserialize, Type)] +pub enum NonCriticalFileIdentifierError { + #[error("failed to extract file metadata: {0}")] + FailedToExtractFileMetadata(String), + #[cfg(target_os = "windows")] + #[error("failed to extract metadata from on-demand file: {0}")] + FailedToExtractMetadataFromOnDemandFile(String), + #[error("failed to extract isolated file path data: {0}")] + FailedToExtractIsolatedFilePathData(String), +} + +#[derive(Debug, Clone)] +pub struct FileMetadata { + pub cas_id: Option, + pub kind: ObjectKind, + pub fs_metadata: Metadata, +} + +impl FileMetadata { + /// Fetch metadata from the file system and generate a cas id for the file + /// if it's not empty. + /// + /// # Panics + /// Will panic if the file is a directory. + pub async fn new( + location_path: impl AsRef + Send, + iso_file_path: &IsolatedFilePathData<'_>, + ) -> Result { + let path = location_path.as_ref().join(iso_file_path); + + let fs_metadata = fs::metadata(&path) + .await + .map_err(|e| FileIOError::from((&path, e)))?; + + assert!( + !fs_metadata.is_dir(), + "We can't generate cas_id for directories" + ); + + // derive Object kind + let kind = Extension::resolve_conflicting(&path, false) + .await + .map_or(ObjectKind::Unknown, Into::into); + + let cas_id = if fs_metadata.len() != 0 { + generate_cas_id(&path, fs_metadata.len()) + .await + .map(Some) + .map_err(|e| FileIOError::from((&path, e)))? + } else { + // We can't do shit with empty files + None + }; + + trace!( + "Analyzed file: ", + path.display() + ); + + Ok(Self { + cas_id, + kind, + fs_metadata, + }) + } +} diff --git a/core/crates/heavy-lifting/src/file_identifier/shallow.rs b/core/crates/heavy-lifting/src/file_identifier/shallow.rs new file mode 100644 index 000000000..ef85a07b8 --- /dev/null +++ b/core/crates/heavy-lifting/src/file_identifier/shallow.rs @@ -0,0 +1,207 @@ +use crate::{utils::sub_path::maybe_get_iso_file_path_from_sub_path, Error, NonCriticalJobError}; + +use sd_core_file_path_helper::IsolatedFilePathData; +use sd_core_prisma_helpers::file_path_for_file_identifier; +use sd_core_sync::Manager as SyncManager; + +use sd_prisma::prisma::{file_path, location, PrismaClient, SortOrder}; +use sd_task_system::{ + BaseTaskDispatcher, CancelTaskOnDrop, TaskDispatcher, TaskOutput, TaskStatus, +}; +use sd_utils::db::maybe_missing; + +use std::{ + path::{Path, PathBuf}, + sync::Arc, +}; + +use futures_concurrency::future::FutureGroup; +use lending_stream::{LendingStream, StreamExt}; +use prisma_client_rust::or; +use tracing::{debug, warn}; + +use super::{ + tasks::{ExtractFileMetadataTask, ExtractFileMetadataTaskOutput, ObjectProcessorTask}, + FileIdentifierError, CHUNK_SIZE, +}; + +pub async fn shallow( + location: location::Data, + sub_path: impl AsRef + Send, + dispatcher: BaseTaskDispatcher, + db: Arc, + sync: Arc, + invalidate_query: impl Fn(&'static str) + Send + Sync, +) -> Result, Error> { + let sub_path = sub_path.as_ref(); + + let location_path = maybe_missing(&location.path, "location.path") + .map(PathBuf::from) + .map(Arc::new) + .map_err(FileIdentifierError::from)?; + + let location = Arc::new(location); + + let sub_iso_file_path = + maybe_get_iso_file_path_from_sub_path(location.id, &Some(sub_path), &*location_path, &db) + .await + .map_err(FileIdentifierError::from)? 
+ .map_or_else( + || { + IsolatedFilePathData::new(location.id, &*location_path, &*location_path, true) + .map_err(FileIdentifierError::from) + }, + Ok, + )?; + + let mut orphans_count = 0; + let mut last_orphan_file_path_id = None; + + let mut pending_running_tasks = FutureGroup::new(); + + loop { + #[allow(clippy::cast_possible_wrap)] + // SAFETY: we know that CHUNK_SIZE is a valid i64 + let orphan_paths = db + .file_path() + .find_many(orphan_path_filters( + location.id, + last_orphan_file_path_id, + &sub_iso_file_path, + )) + .order_by(file_path::id::order(SortOrder::Asc)) + .take(CHUNK_SIZE as i64) + .select(file_path_for_file_identifier::select()) + .exec() + .await + .map_err(FileIdentifierError::from)?; + + let Some(last_orphan) = orphan_paths.last() else { + // No orphans here! + break; + }; + + orphans_count += orphan_paths.len() as u64; + last_orphan_file_path_id = Some(last_orphan.id); + + pending_running_tasks.insert(CancelTaskOnDrop( + dispatcher + .dispatch(ExtractFileMetadataTask::new_shallow( + Arc::clone(&location), + Arc::clone(&location_path), + orphan_paths, + )) + .await, + )); + } + + if orphans_count == 0 { + debug!( + "No orphans found on ", + location.id, + sub_path.display() + ); + return Ok(vec![]); + } + + let errors = process_tasks(pending_running_tasks, dispatcher, db, sync).await?; + + invalidate_query("search.paths"); + invalidate_query("search.objects"); + + Ok(errors) +} + +async fn process_tasks( + pending_running_tasks: FutureGroup>, + dispatcher: BaseTaskDispatcher, + db: Arc, + sync: Arc, +) -> Result, Error> { + let mut pending_running_tasks = pending_running_tasks.lend_mut(); + + let mut errors = vec![]; + + while let Some((pending_running_tasks, task_result)) = pending_running_tasks.next().await { + match task_result { + Ok(TaskStatus::Done((_, TaskOutput::Out(any_task_output)))) => { + // We only care about ExtractFileMetadataTaskOutput because we need to dispatch further tasks + // and the ObjectProcessorTask only gives back some metrics not much important for + // shallow file identifier + if any_task_output.is::() { + let ExtractFileMetadataTaskOutput { + identified_files, + errors: more_errors, + .. 
+ } = *any_task_output + .downcast::() + .expect("just checked"); + + errors.extend(more_errors); + + if !identified_files.is_empty() { + pending_running_tasks.insert(CancelTaskOnDrop( + dispatcher + .dispatch(ObjectProcessorTask::new_shallow( + identified_files, + Arc::clone(&db), + Arc::clone(&sync), + )) + .await, + )); + } + } + } + + Ok(TaskStatus::Done((task_id, TaskOutput::Empty))) => { + warn!("Task returned an empty output"); + } + + Ok(TaskStatus::Shutdown(_)) => { + debug!( + "Spacedrive is shutting down while a shallow file identifier was in progress" + ); + return Ok(vec![]); + } + + Ok(TaskStatus::Error(e)) => { + return Err(e); + } + + Ok(TaskStatus::Canceled | TaskStatus::ForcedAbortion) => { + warn!("Task was cancelled or aborted on shallow file identifier"); + return Ok(vec![]); + } + + Err(e) => { + return Err(e.into()); + } + } + } + + Ok(errors) +} + +fn orphan_path_filters( + location_id: location::id::Type, + file_path_id: Option, + sub_iso_file_path: &IsolatedFilePathData<'_>, +) -> Vec { + sd_utils::chain_optional_iter( + [ + or!( + file_path::object_id::equals(None), + file_path::cas_id::equals(None) + ), + file_path::is_dir::equals(Some(false)), + file_path::location_id::equals(Some(location_id)), + file_path::materialized_path::equals(Some( + sub_iso_file_path + .materialized_path_for_children() + .expect("sub path for shallow identifier must be a directory"), + )), + file_path::size_in_bytes_bytes::not(Some(0u64.to_be_bytes().to_vec())), + ], + [file_path_id.map(file_path::id::gte)], + ) +} diff --git a/core/crates/heavy-lifting/src/file_identifier/tasks/extract_file_metadata.rs b/core/crates/heavy-lifting/src/file_identifier/tasks/extract_file_metadata.rs new file mode 100644 index 000000000..ef9b2af9b --- /dev/null +++ b/core/crates/heavy-lifting/src/file_identifier/tasks/extract_file_metadata.rs @@ -0,0 +1,280 @@ +use crate::{ + file_identifier::{FileMetadata, NonCriticalFileIdentifierError}, + Error, NonCriticalJobError, +}; + +use sd_core_file_path_helper::IsolatedFilePathData; +use sd_core_prisma_helpers::file_path_for_file_identifier; + +use sd_prisma::prisma::location; +use sd_task_system::{ + ExecStatus, Interrupter, InterruptionKind, IntoAnyTaskOutput, SerializableTask, Task, TaskId, +}; +use sd_utils::error::FileIOError; + +use std::{ + collections::HashMap, future::IntoFuture, mem, path::PathBuf, pin::pin, sync::Arc, + time::Duration, +}; + +use futures::stream::{self, FuturesUnordered, StreamExt}; +use futures_concurrency::stream::Merge; +use serde::{Deserialize, Serialize}; +use tokio::time::Instant; +use tracing::error; +use uuid::Uuid; + +use super::IdentifiedFile; + +#[derive(Debug, Serialize, Deserialize)] +pub struct ExtractFileMetadataTask { + id: TaskId, + location: Arc, + location_path: Arc, + file_paths_by_id: HashMap, + identified_files: HashMap, + extract_metadata_time: Duration, + errors: Vec, + is_shallow: bool, +} + +#[derive(Debug)] +pub struct ExtractFileMetadataTaskOutput { + pub identified_files: HashMap, + pub extract_metadata_time: Duration, + pub errors: Vec, +} + +impl ExtractFileMetadataTask { + fn new( + location: Arc, + location_path: Arc, + file_paths: Vec, + is_shallow: bool, + ) -> Self { + Self { + id: TaskId::new_v4(), + location, + location_path, + identified_files: HashMap::with_capacity(file_paths.len()), + file_paths_by_id: file_paths + .into_iter() + .map(|file_path| { + // SAFETY: This should never happen + ( + Uuid::from_slice(&file_path.pub_id).expect("file_path.pub_id is invalid!"), + file_path, + ) + }) + 
.collect(), + extract_metadata_time: Duration::ZERO, + errors: Vec::new(), + is_shallow, + } + } + + #[must_use] + pub fn new_deep( + location: Arc, + location_path: Arc, + file_paths: Vec, + ) -> Self { + Self::new(location, location_path, file_paths, false) + } + + #[must_use] + pub fn new_shallow( + location: Arc, + location_path: Arc, + file_paths: Vec, + ) -> Self { + Self::new(location, location_path, file_paths, true) + } +} + +#[async_trait::async_trait] +impl Task for ExtractFileMetadataTask { + fn id(&self) -> TaskId { + self.id + } + + fn with_priority(&self) -> bool { + self.is_shallow + } + + async fn run(&mut self, interrupter: &Interrupter) -> Result { + enum StreamMessage { + Processed(Uuid, Result), + Interrupt(InterruptionKind), + } + + let Self { + location, + location_path, + file_paths_by_id, + identified_files, + extract_metadata_time, + errors, + .. + } = self; + + let start_time = Instant::now(); + + if !file_paths_by_id.is_empty() { + let extraction_futures = file_paths_by_id + .iter() + .filter_map(|(file_path_id, file_path)| { + try_iso_file_path_extraction( + location.id, + *file_path_id, + file_path, + Arc::clone(location_path), + errors, + ) + }) + .map(|(file_path_id, iso_file_path, location_path)| async move { + StreamMessage::Processed( + file_path_id, + FileMetadata::new(&*location_path, &iso_file_path).await, + ) + }) + .collect::>(); + + let mut msg_stream = pin!(( + extraction_futures, + stream::once(interrupter.into_future()).map(StreamMessage::Interrupt) + ) + .merge()); + + while let Some(msg) = msg_stream.next().await { + match msg { + StreamMessage::Processed(file_path_pub_id, res) => { + let file_path = file_paths_by_id + .remove(&file_path_pub_id) + .expect("file_path must be here"); + + match res { + Ok(FileMetadata { cas_id, kind, .. 
}) => { + identified_files.insert( + file_path_pub_id, + IdentifiedFile { + file_path, + cas_id, + kind, + }, + ); + } + Err(e) => { + handle_non_critical_errors( + location.id, + file_path_pub_id, + &e, + errors, + ); + } + } + + if file_paths_by_id.is_empty() { + // All files have been processed so we can end this merged stream and don't keep waiting an + // interrupt signal + break; + } + } + + StreamMessage::Interrupt(kind) => { + *extract_metadata_time += start_time.elapsed(); + return Ok(match kind { + InterruptionKind::Pause => ExecStatus::Paused, + InterruptionKind::Cancel => ExecStatus::Canceled, + }); + } + } + } + } + + Ok(ExecStatus::Done( + ExtractFileMetadataTaskOutput { + identified_files: mem::take(identified_files), + extract_metadata_time: *extract_metadata_time + start_time.elapsed(), + errors: mem::take(errors), + } + .into_output(), + )) + } +} + +fn handle_non_critical_errors( + location_id: location::id::Type, + file_path_pub_id: Uuid, + e: &FileIOError, + errors: &mut Vec, +) { + error!("Failed to extract file metadata : {e:#?}"); + + let formatted_error = format!(""); + + #[cfg(target_os = "windows")] + { + // Handle case where file is on-demand (NTFS only) + if e.source.raw_os_error().map_or(false, |code| code == 362) { + errors.push( + NonCriticalFileIdentifierError::FailedToExtractMetadataFromOnDemandFile( + formatted_error, + ) + .into(), + ); + } else { + errors.push( + NonCriticalFileIdentifierError::FailedToExtractFileMetadata(formatted_error).into(), + ); + } + } + + #[cfg(not(target_os = "windows"))] + { + errors.push( + NonCriticalFileIdentifierError::FailedToExtractFileMetadata(formatted_error).into(), + ); + } +} + +fn try_iso_file_path_extraction( + location_id: location::id::Type, + file_path_pub_id: Uuid, + file_path: &file_path_for_file_identifier::Data, + location_path: Arc, + errors: &mut Vec, +) -> Option<(Uuid, IsolatedFilePathData<'static>, Arc)> { + IsolatedFilePathData::try_from((location_id, file_path)) + .map(IsolatedFilePathData::to_owned) + .map(|iso_file_path| (file_path_pub_id, iso_file_path, location_path)) + .map_err(|e| { + error!("Failed to extract isolated file path data: {e:#?}"); + errors.push( + NonCriticalFileIdentifierError::FailedToExtractIsolatedFilePathData(format!( + "" + )) + .into(), + ); + }) + .ok() +} + +impl SerializableTask for ExtractFileMetadataTask { + type SerializeError = rmp_serde::encode::Error; + + type DeserializeError = rmp_serde::decode::Error; + + type DeserializeCtx = (); + + async fn serialize(self) -> Result, Self::SerializeError> { + rmp_serde::to_vec_named(&self) + } + + async fn deserialize( + data: &[u8], + (): Self::DeserializeCtx, + ) -> Result { + rmp_serde::from_slice(data) + } +} diff --git a/core/crates/heavy-lifting/src/file_identifier/tasks/mod.rs b/core/crates/heavy-lifting/src/file_identifier/tasks/mod.rs new file mode 100644 index 000000000..c5bac9fb1 --- /dev/null +++ b/core/crates/heavy-lifting/src/file_identifier/tasks/mod.rs @@ -0,0 +1,18 @@ +use sd_core_prisma_helpers::file_path_for_file_identifier; + +use sd_file_ext::kind::ObjectKind; + +use serde::{Deserialize, Serialize}; + +mod extract_file_metadata; +mod object_processor; + +pub use extract_file_metadata::{ExtractFileMetadataTask, ExtractFileMetadataTaskOutput}; +pub use object_processor::{ObjectProcessorTask, ObjectProcessorTaskMetrics}; + +#[derive(Debug, Serialize, Deserialize)] +pub(super) struct IdentifiedFile { + pub(super) file_path: file_path_for_file_identifier::Data, + pub(super) cas_id: Option, + pub(super) kind: 
ObjectKind, +} diff --git a/core/crates/heavy-lifting/src/file_identifier/tasks/object_processor.rs b/core/crates/heavy-lifting/src/file_identifier/tasks/object_processor.rs new file mode 100644 index 000000000..cdb9f0842 --- /dev/null +++ b/core/crates/heavy-lifting/src/file_identifier/tasks/object_processor.rs @@ -0,0 +1,473 @@ +use crate::{file_identifier::FileIdentifierError, Error}; + +use sd_core_prisma_helpers::{ + file_path_for_file_identifier, file_path_pub_id, object_for_file_identifier, +}; +use sd_core_sync::Manager as SyncManager; + +use sd_prisma::{ + prisma::{file_path, object, PrismaClient}, + prisma_sync, +}; +use sd_sync::{CRDTOperation, OperationFactory}; +use sd_task_system::{ + check_interruption, ExecStatus, Interrupter, IntoAnyTaskOutput, SerializableTask, Task, TaskId, +}; +use sd_utils::{msgpack, uuid_to_bytes}; + +use std::{ + collections::{HashMap, HashSet}, + mem, + sync::Arc, + time::Duration, +}; + +use prisma_client_rust::Select; +use serde::{Deserialize, Serialize}; +use tokio::time::Instant; +use tracing::{debug, trace}; +use uuid::Uuid; + +use super::IdentifiedFile; + +#[derive(Debug)] +pub struct ObjectProcessorTask { + id: TaskId, + db: Arc, + sync: Arc, + identified_files: HashMap, + metrics: ObjectProcessorTaskMetrics, + stage: Stage, + is_shallow: bool, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct SaveState { + id: TaskId, + identified_files: HashMap, + metrics: ObjectProcessorTaskMetrics, + stage: Stage, + is_shallow: bool, +} + +#[derive(Debug, Serialize, Deserialize, Default)] +pub struct ObjectProcessorTaskMetrics { + pub assign_cas_ids_time: Duration, + pub fetch_existing_objects_time: Duration, + pub assign_to_existing_object_time: Duration, + pub create_object_time: Duration, + pub created_objects_count: u64, + pub linked_objects_count: u64, +} + +#[derive(Debug, Serialize, Deserialize)] +enum Stage { + Starting, + FetchExistingObjects, + AssignFilePathsToExistingObjects { + existing_objects_by_cas_id: HashMap, + }, + CreateObjects, +} + +impl ObjectProcessorTask { + fn new( + identified_files: HashMap, + db: Arc, + sync: Arc, + is_shallow: bool, + ) -> Self { + Self { + id: TaskId::new_v4(), + db, + sync, + identified_files, + stage: Stage::Starting, + metrics: ObjectProcessorTaskMetrics::default(), + is_shallow, + } + } + + pub fn new_deep( + identified_files: HashMap, + db: Arc, + sync: Arc, + ) -> Self { + Self::new(identified_files, db, sync, false) + } + + pub fn new_shallow( + identified_files: HashMap, + db: Arc, + sync: Arc, + ) -> Self { + Self::new(identified_files, db, sync, true) + } +} + +#[async_trait::async_trait] +impl Task for ObjectProcessorTask { + fn id(&self) -> TaskId { + self.id + } + + fn with_priority(&self) -> bool { + self.is_shallow + } + + async fn run(&mut self, interrupter: &Interrupter) -> Result { + let Self { + db, + sync, + identified_files, + stage, + metrics: + ObjectProcessorTaskMetrics { + assign_cas_ids_time, + fetch_existing_objects_time, + assign_to_existing_object_time, + create_object_time, + created_objects_count, + linked_objects_count, + }, + .. 
+ } = self; + + loop { + match stage { + Stage::Starting => { + let start = Instant::now(); + assign_cas_id_to_file_paths(identified_files, db, sync).await?; + *assign_cas_ids_time = start.elapsed(); + *stage = Stage::FetchExistingObjects; + } + + Stage::FetchExistingObjects => { + let start = Instant::now(); + let existing_objects_by_cas_id = + fetch_existing_objects_by_cas_id(identified_files, db).await?; + *fetch_existing_objects_time = start.elapsed(); + *stage = Stage::AssignFilePathsToExistingObjects { + existing_objects_by_cas_id, + }; + } + + Stage::AssignFilePathsToExistingObjects { + existing_objects_by_cas_id, + } => { + let start = Instant::now(); + let assigned_file_path_pub_ids = assign_existing_objects_to_file_paths( + identified_files, + existing_objects_by_cas_id, + db, + sync, + ) + .await?; + *assign_to_existing_object_time = start.elapsed(); + *linked_objects_count = assigned_file_path_pub_ids.len() as u64; + + debug!( + "Found {} existing Objects, linked file paths to them", + existing_objects_by_cas_id.len() + ); + + for file_path_pub_id::Data { pub_id } in assigned_file_path_pub_ids { + let pub_id = Uuid::from_slice(&pub_id).expect("uuid bytes are invalid"); + trace!("Assigned file path to existing object"); + + identified_files + .remove(&pub_id) + .expect("file_path must be here"); + } + + *stage = Stage::CreateObjects; + + if identified_files.is_empty() { + // No objects to be created, we're good to finish already + break; + } + } + + Stage::CreateObjects => { + let start = Instant::now(); + *created_objects_count = create_objects(identified_files, db, sync).await?; + *create_object_time = start.elapsed(); + + break; + } + } + + check_interruption!(interrupter); + } + + Ok(ExecStatus::Done(mem::take(&mut self.metrics).into_output())) + } +} + +async fn assign_cas_id_to_file_paths( + identified_files: &HashMap, + db: &PrismaClient, + sync: &SyncManager, +) -> Result<(), FileIdentifierError> { + // Assign cas_id to each file path + sync.write_ops( + db, + identified_files + .iter() + .map(|(pub_id, IdentifiedFile { cas_id, .. })| { + ( + sync.shared_update( + prisma_sync::file_path::SyncId { + pub_id: uuid_to_bytes(*pub_id), + }, + file_path::cas_id::NAME, + msgpack!(cas_id), + ), + db.file_path() + .update( + file_path::pub_id::equals(uuid_to_bytes(*pub_id)), + vec![file_path::cas_id::set(cas_id.clone())], + ) + // We don't need any data here, just the id avoids receiving the entire object + // as we can't pass an empty select macro call + .select(file_path::select!({ id })), + ) + }) + .unzip::<_, _, _, Vec<_>>(), + ) + .await?; + + Ok(()) +} + +async fn fetch_existing_objects_by_cas_id( + identified_files: &HashMap, + db: &PrismaClient, +) -> Result, FileIdentifierError> { + // Retrieves objects that are already connected to file paths with the same id + db.object() + .find_many(vec![object::file_paths::some(vec![ + file_path::cas_id::in_vec( + identified_files + .values() + .filter_map(|IdentifiedFile { cas_id, .. 
}| cas_id.as_ref()) + .cloned() + .collect::>() + .into_iter() + .collect(), + ), + ])]) + .select(object_for_file_identifier::select()) + .exec() + .await + .map_err(Into::into) + .map(|objects| { + objects + .into_iter() + .filter_map(|object| { + object + .file_paths + .first() + .and_then(|file_path| file_path.cas_id.clone()) + .map(|cas_id| (cas_id, object)) + }) + .collect() + }) +} + +async fn assign_existing_objects_to_file_paths( + identified_files: &HashMap, + objects_by_cas_id: &HashMap, + db: &PrismaClient, + sync: &SyncManager, +) -> Result, FileIdentifierError> { + // Attempt to associate each file path with an object that has been + // connected to file paths with the same cas_id + sync.write_ops( + db, + identified_files + .iter() + .filter_map(|(pub_id, IdentifiedFile { cas_id, .. })| { + objects_by_cas_id + // Filtering out files without cas_id due to being empty + .get(cas_id.as_ref()?) + .map(|object| (*pub_id, object)) + }) + .map(|(pub_id, object)| { + connect_file_path_to_object( + pub_id, + // SAFETY: This pub_id is generated by the uuid lib, but we have to store bytes in sqlite + Uuid::from_slice(&object.pub_id).expect("uuid bytes are invalid"), + sync, + db, + ) + }) + .unzip::<_, _, Vec<_>, Vec<_>>(), + ) + .await + .map_err(Into::into) +} + +fn connect_file_path_to_object<'db>( + file_path_pub_id: Uuid, + object_pub_id: Uuid, + sync: &SyncManager, + db: &'db PrismaClient, +) -> (CRDTOperation, Select<'db, file_path_pub_id::Data>) { + trace!("Connecting to "); + + let vec_id = object_pub_id.as_bytes().to_vec(); + + ( + sync.shared_update( + prisma_sync::file_path::SyncId { + pub_id: uuid_to_bytes(file_path_pub_id), + }, + file_path::object::NAME, + msgpack!(prisma_sync::object::SyncId { + pub_id: vec_id.clone() + }), + ), + db.file_path() + .update( + file_path::pub_id::equals(uuid_to_bytes(file_path_pub_id)), + vec![file_path::object::connect(object::pub_id::equals(vec_id))], + ) + .select(file_path_pub_id::select()), + ) +} + +async fn create_objects( + identified_files: &HashMap, + db: &PrismaClient, + sync: &SyncManager, +) -> Result { + trace!("Creating {} new Objects", identified_files.len(),); + + let (object_create_args, file_path_update_args) = identified_files + .iter() + .map( + |( + file_path_pub_id, + IdentifiedFile { + file_path: file_path_for_file_identifier::Data { date_created, .. }, + kind, + .. 
+ }, + )| { + let object_pub_id = Uuid::new_v4(); + + let kind = *kind as i32; + + let (sync_params, db_params) = [ + ( + (object::date_created::NAME, msgpack!(date_created)), + object::date_created::set(*date_created), + ), + ( + (object::kind::NAME, msgpack!(kind)), + object::kind::set(Some(kind)), + ), + ] + .into_iter() + .unzip::<_, _, Vec<_>, Vec<_>>(); + + ( + ( + sync.shared_create( + prisma_sync::object::SyncId { + pub_id: uuid_to_bytes(object_pub_id), + }, + sync_params, + ), + object::create_unchecked(uuid_to_bytes(object_pub_id), db_params), + ), + connect_file_path_to_object(*file_path_pub_id, object_pub_id, sync, db), + ) + }, + ) + .unzip::<_, _, Vec<_>, Vec<_>>(); + + // create new object records with assembled values + let total_created_files = sync + .write_ops(db, { + let (sync, db_params) = object_create_args + .into_iter() + .unzip::<_, _, Vec<_>, Vec<_>>(); + + ( + sync.into_iter().flatten().collect(), + db.object().create_many(db_params), + ) + }) + .await?; + + trace!("Created {total_created_files} new Objects"); + + if total_created_files > 0 { + trace!("Updating file paths with created objects"); + + sync.write_ops( + db, + file_path_update_args + .into_iter() + .unzip::<_, _, Vec<_>, Vec<_>>(), + ) + .await?; + + trace!("Updated file paths with created objects"); + } + + #[allow(clippy::cast_sign_loss)] // SAFETY: We're sure the value is positive + Ok(total_created_files as u64) +} + +impl SerializableTask for ObjectProcessorTask { + type SerializeError = rmp_serde::encode::Error; + + type DeserializeError = rmp_serde::decode::Error; + + type DeserializeCtx = (Arc, Arc); + + async fn serialize(self) -> Result, Self::SerializeError> { + let Self { + id, + identified_files, + metrics, + stage, + is_shallow, + .. + } = self; + + rmp_serde::to_vec_named(&SaveState { + id, + identified_files, + metrics, + stage, + is_shallow, + }) + } + + async fn deserialize( + data: &[u8], + (db, sync): Self::DeserializeCtx, + ) -> Result { + rmp_serde::from_slice(data).map( + |SaveState { + id, + identified_files, + metrics, + stage, + is_shallow, + }| Self { + id, + db, + sync, + identified_files, + metrics, + stage, + is_shallow, + }, + ) + } +} diff --git a/core/crates/heavy-lifting/src/indexer/job.rs b/core/crates/heavy-lifting/src/indexer/job.rs index d85f2fd32..dacea9ab5 100644 --- a/core/crates/heavy-lifting/src/indexer/job.rs +++ b/core/crates/heavy-lifting/src/indexer/job.rs @@ -8,13 +8,15 @@ use crate::{ utils::cancel_pending_tasks, SerializableJob, SerializedTasks, }, - Error, NonCriticalJobError, + utils::sub_path::get_full_path_from_sub_path, + Error, LocationScanState, NonCriticalJobError, }; use sd_core_file_path_helper::IsolatedFilePathData; use sd_core_indexer_rules::{IndexerRule, IndexerRuler}; use sd_core_prisma_helpers::location_with_indexer_rules; +use sd_prisma::prisma::location; use sd_task_system::{ AnyTaskOutput, IntoTask, SerializableTask, Task, TaskDispatcher, TaskHandle, TaskId, TaskOutput, TaskStatus, @@ -39,7 +41,7 @@ use tokio::time::Instant; use tracing::warn; use super::{ - determine_initial_walk_path, remove_non_existing_file_paths, reverse_update_directories_sizes, + remove_non_existing_file_paths, reverse_update_directories_sizes, tasks::{ saver::{SaveTask, SaveTaskOutput}, updater::{UpdateTask, UpdateTaskOutput}, @@ -70,6 +72,64 @@ pub struct IndexerJob { impl Job for IndexerJob { const NAME: JobName = JobName::Indexer; + async fn resume_tasks( + &mut self, + dispatcher: &JobTaskDispatcher, + ctx: &impl JobContext, + 
SerializedTasks(serialized_tasks): SerializedTasks, + ) -> Result<(), Error> { + let location_id = self.location.id; + + self.pending_tasks_on_resume = dispatcher + .dispatch_many_boxed( + rmp_serde::from_slice::)>>(&serialized_tasks) + .map_err(IndexerError::from)? + .into_iter() + .map(|(task_kind, task_bytes)| { + let indexer_ruler = self.indexer_ruler.clone(); + let iso_file_path_factory = self.iso_file_path_factory.clone(); + async move { + match task_kind { + TaskKind::Walk => WalkDirTask::deserialize( + &task_bytes, + ( + indexer_ruler.clone(), + WalkerDBProxy { + location_id, + db: Arc::clone(ctx.db()), + }, + iso_file_path_factory.clone(), + dispatcher.clone(), + ), + ) + .await + .map(IntoTask::into_task), + + TaskKind::Save => SaveTask::deserialize( + &task_bytes, + (Arc::clone(ctx.db()), Arc::clone(ctx.sync())), + ) + .await + .map(IntoTask::into_task), + TaskKind::Update => UpdateTask::deserialize( + &task_bytes, + (Arc::clone(ctx.db()), Arc::clone(ctx.sync())), + ) + .await + .map(IntoTask::into_task), + } + } + }) + .collect::>() + .try_join() + .await + .map_err(IndexerError::from)?, + ) + .await; + + Ok(()) + } + async fn run( mut self, dispatcher: JobTaskDispatcher, @@ -102,7 +162,7 @@ impl Job for IndexerJob { self.metadata.total_paths += chunked_saves.len() as u64; self.metadata.total_save_steps += 1; - SaveTask::new( + SaveTask::new_deep( self.location.id, self.location.pub_id.clone(), chunked_saves, @@ -162,6 +222,10 @@ impl Job for IndexerJob { metadata.db_write_time += start_size_update_time.elapsed(); } + if metadata.removed_count > 0 { + // TODO: Dispatch a task to remove orphan objects + } + if metadata.indexed_count > 0 || metadata.removed_count > 0 { ctx.invalidate_query("search.paths"); } @@ -171,6 +235,16 @@ impl Job for IndexerJob { "all tasks must be completed here" ); + ctx.db() + .location() + .update( + location::id::equals(location.id), + vec![location::scan_state::set(LocationScanState::Indexed as i32)], + ) + .exec() + .await + .map_err(IndexerError::from)?; + Ok(ReturnStatus::Completed( JobReturn::builder() .with_metadata(metadata) @@ -178,64 +252,6 @@ impl Job for IndexerJob { .build(), )) } - - async fn resume_tasks( - &mut self, - dispatcher: &JobTaskDispatcher, - ctx: &impl JobContext, - SerializedTasks(serialized_tasks): SerializedTasks, - ) -> Result<(), Error> { - let location_id = self.location.id; - - self.pending_tasks_on_resume = dispatcher - .dispatch_many_boxed( - rmp_serde::from_slice::)>>(&serialized_tasks) - .map_err(IndexerError::from)? 
- .into_iter() - .map(|(task_kind, task_bytes)| { - let indexer_ruler = self.indexer_ruler.clone(); - let iso_file_path_factory = self.iso_file_path_factory.clone(); - async move { - match task_kind { - TaskKind::Walk => WalkDirTask::deserialize( - &task_bytes, - ( - indexer_ruler.clone(), - WalkerDBProxy { - location_id, - db: Arc::clone(ctx.db()), - }, - iso_file_path_factory.clone(), - dispatcher.clone(), - ), - ) - .await - .map(IntoTask::into_task), - - TaskKind::Save => SaveTask::deserialize( - &task_bytes, - (Arc::clone(ctx.db()), Arc::clone(ctx.sync())), - ) - .await - .map(IntoTask::into_task), - TaskKind::Update => UpdateTask::deserialize( - &task_bytes, - (Arc::clone(ctx.db()), Arc::clone(ctx.sync())), - ) - .await - .map(IntoTask::into_task), - } - } - }) - .collect::>() - .try_join() - .await - .map_err(IndexerError::from)?, - ) - .await; - - Ok(()) - } } impl IndexerJob { @@ -282,6 +298,12 @@ impl IndexerJob { job_ctx: &impl JobContext, dispatcher: &JobTaskDispatcher, ) -> Result>, IndexerError> { + self.metadata.completed_tasks += 1; + + job_ctx.progress(vec![ProgressUpdate::CompletedTaskCount( + self.metadata.completed_tasks, + )]); + if any_task_output.is::() { return self .process_walk_output( @@ -310,12 +332,6 @@ impl IndexerJob { unreachable!("Unexpected task output type: "); } - self.metadata.completed_tasks += 1; - - job_ctx.progress(vec![ProgressUpdate::CompletedTaskCount( - self.metadata.completed_tasks, - )]); - Ok(Vec::new()) } @@ -394,7 +410,7 @@ impl IndexerJob { self.metadata.total_paths += chunked_saves.len() as u64; self.metadata.total_save_steps += 1; - SaveTask::new( + SaveTask::new_deep( self.location.id, self.location.pub_id.clone(), chunked_saves, @@ -413,7 +429,7 @@ impl IndexerJob { self.metadata.total_updated_paths += chunked_updates.len() as u64; self.metadata.total_update_steps += 1; - UpdateTask::new( + UpdateTask::new_deep( chunked_updates, Arc::clone(job_ctx.db()), Arc::clone(job_ctx.sync()), @@ -528,7 +544,7 @@ impl IndexerJob { // if we don't have any pending task, then this is a fresh job if self.pending_tasks_on_resume.is_empty() { let walker_root_path = Arc::new( - determine_initial_walk_path( + get_full_path_from_sub_path( self.location.id, &self.sub_path, &*self.iso_file_path_factory.location_path, @@ -539,7 +555,7 @@ impl IndexerJob { pending_running_tasks.push( dispatcher - .dispatch(WalkDirTask::new( + .dispatch(WalkDirTask::new_deep( walker_root_path.as_ref(), Arc::clone(&walker_root_path), self.indexer_ruler.clone(), @@ -548,7 +564,7 @@ impl IndexerJob { location_id: self.location.id, db: Arc::clone(job_ctx.db()), }, - Some(dispatcher.clone()), + dispatcher.clone(), )?) 
.await, ); diff --git a/core/crates/heavy-lifting/src/indexer/mod.rs b/core/crates/heavy-lifting/src/indexer/mod.rs index 12d27b337..2bac41b1b 100644 --- a/core/crates/heavy-lifting/src/indexer/mod.rs +++ b/core/crates/heavy-lifting/src/indexer/mod.rs @@ -1,9 +1,6 @@ -use crate::NonCriticalJobError; +use crate::{utils::sub_path::SubPathError, NonCriticalJobError}; -use sd_core_file_path_helper::{ - ensure_file_path_exists, ensure_sub_path_is_directory, ensure_sub_path_is_in_location, - FilePathError, IsolatedFilePathData, -}; +use sd_core_file_path_helper::{FilePathError, IsolatedFilePathData}; use sd_core_indexer_rules::IndexerRuleError; use sd_core_prisma_helpers::{ file_path_pub_and_cas_ids, file_path_to_isolate_with_pub_id, file_path_walker, @@ -53,8 +50,8 @@ pub enum IndexerError { // Not Found errors #[error("indexer rule not found: ")] IndexerRuleNotFound(i32), - #[error("received sub path not in database: ", .0.display())] - SubPathNotFound(Box), + #[error(transparent)] + SubPath(#[from] SubPathError), // Internal Errors #[error("database Error: {0}")] @@ -78,10 +75,12 @@ pub enum IndexerError { impl From for rspc::Error { fn from(err: IndexerError) -> Self { match err { - IndexerError::IndexerRuleNotFound(_) | IndexerError::SubPathNotFound(_) => { + IndexerError::IndexerRuleNotFound(_) => { Self::with_cause(ErrorCode::NotFound, err.to_string(), err) } + IndexerError::SubPath(sub_path_err) => sub_path_err.into(), + IndexerError::Rules(rule_err) => rule_err.into(), _ => Self::with_cause(ErrorCode::InternalServerError, err.to_string(), err), @@ -111,36 +110,6 @@ pub enum NonCriticalIndexerError { MissingFilePathData(String), } -async fn determine_initial_walk_path( - location_id: location::id::Type, - sub_path: &Option + Send + Sync>, - location_path: impl AsRef + Send, - db: &PrismaClient, -) -> Result { - let location_path = location_path.as_ref(); - - match sub_path { - Some(sub_path) if sub_path.as_ref() != Path::new("") => { - let sub_path = sub_path.as_ref(); - let full_path = ensure_sub_path_is_in_location(location_path, sub_path).await?; - - ensure_sub_path_is_directory(location_path, sub_path).await?; - - ensure_file_path_exists( - sub_path, - &IsolatedFilePathData::new(location_id, location_path, &full_path, true) - .map_err(IndexerError::from)?, - db, - IndexerError::SubPathNotFound, - ) - .await?; - - Ok(full_path) - } - _ => Ok(location_path.to_path_buf()), - } -} - fn chunk_db_queries<'db, 'iso>( iso_file_paths: impl IntoIterator>, db: &'db PrismaClient, diff --git a/core/crates/heavy-lifting/src/indexer/shallow.rs b/core/crates/heavy-lifting/src/indexer/shallow.rs index a39d37bbf..96eaf4398 100644 --- a/core/crates/heavy-lifting/src/indexer/shallow.rs +++ b/core/crates/heavy-lifting/src/indexer/shallow.rs @@ -1,4 +1,4 @@ -use crate::{Error, NonCriticalJobError}; +use crate::{utils::sub_path::get_full_path_from_sub_path, Error, NonCriticalJobError}; use sd_core_indexer_rules::{IndexerRule, IndexerRuler}; use sd_core_prisma_helpers::location_with_indexer_rules; @@ -19,7 +19,7 @@ use itertools::Itertools; use tracing::{debug, warn}; use super::{ - determine_initial_walk_path, remove_non_existing_file_paths, reverse_update_directories_sizes, + remove_non_existing_file_paths, reverse_update_directories_sizes, tasks::{ saver::{SaveTask, SaveTaskOutput}, updater::{UpdateTask, UpdateTaskOutput}, @@ -45,7 +45,9 @@ pub async fn shallow( .map_err(IndexerError::from)?; let to_walk_path = Arc::new( - determine_initial_walk_path(location.id, &Some(sub_path), &*location_path, 
&db).await?, + get_full_path_from_sub_path(location.id, &Some(sub_path), &*location_path, &db) + .await + .map_err(IndexerError::from)?, ); let Some(WalkTaskOutput { @@ -124,7 +126,7 @@ async fn walk( dispatcher: &BaseTaskDispatcher, ) -> Result, Error> { match dispatcher - .dispatch(WalkDirTask::new( + .dispatch(WalkDirTask::new_shallow( ToWalkEntry::from(&*to_walk_path), to_walk_path, location @@ -142,7 +144,6 @@ async fn walk( location_id: location.id, db, }, - None::>, )?) .await .await? @@ -186,7 +187,7 @@ async fn save_and_update( .chunks(BATCH_SIZE) .into_iter() .map(|chunk| { - SaveTask::new( + SaveTask::new_shallow( location.id, location.pub_id.clone(), chunk.collect::>(), @@ -201,7 +202,7 @@ async fn save_and_update( .chunks(BATCH_SIZE) .into_iter() .map(|chunk| { - UpdateTask::new( + UpdateTask::new_shallow( chunk.collect::>(), Arc::clone(&db), Arc::clone(&sync), diff --git a/core/crates/heavy-lifting/src/indexer/tasks/saver.rs b/core/crates/heavy-lifting/src/indexer/tasks/saver.rs index 2f1f6d433..715fc770c 100644 --- a/core/crates/heavy-lifting/src/indexer/tasks/saver.rs +++ b/core/crates/heavy-lifting/src/indexer/tasks/saver.rs @@ -28,11 +28,12 @@ pub struct SaveTask { walked_entries: Vec, db: Arc, sync: Arc, + is_shallow: bool, } impl SaveTask { #[must_use] - pub fn new( + pub fn new_deep( location_id: location::id::Type, location_pub_id: location::pub_id::Type, walked_entries: Vec, @@ -46,6 +47,26 @@ impl SaveTask { walked_entries, db, sync, + is_shallow: false, + } + } + + #[must_use] + pub fn new_shallow( + location_id: location::id::Type, + location_pub_id: location::pub_id::Type, + walked_entries: Vec, + db: Arc, + sync: Arc, + ) -> Self { + Self { + id: TaskId::new_v4(), + location_id, + location_pub_id, + walked_entries, + db, + sync, + is_shallow: true, } } } @@ -56,6 +77,7 @@ struct SaveTaskSaveState { location_id: location::id::Type, location_pub_id: location::pub_id::Type, walked_entries: Vec, + is_shallow: bool, } impl SerializableTask for SaveTask { @@ -71,6 +93,7 @@ impl SerializableTask for SaveTask { location_id, location_pub_id, walked_entries, + is_shallow, .. 
} = self; rmp_serde::to_vec_named(&SaveTaskSaveState { @@ -78,6 +101,7 @@ impl SerializableTask for SaveTask { location_id, location_pub_id, walked_entries, + is_shallow, }) } @@ -91,6 +115,7 @@ impl SerializableTask for SaveTask { location_id, location_pub_id, walked_entries, + is_shallow, }| Self { id, location_id, @@ -98,6 +123,7 @@ impl SerializableTask for SaveTask { walked_entries, db, sync, + is_shallow, }, ) } @@ -115,6 +141,11 @@ impl Task for SaveTask { self.id } + fn with_priority(&self) -> bool { + // If we're running in shallow mode, then we want priority + self.is_shallow + } + async fn run(&mut self, _: &Interrupter) -> Result { use file_path::{ create_unchecked, date_created, date_indexed, date_modified, extension, hidden, inode, diff --git a/core/crates/heavy-lifting/src/indexer/tasks/updater.rs b/core/crates/heavy-lifting/src/indexer/tasks/updater.rs index f7e99e800..f4cf0d7fd 100644 --- a/core/crates/heavy-lifting/src/indexer/tasks/updater.rs +++ b/core/crates/heavy-lifting/src/indexer/tasks/updater.rs @@ -28,11 +28,12 @@ pub struct UpdateTask { object_ids_that_should_be_unlinked: HashSet, db: Arc, sync: Arc, + is_shallow: bool, } impl UpdateTask { #[must_use] - pub fn new( + pub fn new_deep( walked_entries: Vec, db: Arc, sync: Arc, @@ -43,6 +44,23 @@ impl UpdateTask { db, sync, object_ids_that_should_be_unlinked: HashSet::new(), + is_shallow: false, + } + } + + #[must_use] + pub fn new_shallow( + walked_entries: Vec, + db: Arc, + sync: Arc, + ) -> Self { + Self { + id: TaskId::new_v4(), + walked_entries, + db, + sync, + object_ids_that_should_be_unlinked: HashSet::new(), + is_shallow: true, } } } @@ -52,6 +70,7 @@ struct UpdateTaskSaveState { id: TaskId, walked_entries: Vec, object_ids_that_should_be_unlinked: HashSet, + is_shallow: bool, } impl SerializableTask for UpdateTask { @@ -62,10 +81,19 @@ impl SerializableTask for UpdateTask { type DeserializeCtx = (Arc, Arc); async fn serialize(self) -> Result, Self::SerializeError> { + let Self { + id, + walked_entries, + object_ids_that_should_be_unlinked, + is_shallow, + .. 
+ } = self; + rmp_serde::to_vec_named(&UpdateTaskSaveState { - id: self.id, - walked_entries: self.walked_entries, - object_ids_that_should_be_unlinked: self.object_ids_that_should_be_unlinked, + id, + walked_entries, + object_ids_that_should_be_unlinked, + is_shallow, }) } @@ -78,12 +106,14 @@ impl SerializableTask for UpdateTask { id, walked_entries, object_ids_that_should_be_unlinked, + is_shallow, }| Self { id, walked_entries, object_ids_that_should_be_unlinked, db, sync, + is_shallow, }, ) } @@ -101,6 +131,11 @@ impl Task for UpdateTask { self.id } + fn with_priority(&self) -> bool { + // If we're running in shallow mode, then we want priority + self.is_shallow + } + async fn run(&mut self, interrupter: &Interrupter) -> Result { use file_path::{ cas_id, date_created, date_modified, hidden, inode, is_dir, object, object_id, diff --git a/core/crates/heavy-lifting/src/indexer/tasks/walker.rs b/core/crates/heavy-lifting/src/indexer/tasks/walker.rs index 7b8eefd4d..3ed771e2e 100644 --- a/core/crates/heavy-lifting/src/indexer/tasks/walker.rs +++ b/core/crates/heavy-lifting/src/indexer/tasks/walker.rs @@ -9,8 +9,8 @@ use sd_core_prisma_helpers::{file_path_pub_and_cas_ids, file_path_walker}; use sd_prisma::prisma::file_path; use sd_task_system::{ - check_interruption, ExecStatus, Interrupter, IntoAnyTaskOutput, SerializableTask, Task, - TaskDispatcher, TaskHandle, TaskId, + check_interruption, BaseTaskDispatcher, ExecStatus, Interrupter, IntoAnyTaskOutput, + SerializableTask, Task, TaskDispatcher, TaskHandle, TaskId, }; use sd_utils::{db::inode_from_db, error::FileIOError}; @@ -239,6 +239,7 @@ struct WalkDirSaveState { stage: WalkerStageSaveState, errors: Vec, scan_time: Duration, + is_shallow: bool, } #[derive(Debug, Serialize, Deserialize)] @@ -351,7 +352,7 @@ impl From for WalkerStage { } #[derive(Debug)] -pub struct WalkDirTask +pub struct WalkDirTask> where DBProxy: WalkerDBProxy, IsoPathFactory: IsoFilePathFactory, @@ -368,6 +369,7 @@ where maybe_dispatcher: Option, errors: Vec, scan_time: Duration, + is_shallow: bool, } impl WalkDirTask @@ -376,13 +378,13 @@ where IsoPathFactory: IsoFilePathFactory, Dispatcher: TaskDispatcher, { - pub fn new( + pub fn new_deep( entry: impl Into + Send, root: Arc, indexer_ruler: IndexerRuler, iso_file_path_factory: IsoPathFactory, db_proxy: DBProxy, - maybe_dispatcher: Option, + dispatcher: Dispatcher, ) -> Result { let entry = entry.into(); Ok(Self { @@ -394,7 +396,38 @@ where db_proxy, stage: WalkerStage::Start, entry, - maybe_dispatcher, + maybe_dispatcher: Some(dispatcher), + is_shallow: false, + errors: Vec::new(), + scan_time: Duration::ZERO, + }) + } +} + +impl WalkDirTask> +where + DBProxy: WalkerDBProxy, + IsoPathFactory: IsoFilePathFactory, +{ + pub fn new_shallow( + entry: impl Into + Send, + root: Arc, + indexer_ruler: IndexerRuler, + iso_file_path_factory: IsoPathFactory, + db_proxy: DBProxy, + ) -> Result { + let entry = entry.into(); + Ok(Self { + id: TaskId::new_v4(), + root, + indexer_ruler, + entry_iso_file_path: iso_file_path_factory.build(&entry.path, true)?, + iso_file_path_factory, + db_proxy, + stage: WalkerStage::Start, + entry, + maybe_dispatcher: None, + is_shallow: true, errors: Vec::new(), scan_time: Duration::ZERO, }) @@ -413,14 +446,26 @@ where type DeserializeCtx = (IndexerRuler, DBProxy, IsoPathFactory, Dispatcher); async fn serialize(self) -> Result, Self::SerializeError> { + let Self { + id, + entry, + root, + entry_iso_file_path, + stage, + errors, + scan_time, + is_shallow, + .. 
+ } = self; rmp_serde::to_vec_named(&WalkDirSaveState { - id: self.id, - entry: self.entry, - root: self.root, - entry_iso_file_path: self.entry_iso_file_path, - stage: self.stage.into(), - errors: self.errors, - scan_time: self.scan_time, + id, + entry, + root, + entry_iso_file_path, + stage: stage.into(), + errors, + scan_time, + is_shallow, }) } @@ -437,6 +482,7 @@ where stage, errors, scan_time, + is_shallow, }| Self { id, entry, @@ -446,9 +492,10 @@ where iso_file_path_factory, db_proxy, stage: stage.into(), - maybe_dispatcher: Some(dispatcher), + maybe_dispatcher: is_shallow.then_some(dispatcher), errors, scan_time, + is_shallow, }, ) } @@ -466,6 +513,11 @@ where self.id } + fn with_priority(&self) -> bool { + // If we're running in shallow mode, then we want priority + self.is_shallow + } + #[allow(clippy::too_many_lines)] async fn run(&mut self, interrupter: &Interrupter) -> Result { let Self { @@ -747,13 +799,13 @@ async fn keep_walking( to_keep_walking .drain(..) .map(|entry| { - WalkDirTask::new( + WalkDirTask::new_deep( entry, Arc::clone(root), indexer_ruler.clone(), iso_file_path_factory.clone(), db_proxy.clone(), - Some(dispatcher.clone()), + dispatcher.clone(), ) .map_err(|e| NonCriticalIndexerError::DispatchKeepWalking(e.to_string())) }) @@ -1226,7 +1278,7 @@ mod tests { let handle = system .dispatch( - WalkDirTask::new( + WalkDirTask::new_deep( root_path.to_path_buf(), Arc::new(root_path.to_path_buf()), indexer_ruler, @@ -1234,7 +1286,7 @@ mod tests { root_path: Arc::new(root_path.to_path_buf()), }, DummyDBProxy, - Some(system.get_dispatcher()), + system.get_dispatcher(), ) .unwrap(), ) diff --git a/core/crates/heavy-lifting/src/job_system/job.rs b/core/crates/heavy-lifting/src/job_system/job.rs index 5dfdddfcb..191d71148 100644 --- a/core/crates/heavy-lifting/src/job_system/job.rs +++ b/core/crates/heavy-lifting/src/job_system/job.rs @@ -25,7 +25,10 @@ use futures_concurrency::{ use serde::{Deserialize, Serialize}; use specta::Type; use strum::{Display, EnumString}; -use tokio::spawn; +use tokio::{ + spawn, + sync::{watch, Mutex}, +}; use tracing::{debug, error, info, warn}; use uuid::Uuid; @@ -42,6 +45,7 @@ use super::{ #[strum(use_phf, serialize_all = "snake_case")] pub enum JobName { Indexer, + FileIdentifier, // TODO: Add more job names as needed } @@ -631,7 +635,10 @@ async fn to_spawn_job( let mut remote_controllers = vec![]; - let (dispatcher, remote_controllers_rx) = JobTaskDispatcher::new(base_dispatcher); + let (running_state_tx, running_state_rx) = watch::channel(JobRunningState::Running); + + let (dispatcher, remote_controllers_rx) = + JobTaskDispatcher::new(base_dispatcher, running_state_rx); if let Some(existing_tasks) = existing_tasks { if let Err(e) = job @@ -664,6 +671,7 @@ async fn to_spawn_job( match command { Command::Pause => { + running_state_tx.send_modify(|state| *state = JobRunningState::Paused); remote_controllers .iter() .map(TaskRemoteController::pause) @@ -680,6 +688,8 @@ async fn to_spawn_job( }); } Command::Resume => { + running_state_tx.send_modify(|state| *state = JobRunningState::Running); + remote_controllers .iter() .map(TaskRemoteController::resume) @@ -726,14 +736,29 @@ async fn to_spawn_job( } } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum JobRunningState { + Running, + Paused, +} + +impl Default for JobRunningState { + fn default() -> Self { + Self::Running + } +} + #[derive(Debug, Clone)] pub struct JobTaskDispatcher { dispatcher: BaseTaskDispatcher, remote_controllers_tx: chan::Sender, + running_state: Arc>>, } impl 
TaskDispatcher for JobTaskDispatcher { async fn dispatch_boxed(&self, boxed_task: Box>) -> TaskHandle { + self.wait_for_dispatch_approval().await; + let handle = self.dispatcher.dispatch_boxed(boxed_task).await; self.remote_controllers_tx @@ -748,14 +773,9 @@ impl TaskDispatcher for JobTaskDispatcher { &self, boxed_tasks: impl IntoIterator>> + Send, ) -> Vec> { - let handles = self.dispatcher.dispatch_many_boxed(boxed_tasks).await; + self.wait_for_dispatch_approval().await; - for handle in &handles { - self.remote_controllers_tx - .send(handle.remote_controller()) - .await - .expect("remote controllers tx closed"); - } + let handles = self.dispatcher.dispatch_many_boxed(boxed_tasks).await; handles .iter() @@ -770,15 +790,28 @@ impl TaskDispatcher for JobTaskDispatcher { } impl JobTaskDispatcher { - fn new(dispatcher: BaseTaskDispatcher) -> (Self, chan::Receiver) { + fn new( + dispatcher: BaseTaskDispatcher, + running_state_rx: watch::Receiver, + ) -> (Self, chan::Receiver) { let (remote_controllers_tx, remote_controllers_rx) = chan::unbounded(); ( Self { dispatcher, remote_controllers_tx, + running_state: Arc::new(Mutex::new(running_state_rx)), }, remote_controllers_rx, ) } + + async fn wait_for_dispatch_approval(&self) { + self.running_state + .lock() + .await + .wait_for(|state| *state == JobRunningState::Running) + .await + .expect("job running state watch channel unexpectedly closed"); + } } diff --git a/core/crates/heavy-lifting/src/job_system/store.rs b/core/crates/heavy-lifting/src/job_system/store.rs index 93728030c..4d1cb9485 100644 --- a/core/crates/heavy-lifting/src/job_system/store.rs +++ b/core/crates/heavy-lifting/src/job_system/store.rs @@ -1,4 +1,4 @@ -use crate::indexer::IndexerJob; +use crate::{file_identifier::FileIdentifierJob, indexer::IndexerJob}; use sd_prisma::prisma::{job, location}; use sd_utils::uuid_to_bytes; @@ -212,6 +212,7 @@ async fn load_job( Ctx, [ IndexerJob, + FileIdentifierJob, // TODO: Add more jobs here // e.g.: FileIdentifierJob, MediaProcessorJob, etc., ] diff --git a/core/crates/heavy-lifting/src/lib.rs b/core/crates/heavy-lifting/src/lib.rs index 3675cdedb..1cc079f8d 100644 --- a/core/crates/heavy-lifting/src/lib.rs +++ b/core/crates/heavy-lifting/src/lib.rs @@ -33,9 +33,12 @@ use serde::{Deserialize, Serialize}; use specta::Type; use thiserror::Error; +pub mod file_identifier; pub mod indexer; pub mod job_system; +pub mod utils; +use file_identifier::{FileIdentifierError, NonCriticalFileIdentifierError}; use indexer::{IndexerError, NonCriticalIndexerError}; pub use job_system::{ @@ -47,6 +50,8 @@ pub use job_system::{ pub enum Error { #[error(transparent)] Indexer(#[from] IndexerError), + #[error(transparent)] + FileIdentifier(#[from] FileIdentifierError), #[error(transparent)] TaskSystem(#[from] TaskSystemError), @@ -56,6 +61,7 @@ impl From for rspc::Error { fn from(e: Error) -> Self { match e { Error::Indexer(e) => e.into(), + Error::FileIdentifier(e) => e.into(), Error::TaskSystem(e) => { Self::with_cause(rspc::ErrorCode::InternalServerError, e.to_string(), e) } @@ -68,4 +74,15 @@ pub enum NonCriticalJobError { // TODO: Add variants as needed #[error(transparent)] Indexer(#[from] NonCriticalIndexerError), + #[error(transparent)] + FileIdentifier(#[from] NonCriticalFileIdentifierError), +} + +#[repr(i32)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize, Type, Eq, PartialEq)] +pub enum LocationScanState { + Pending = 0, + Indexed = 1, + FilesIdentified = 2, + Completed = 3, } diff --git a/core/crates/heavy-lifting/src/utils/mod.rs 
b/core/crates/heavy-lifting/src/utils/mod.rs new file mode 100644 index 000000000..538257e2c --- /dev/null +++ b/core/crates/heavy-lifting/src/utils/mod.rs @@ -0,0 +1 @@ +pub mod sub_path; diff --git a/core/crates/heavy-lifting/src/utils/sub_path.rs b/core/crates/heavy-lifting/src/utils/sub_path.rs new file mode 100644 index 000000000..6461ccdb7 --- /dev/null +++ b/core/crates/heavy-lifting/src/utils/sub_path.rs @@ -0,0 +1,93 @@ +use rspc::ErrorCode; +use sd_core_file_path_helper::{ + ensure_file_path_exists, ensure_sub_path_is_directory, ensure_sub_path_is_in_location, + FilePathError, IsolatedFilePathData, +}; + +use sd_prisma::prisma::{location, PrismaClient}; + +use std::path::{Path, PathBuf}; + +use prisma_client_rust::QueryError; + +#[derive(thiserror::Error, Debug)] +pub enum SubPathError { + #[error("received sub path not in database: ", .0.display())] + SubPathNotFound(Box), + + #[error("database error: {0}")] + Database(#[from] QueryError), + + #[error(transparent)] + IsoFilePath(#[from] FilePathError), +} + +impl From for rspc::Error { + fn from(err: SubPathError) -> Self { + match err { + SubPathError::SubPathNotFound(_) => { + Self::with_cause(ErrorCode::NotFound, err.to_string(), err) + } + + _ => Self::with_cause(ErrorCode::InternalServerError, err.to_string(), err), + } + } +} + +pub async fn get_full_path_from_sub_path( + location_id: location::id::Type, + sub_path: &Option + Send + Sync>, + location_path: impl AsRef + Send, + db: &PrismaClient, +) -> Result { + let location_path = location_path.as_ref(); + + match sub_path { + Some(sub_path) if sub_path.as_ref() != Path::new("") => { + let sub_path = sub_path.as_ref(); + let full_path = ensure_sub_path_is_in_location(location_path, sub_path).await?; + + ensure_sub_path_is_directory(location_path, sub_path).await?; + + ensure_file_path_exists( + sub_path, + &IsolatedFilePathData::new(location_id, location_path, &full_path, true)?, + db, + SubPathError::SubPathNotFound, + ) + .await?; + + Ok(full_path) + } + _ => Ok(location_path.to_path_buf()), + } +} + +pub async fn maybe_get_iso_file_path_from_sub_path( + location_id: location::id::Type, + sub_path: &Option + Send + Sync>, + location_path: impl AsRef + Send, + db: &PrismaClient, +) -> Result>, SubPathError> { + let location_path = location_path.as_ref(); + + match sub_path { + Some(sub_path) if sub_path.as_ref() != Path::new("") => { + let full_path = ensure_sub_path_is_in_location(location_path, sub_path).await?; + ensure_sub_path_is_directory(location_path, sub_path).await?; + + let sub_iso_file_path = + IsolatedFilePathData::new(location_id, location_path, &full_path, true)?; + + ensure_file_path_exists( + sub_path, + &sub_iso_file_path, + db, + SubPathError::SubPathNotFound, + ) + .await + .map(|()| Some(sub_iso_file_path)) + } + _ => Ok(None), + } +} diff --git a/core/crates/prisma-helpers/src/lib.rs b/core/crates/prisma-helpers/src/lib.rs index 26d67a4bf..fe830b59f 100644 --- a/core/crates/prisma-helpers/src/lib.rs +++ b/core/crates/prisma-helpers/src/lib.rs @@ -30,6 +30,7 @@ use sd_prisma::prisma::{self, file_path, job, label, location, object}; // File Path selectables! 
+file_path::select!(file_path_pub_id { pub_id }); file_path::select!(file_path_pub_and_cas_ids { id pub_id cas_id }); file_path::select!(file_path_just_pub_id_materialized_path { pub_id diff --git a/crates/task-system/src/task.rs b/crates/task-system/src/task.rs index bf7c18b49..9fd05d5b5 100644 --- a/crates/task-system/src/task.rs +++ b/crates/task-system/src/task.rs @@ -13,8 +13,7 @@ use async_channel as chan; use async_trait::async_trait; use chan::{Recv, RecvError}; use downcast_rs::{impl_downcast, Downcast}; -use futures::executor::block_on; -use tokio::sync::oneshot; +use tokio::{runtime::Handle, sync::oneshot}; use tracing::{trace, warn}; use uuid::Uuid; @@ -58,6 +57,12 @@ pub enum TaskOutput { Empty, } +impl From<()> for TaskOutput { + fn from((): ()) -> Self { + Self::Empty + } +} + /// An enum representing all possible outcomes for a task. #[derive(Debug)] pub enum TaskStatus { @@ -125,14 +130,8 @@ impl + 'static, E: RunError> IntoTask for T { /// due to a limitation in the Rust language. #[async_trait] pub trait Task: fmt::Debug + Downcast + Send + Sync + 'static { - /// This method represent the work that should be done by the worker, it will be called by the - /// worker when there is a slot available in its internal queue. - /// We receive a `&mut self` so any internal data can be mutated on each `run` invocation. - /// - /// The [`interrupter`](Interrupter) is a helper object that can be used to check if the user requested a pause or a cancel, - /// so the user can decide the appropriated moment to pause or cancel the task. Avoiding corrupted data or - /// inconsistent states. - async fn run(&mut self, interrupter: &Interrupter) -> Result; + /// An unique identifier for the task, it will be used to identify the task on the system and also to the user. + fn id(&self) -> TaskId; /// This method defines whether a task should run with priority or not. The task system has a mechanism /// to suspend non-priority tasks on any worker and run priority tasks ASAP. This is useful for tasks that @@ -142,8 +141,14 @@ pub trait Task: fmt::Debug + Downcast + Send + Sync + 'static { false } - /// An unique identifier for the task, it will be used to identify the task on the system and also to the user. - fn id(&self) -> TaskId; + /// This method represent the work that should be done by the worker, it will be called by the + /// worker when there is a slot available in its internal queue. + /// We receive a `&mut self` so any internal data can be mutated on each `run` invocation. + /// + /// The [`interrupter`](Interrupter) is a helper object that can be used to check if the user requested a pause or a cancel, + /// so the user can decide the appropriated moment to pause or cancel the task. Avoiding corrupted data or + /// inconsistent states. + async fn run(&mut self, interrupter: &Interrupter) -> Result; } impl_downcast!(Task where E: RunError); @@ -508,7 +513,7 @@ impl Future for CancelTaskOnDrop { impl Drop for CancelTaskOnDrop { fn drop(&mut self) { // FIXME: We should use async drop when it becomes stable - block_on(self.0.cancel()); + Handle::current().block_on(self.0.cancel()); } }
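
The recurring change across the saver, updater and walker tasks above is the new_deep/new_shallow constructor split: both variants build the same task, but the shallow one sets is_shallow and reports priority through with_priority so the task system can pre-empt non-priority work. Below is a minimal, standalone sketch of that pattern; the Task trait here is a reduced stand-in for sd-task-system's trait and all names are illustrative only, not the Spacedrive implementation.

// Minimal stand-in for the task trait; only the scheduling hook matters here.
trait Task {
    /// Priority tasks can pre-empt queued non-priority work on a worker.
    fn with_priority(&self) -> bool {
        false
    }
}

struct SaveTask {
    walked_entries: Vec<String>,
    // Shallow tasks back user-facing "refresh this directory" actions,
    // so they are flagged for priority scheduling.
    is_shallow: bool,
}

impl SaveTask {
    fn new_deep(walked_entries: Vec<String>) -> Self {
        Self { walked_entries, is_shallow: false }
    }

    fn new_shallow(walked_entries: Vec<String>) -> Self {
        Self { walked_entries, is_shallow: true }
    }
}

impl Task for SaveTask {
    fn with_priority(&self) -> bool {
        // Same rule the saver, updater and walker tasks adopt in this patch.
        self.is_shallow
    }
}

fn main() {
    let deep = SaveTask::new_deep(vec!["a.txt".into()]);
    let shallow = SaveTask::new_shallow(vec!["b.txt".into()]);
    assert!(!deep.with_priority());
    assert!(shallow.with_priority());
    println!(
        "deep entries: {}, shallow entries: {}",
        deep.walked_entries.len(),
        shallow.walked_entries.len()
    );
}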
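
The job.rs hunk wires pause/resume into dispatching: a tokio watch channel carries JobRunningState, the command loop flips it with send_modify, and JobTaskDispatcher awaits wait_for(Running) before every dispatch. The sketch below reproduces just that gating mechanism in a self-contained program; it assumes a recent tokio (one providing watch::Receiver::wait_for, e.g. built with the full feature set), and the Dispatcher type is a placeholder rather than the real JobTaskDispatcher.

use std::{sync::Arc, time::Duration};

use tokio::sync::{watch, Mutex};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum JobRunningState {
    Running,
    Paused,
}

#[derive(Clone)]
struct Dispatcher {
    running_state: Arc<Mutex<watch::Receiver<JobRunningState>>>,
}

impl Dispatcher {
    async fn wait_for_dispatch_approval(&self) {
        self.running_state
            .lock()
            .await
            .wait_for(|state| *state == JobRunningState::Running)
            .await
            .expect("job running state watch channel unexpectedly closed");
    }

    async fn dispatch(&self, task_name: &str) {
        // Mirrors dispatch_boxed/dispatch_many_boxed in the patch: nothing is
        // handed to the task system while the job is paused.
        self.wait_for_dispatch_approval().await;
        println!("dispatched: {task_name}");
    }
}

#[tokio::main]
async fn main() {
    // The job starts paused; Command::Resume flips the state later.
    let (running_state_tx, running_state_rx) = watch::channel(JobRunningState::Paused);
    let dispatcher = Dispatcher {
        running_state: Arc::new(Mutex::new(running_state_rx)),
    };

    let gated = dispatcher.clone();
    let pending = tokio::spawn(async move { gated.dispatch("walk /photos").await });

    // While paused, the spawned dispatch stays blocked on the watch channel.
    tokio::time::sleep(Duration::from_millis(50)).await;
    running_state_tx.send_modify(|state| *state = JobRunningState::Running);

    pending.await.expect("dispatch task panicked");
}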
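
SaveTask, UpdateTask and WalkDirTask all persist themselves the same way: destructure the task, write only the resumable fields into a *SaveState struct with rmp_serde, and rebuild the runtime-only handles (database, sync, dispatcher) from the deserialization context on resume. The round trip below is a compressed sketch of that approach under stated assumptions: it uses a placeholder FakeDb instead of the Prisma client and assumes only the serde (with derive) and rmp-serde crates.

use std::sync::Arc;

use serde::{Deserialize, Serialize};

struct FakeDb; // placeholder for the database client handle

struct SaveTask {
    location_id: i32,
    walked_entries: Vec<String>,
    is_shallow: bool,
    db: Arc<FakeDb>, // runtime-only; never serialized
}

#[derive(Serialize, Deserialize)]
struct SaveTaskSaveState {
    location_id: i32,
    walked_entries: Vec<String>,
    is_shallow: bool,
}

impl SaveTask {
    fn serialize(self) -> Result<Vec<u8>, rmp_serde::encode::Error> {
        // Destructure and drop the runtime handles, keeping only resumable state.
        let Self {
            location_id,
            walked_entries,
            is_shallow,
            ..
        } = self;

        rmp_serde::to_vec_named(&SaveTaskSaveState {
            location_id,
            walked_entries,
            is_shallow,
        })
    }

    fn deserialize(bytes: &[u8], db: Arc<FakeDb>) -> Result<Self, rmp_serde::decode::Error> {
        // The deserialization context supplies the handles that were dropped above.
        rmp_serde::from_slice(bytes).map(
            |SaveTaskSaveState {
                 location_id,
                 walked_entries,
                 is_shallow,
             }| Self {
                location_id,
                walked_entries,
                is_shallow,
                db,
            },
        )
    }
}

fn main() {
    let task = SaveTask {
        location_id: 7,
        walked_entries: vec!["a.txt".into(), "b.txt".into()],
        is_shallow: true,
        db: Arc::new(FakeDb),
    };

    let bytes = task.serialize().expect("serializing save state");
    let resumed = SaveTask::deserialize(&bytes, Arc::new(FakeDb)).expect("resuming save state");

    assert!(resumed.is_shallow);
    println!(
        "resumed save task for location {} with {} entries",
        resumed.location_id,
        resumed.walked_entries.len()
    );
}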