diff --git a/core/crates/indexer-rules/src/lib.rs b/core/crates/indexer-rules/src/lib.rs
index 7c40b2eb1..4e682955f 100644
--- a/core/crates/indexer-rules/src/lib.rs
+++ b/core/crates/indexer-rules/src/lib.rs
@@ -260,7 +260,7 @@ impl MetadataForIndexerRules for Metadata {
 }
 
 impl RulePerKind {
-    #[deprecated]
+    #[deprecated = "Use `[apply_with_metadata]` instead"]
     async fn apply(
         &self,
         source: impl AsRef<Path> + Send,
@@ -328,7 +328,7 @@ pub struct IndexerRule {
 }
 
 impl IndexerRule {
-    #[deprecated]
+    #[deprecated = "Use `[apply_with_metadata]` instead"]
     pub async fn apply(
         &self,
         source: impl AsRef<Path> + Send,
@@ -362,7 +362,7 @@ impl IndexerRule {
         inner(&self.rules, source.as_ref(), metadata).await
     }
 
-    #[deprecated]
+    #[deprecated = "Use `[IndexerRuler::apply_all]` instead"]
     pub async fn apply_all(
         rules: &[Self],
         source: impl AsRef<Path> + Send,
@@ -472,7 +472,7 @@ fn reject_by_glob(source: impl AsRef<Path>, reject_glob_set: &GlobSet) -> bool {
     !accept_by_glob(source.as_ref(), reject_glob_set)
 }
 
-#[deprecated]
+#[deprecated = "Use `[accept_dir_for_its_children_with_metadata]` instead"]
 async fn accept_dir_for_its_children(
     source: impl AsRef<Path> + Send,
     children: &HashSet<String>,
@@ -558,7 +558,7 @@ async fn accept_dir_for_its_children_with_metadata(
     Ok(false)
 }
 
-#[deprecated]
+#[deprecated = "Use `[reject_dir_for_its_children_with_metadata]` instead"]
 async fn reject_dir_for_its_children(
     source: impl AsRef<Path> + Send,
     children: &HashSet<String>,
diff --git a/core/crates/prisma-helpers/src/lib.rs b/core/crates/prisma-helpers/src/lib.rs
index 9f04b609e..e0cf4cbdc 100644
--- a/core/crates/prisma-helpers/src/lib.rs
+++ b/core/crates/prisma-helpers/src/lib.rs
@@ -185,6 +185,7 @@ impl From<location_with_indexer_rules::Data> for location::Data {
             sync_preview_media: data.sync_preview_media,
             hidden: data.hidden,
             date_created: data.date_created,
+            scan_state: data.scan_state,
             file_paths: None,
             indexer_rules: None,
             instance: None,
@@ -208,6 +209,7 @@ impl From<&location_with_indexer_rules::Data> for location::Data {
             sync_preview_media: data.sync_preview_media,
             hidden: data.hidden,
             date_created: data.date_created,
+            scan_state: data.scan_state,
             file_paths: None,
             indexer_rules: None,
             instance: None,
diff --git a/core/prisma/migrations/20240408215355_add_location_scan_state/migration.sql b/core/prisma/migrations/20240408215355_add_location_scan_state/migration.sql
new file mode 100644
index 000000000..1bcf4117c
--- /dev/null
+++ b/core/prisma/migrations/20240408215355_add_location_scan_state/migration.sql
@@ -0,0 +1,29 @@
+-- AlterTable
+ALTER TABLE "job" ADD COLUMN "critical_error" TEXT;
+ALTER TABLE "job" ADD COLUMN "non_critical_errors" BLOB;
+
+-- RedefineTables
+PRAGMA foreign_keys=OFF;
+CREATE TABLE "new_location" (
+    "id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+    "pub_id" BLOB NOT NULL,
+    "name" TEXT,
+    "path" TEXT,
+    "total_capacity" INTEGER,
+    "available_capacity" INTEGER,
+    "size_in_bytes" BLOB,
+    "is_archived" BOOLEAN,
+    "generate_preview_media" BOOLEAN,
+    "sync_preview_media" BOOLEAN,
+    "hidden" BOOLEAN,
+    "date_created" DATETIME,
+    "scan_state" INTEGER NOT NULL DEFAULT 0,
+    "instance_id" INTEGER,
+    CONSTRAINT "location_instance_id_fkey" FOREIGN KEY ("instance_id") REFERENCES "instance" ("id") ON DELETE SET NULL ON UPDATE CASCADE
+);
+INSERT INTO "new_location" ("available_capacity", "date_created", "generate_preview_media", "hidden", "id", "instance_id", "is_archived", "name", "path", "pub_id", "size_in_bytes", "sync_preview_media", "total_capacity") SELECT "available_capacity", "date_created", "generate_preview_media", "hidden", "id", "instance_id", "is_archived", "name", "path", "pub_id", "size_in_bytes", "sync_preview_media", "total_capacity" FROM "location";
+DROP TABLE "location";
+ALTER TABLE "new_location" RENAME TO "location";
+CREATE UNIQUE INDEX "location_pub_id_key" ON "location"("pub_id");
+PRAGMA foreign_key_check;
+PRAGMA foreign_keys=ON;
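Two notes on the hunks above: `#[deprecated]` accepts an optional message that `rustc` repeats at every call site, and the migration backfills `scan_state` with `DEFAULT 0`, so every pre-existing location comes back in the initial scan state. A minimal sketch of the attribute's effect; `old_apply` and `apply_with_metadata` here are hypothetical stand-ins, not the crate's real items:

```rust
#[deprecated = "Use `apply_with_metadata` instead"]
fn old_apply() {}

fn apply_with_metadata() {}

fn main() {
    // Compiling this emits:
    //   warning: use of deprecated function `old_apply`: Use `apply_with_metadata` instead
    old_apply();
    apply_with_metadata();
}
```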
"is_archived", "name", "path", "pub_id", "size_in_bytes", "sync_preview_media", "total_capacity" FROM "location"; +DROP TABLE "location"; +ALTER TABLE "new_location" RENAME TO "location"; +CREATE UNIQUE INDEX "location_pub_id_key" ON "location"("pub_id"); +PRAGMA foreign_key_check; +PRAGMA foreign_keys=ON; diff --git a/core/prisma/schema.prisma b/core/prisma/schema.prisma index b7f4cf6bb..0504cd535 100644 --- a/core/prisma/schema.prisma +++ b/core/prisma/schema.prisma @@ -119,6 +119,8 @@ model Location { hidden Boolean? date_created DateTime? + scan_state Int @default(0) // Enum: sd_core::location::ScanState + /// @local // this is just a client side cache which is annoying but oh well (@brendan) instance_id Int? diff --git a/core/src/api/libraries.rs b/core/src/api/libraries.rs index fe4f45d7f..d30433938 100644 --- a/core/src/api/libraries.rs +++ b/core/src/api/libraries.rs @@ -1,7 +1,7 @@ use crate::{ invalidate_query, library::{update_library_statistics, Library, LibraryConfig, LibraryName}, - location::{scan_location, LocationCreateArgs}, + location::{scan_location, LocationCreateArgs, ScanState}, util::MaybeUndefined, Node, }; @@ -254,7 +254,9 @@ pub(crate) fn mount() -> AlphaRouter { return Ok(()); }; - scan_location(&node, &library, location) + let scan_state = ScanState::try_from(location.scan_state)?; + + scan_location(&node, &library, location, scan_state) .await .map_err(rspc::Error::from) })) diff --git a/core/src/api/locations.rs b/core/src/api/locations.rs index ed421ce8d..c88f0da0d 100644 --- a/core/src/api/locations.rs +++ b/core/src/api/locations.rs @@ -3,7 +3,7 @@ use crate::{ location::{ delete_location, find_location, indexer::OldIndexerJobInit, light_scan_location, non_indexed::NonIndexedPathItem, relink_location, scan_location, scan_location_sub_path, - LocationCreateArgs, LocationError, LocationUpdateArgs, + LocationCreateArgs, LocationError, LocationUpdateArgs, ScanState, }, object::old_file_identifier::old_file_identifier_job::OldFileIdentifierJobInit, old_job::StatefulJob, @@ -311,7 +311,7 @@ pub(crate) fn mount() -> AlphaRouter { .mutation(|(node, library), args: LocationCreateArgs| async move { if let Some(location) = args.create(&node, &library).await? { let id = Some(location.id); - scan_location(&node, &library, location).await?; + scan_location(&node, &library, location, ScanState::Pending).await?; invalidate_query!(library, "locations.list"); Ok(id) } else { @@ -349,7 +349,8 @@ pub(crate) fn mount() -> AlphaRouter { .mutation(|(node, library), args: LocationCreateArgs| async move { if let Some(location) = args.add_library(&node, &library).await? { let id = location.id; - scan_location(&node, &library, location).await?; + let location_scan_state = ScanState::try_from(location.scan_state)?; + scan_location(&node, &library, location, location_scan_state).await?; invalidate_query!(library, "locations.list"); Ok(Some(id)) } else { @@ -392,18 +393,18 @@ pub(crate) fn mount() -> AlphaRouter { // library.orphan_remover.invoke().await; } + let location = find_location(&library, location_id) + .include(location_with_indexer_rules::include()) + .exec() + .await? + .ok_or(LocationError::IdNotFound(location_id))?; + + let location_scan_state = ScanState::try_from(location.scan_state)?; + // rescan location - scan_location( - &node, - &library, - find_location(&library, location_id) - .include(location_with_indexer_rules::include()) - .exec() - .await? 
diff --git a/core/src/location/error.rs b/core/src/location/error.rs
index 23115cc87..df1bb3c93 100644
--- a/core/src/location/error.rs
+++ b/core/src/location/error.rs
@@ -76,6 +76,8 @@ pub enum LocationError {
     MissingPath(location::id::Type),
     #[error("missing-field: {0}")]
     MissingField(#[from] MissingFieldError),
+    #[error("invalid location scan state value: {0}")]
+    InvalidScanStateValue(i32),
 }
 
 impl From<LocationError> for rspc::Error {
diff --git a/core/src/location/indexer/old_indexer_job.rs b/core/src/location/indexer/old_indexer_job.rs
index 84edfddbd..4c4826793 100644
--- a/core/src/location/indexer/old_indexer_job.rs
+++ b/core/src/location/indexer/old_indexer_job.rs
@@ -1,7 +1,7 @@
 use crate::{
     file_paths_db_fetcher_fn, invalidate_query,
     library::Library,
-    location::{location_with_indexer_rules, update_location_size},
+    location::{location_with_indexer_rules, update_location_size, ScanState},
     old_job::{
         CurrentStep, JobError, JobInitOutput, JobReportUpdate, JobResult, JobRunMetadata,
         JobStepOutput, StatefulJob, WorkerContext,
@@ -531,9 +531,37 @@
             update_location_size(init.location.id, &ctx.library)
                 .await
                 .map_err(IndexerError::from)?;
+
+            ctx.library
+                .db
+                .location()
+                .update(
+                    location::id::equals(init.location.id),
+                    vec![location::scan_state::set(ScanState::Indexed as i32)],
+                )
+                .exec()
+                .await
+                .map_err(IndexerError::from)?;
         }
     }
 
+    // FIXME(fogodev): This is currently a workaround to avoid saving paths and sizes in the
+    // metadata after a job is completed, as that data is pretty heavy. A proper fix isn't
+    // needed right now, as I already changed this in the new indexer job, and this old one
+    // will be removed eventually.
+    let run_metadata = Self::RunMetadata {
+        db_write_time: run_metadata.db_write_time,
+        scan_read_time: run_metadata.scan_read_time,
+        total_paths: run_metadata.total_paths,
+        total_updated_paths: run_metadata.total_updated_paths,
+        total_save_steps: run_metadata.total_save_steps,
+        total_update_steps: run_metadata.total_update_steps,
+        indexed_count: run_metadata.indexed_count,
+        updated_count: run_metadata.updated_count,
+        removed_count: run_metadata.removed_count,
+        paths_and_sizes: HashMap::new(),
+    };
+
     Ok(Some(json!({"init: ": init, "run_metadata": run_metadata})))
 }
 }
@@ -543,8 +571,8 @@ fn update_notifier_fn(ctx: &WorkerContext) -> impl FnMut(&Path, usize) + '_ {
     OldIndexerJobData::on_scan_progress(
         ctx,
         vec![ScanProgress::Message(format!(
-            "Found: {total_entries} entries; Scanning: {:?}",
-            path.file_name().unwrap_or(path.as_os_str())
+            "{total_entries} entries found at {}",
+            path.display()
         ))],
     );
 }
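The `location.update` block introduced in this finalizer reappears almost verbatim in the file-identifier and media-processor jobs later in the diff. It could plausibly be factored into one helper, sketched below under the assumption that the `prisma-client-rust` codegen (`PrismaClient` and the generated `location` module) is reachable at a hypothetical `sd_prisma::prisma` path:

```rust
use sd_prisma::prisma::{location, PrismaClient}; // assumed codegen path

use crate::location::ScanState; // the enum added in this diff

/// Persist how far a location's scan pipeline has progressed, so an
/// interrupted scan can resume at the right job instead of starting over.
async fn set_scan_state(
    db: &PrismaClient,
    location_id: location::id::Type,
    state: ScanState,
) -> Result<(), prisma_client_rust::QueryError> {
    db.location()
        .update(
            location::id::equals(location_id),
            vec![location::scan_state::set(state as i32)],
        )
        .exec()
        .await
        .map(|_| ())
}
```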
diff --git a/core/src/location/indexer/old_shallow.rs b/core/src/location/indexer/old_shallow.rs
index 5d5b48b7f..4b24316ed 100644
--- a/core/src/location/indexer/old_shallow.rs
+++ b/core/src/location/indexer/old_shallow.rs
@@ -81,7 +81,6 @@
     walk_single_dir(
         &to_walk_path,
         &indexer_rules,
-        |_, _| {},
         file_paths_db_fetcher_fn!(&db),
         to_remove_db_fetcher_fn!(location_id, &db),
         iso_file_path_factory(location_id, location_path),
@@ -104,7 +103,6 @@
     errors.into_iter().for_each(|e| error!("{e}"));
 
-    // TODO pass these uuids to sync system
     remove_non_existing_file_paths(to_remove, &db, sync).await?;
 
     let mut new_directories_to_scan = HashSet::new();
diff --git a/core/src/location/indexer/old_walk.rs b/core/src/location/indexer/old_walk.rs
index b7db99c7c..6653165d3 100644
--- a/core/src/location/indexer/old_walk.rs
+++ b/core/src/location/indexer/old_walk.rs
@@ -153,11 +153,12 @@
     let mut to_remove = vec![];
 
     while let Some(entry) = to_walk.pop_front() {
+        let last_indexed_count = indexed_paths.len();
+
         let (entry_size, current_to_remove) = inner_walk_single_dir(
             root,
             &entry,
             indexer_rules,
-            &mut update_notifier,
             &to_remove_db_fetcher,
             &iso_file_path_factory,
             WorkingTable {
@@ -170,6 +171,8 @@
         .await;
 
         to_remove.push(current_to_remove);
+
+        update_notifier(&entry.path, indexed_paths.len() - last_indexed_count);
 
         // Saving the size of current entry
         paths_and_sizes.insert(entry.path, entry_size);
@@ -227,7 +230,6 @@
         to_walk_entry.path.clone(),
         to_walk_entry,
         indexer_rules,
-        &mut update_notifier,
         &to_remove_db_fetcher,
         &iso_file_path_factory,
         WorkingTable {
@@ -239,6 +241,8 @@
     )
     .await;
 
+    update_notifier(&to_walk_entry.path, indexed_paths.len());
+
     let (walked, to_update) = filter_existing_paths(indexed_paths, file_paths_db_fetcher).await?;
 
     Ok(WalkResult {
@@ -263,7 +267,6 @@
 pub(super) async fn walk_single_dir(
     root: impl AsRef<Path>,
     indexer_rules: &[IndexerRule],
-    mut update_notifier: impl FnMut(&Path, usize) + '_,
     file_paths_db_fetcher: impl Fn(Vec<file_path::WhereParam>) -> FilePathDBFetcherFut,
     to_remove_db_fetcher: impl Fn(
         IsolatedFilePathData<'static>,
@@ -312,7 +315,6 @@
             maybe_parent: None,
         },
         indexer_rules,
-        &mut update_notifier,
         &to_remove_db_fetcher,
         &iso_file_path_factory,
         WorkingTable {
@@ -435,7 +437,6 @@ async fn inner_walk_single_dir(
         ..
     }: &ToWalkEntry,
     indexer_rules: &[IndexerRule],
-    update_notifier: &mut impl FnMut(&Path, usize),
     to_remove_db_fetcher: impl Fn(
         IsolatedFilePathData<'static>,
         Vec<file_path::WhereParam>,
@@ -469,8 +470,6 @@
     // Just to make sure...
     paths_buffer.clear();
 
-    let mut found_paths_counts = 0;
-
     // Marking with a loop label here in case of rejection or errors, to continue with next entry
     'entries: loop {
         let entry = match read_dir.next_entry().await {
@@ -491,16 +490,6 @@
         let current_path = entry.path();
 
-        // Just sending updates if we found more paths since the last loop
-        let current_found_paths_count = paths_buffer.len();
-        if found_paths_counts != current_found_paths_count {
-            update_notifier(
-                &current_path,
-                indexed_paths.len() + current_found_paths_count,
-            );
-            found_paths_counts = current_found_paths_count;
-        }
-
         trace!(
             "Current filesystem path: {}, accept_by_children_dir: {:#?}",
             current_path.display(),
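The walker changes above move progress reporting out of the per-entry hot loop: the callback is no longer threaded through `inner_walk_single_dir`, and instead fires once per walked directory with the count of newly indexed paths. The counting idea in isolation, with toy types rather than the real walker:

```rust
use std::path::{Path, PathBuf};

fn walk_all(
    dirs: Vec<(PathBuf, Vec<PathBuf>)>, // (directory, its indexed entries)
    mut update_notifier: impl FnMut(&Path, usize),
) {
    let mut indexed_paths: Vec<PathBuf> = Vec::new();

    for (dir, entries) in dirs {
        // Snapshot before walking this directory...
        let last_indexed_count = indexed_paths.len();
        indexed_paths.extend(entries);
        // ...so one notification per directory carries just the new entries,
        // instead of firing on every filesystem entry as the old code did.
        update_notifier(&dir, indexed_paths.len() - last_indexed_count);
    }
}

fn main() {
    walk_all(
        vec![
            ("a".into(), vec!["a/1".into(), "a/2".into()]),
            ("b".into(), vec!["b/1".into()]),
        ],
        |dir, new| println!("{}: {new} new entries", dir.display()),
    );
}
```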
diff --git a/core/src/location/mod.rs b/core/src/location/mod.rs
index 9846f98f9..bfa9d3426 100644
--- a/core/src/location/mod.rs
+++ b/core/src/location/mod.rs
@@ -35,7 +35,7 @@ use chrono::Utc;
 use futures::future::TryFutureExt;
 use normpath::PathExt;
 use prisma_client_rust::{operator::and, or, QueryError};
-use serde::Deserialize;
+use serde::{Deserialize, Serialize};
 use serde_json::json;
 use specta::Type;
 use tokio::{fs, io, time::Instant};
@@ -55,6 +55,29 @@ use metadata::SpacedriveLocationMetadataFile;
 
 pub type LocationPubId = Uuid;
 
+#[repr(i32)]
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, Type, Eq, PartialEq)]
+pub enum ScanState {
+    Pending = 0,
+    Indexed = 1,
+    FilesIdentified = 2,
+    Completed = 3,
+}
+
+impl TryFrom<i32> for ScanState {
+    type Error = LocationError;
+
+    fn try_from(value: i32) -> Result<Self, Self::Error> {
+        Ok(match value {
+            0 => Self::Pending,
+            1 => Self::Indexed,
+            2 => Self::FilesIdentified,
+            3 => Self::Completed,
+            _ => return Err(LocationError::InvalidScanStateValue(value)),
+        })
+    }
+}
+
 /// `LocationCreateArgs` is the argument received from the client using `rspc` to create a new location.
 /// It has the actual path and a vector of indexer rules ids, to create many-to-many relationships
 /// between the location and indexer rules.
@@ -442,6 +465,7 @@ pub async fn scan_location(
     node: &Arc<Node>,
     library: &Arc<Library>,
     location: location_with_indexer_rules::Data,
+    location_scan_state: ScanState,
 ) -> Result<(), JobManagerError> {
     // TODO(N): This isn't gonna work with removable media and this will likely permanently break if the DB is restored from a backup.
     if location.instance_id != Some(library.config().await.instance_id) {
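The `match` that `scan_location` gains below is effectively a resume table: each persisted state skips the pipeline stages that have already run. The same ladder as a compilable sketch; the `Job` enum is hypothetical, where the real code queues `Old*JobInit` builders:

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ScanState { Pending, Indexed, FilesIdentified, Completed }

#[derive(Debug, PartialEq, Eq)]
enum Job { Indexer, FileIdentifier, MediaProcessor }

/// Which of the old jobs still have to run, given where the last scan stopped.
/// `Completed` re-runs the whole pipeline, mirroring the `Pending | Completed` arm.
fn jobs_for(state: ScanState) -> &'static [Job] {
    use Job::*;
    match state {
        ScanState::Pending | ScanState::Completed => &[Indexer, FileIdentifier, MediaProcessor],
        ScanState::Indexed => &[FileIdentifier, MediaProcessor],
        ScanState::FilesIdentified => &[MediaProcessor],
    }
}

fn main() {
    // A location that stopped after indexing resumes at file identification.
    assert_eq!(
        jobs_for(ScanState::Indexed),
        &[Job::FileIdentifier, Job::MediaProcessor][..]
    );
}
```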
 if location.instance_id != Some(library.config().await.instance_id) {
@@ -450,25 +474,61 @@
         return Ok(());
     }
 
     let location_base_data = location::Data::from(&location);
 
-    JobBuilder::new(OldIndexerJobInit {
-        location,
-        sub_path: None,
-    })
-    .with_action("scan_location")
-    .with_metadata(json!({"location": location_base_data.clone()}))
-    .build()
-    .queue_next(OldFileIdentifierJobInit {
-        location: location_base_data.clone(),
-        sub_path: None,
-    })
-    .queue_next(OldMediaProcessorJobInit {
-        location: location_base_data,
-        sub_path: None,
-        regenerate_thumbnails: false,
-        regenerate_labels: false,
-    })
-    .spawn(node, library)
-    .await
+    debug!("Scanning location with state: {location_scan_state:?}");
+
+    match location_scan_state {
+        ScanState::Pending | ScanState::Completed => {
+            JobBuilder::new(OldIndexerJobInit {
+                location,
+                sub_path: None,
+            })
+            .with_action("scan_location")
+            .with_metadata(json!({"location": location_base_data.clone()}))
+            .build()
+            .queue_next(OldFileIdentifierJobInit {
+                location: location_base_data.clone(),
+                sub_path: None,
+            })
+            .queue_next(OldMediaProcessorJobInit {
+                location: location_base_data,
+                sub_path: None,
+                regenerate_thumbnails: false,
+                regenerate_labels: false,
+            })
+            .spawn(node, library)
+            .await
+        }
+        ScanState::Indexed => {
+            JobBuilder::new(OldFileIdentifierJobInit {
+                location: location_base_data.clone(),
+                sub_path: None,
+            })
+            .with_action("scan_location_already_indexed")
+            .with_metadata(json!({"location": location_base_data.clone()}))
+            .build()
+            .queue_next(OldMediaProcessorJobInit {
+                location: location_base_data,
+                sub_path: None,
+                regenerate_thumbnails: false,
+                regenerate_labels: false,
+            })
+            .spawn(node, library)
+            .await
+        }
+        ScanState::FilesIdentified => {
+            JobBuilder::new(OldMediaProcessorJobInit {
+                location: location_base_data.clone(),
+                sub_path: None,
+                regenerate_thumbnails: false,
+                regenerate_labels: false,
+            })
+            .with_action("scan_location_files_already_identified")
+            .with_metadata(json!({"location": location_base_data}))
+            .build()
+            .spawn(node, library)
+            .await
+        }
+    }
     .map_err(Into::into)
 }
diff --git a/core/src/object/media/old_media_processor/job.rs b/core/src/object/media/old_media_processor/job.rs
index 444dbbab0..47aa5eff0 100644
--- a/core/src/object/media/old_media_processor/job.rs
+++ b/core/src/object/media/old_media_processor/job.rs
@@ -1,6 +1,7 @@
 use crate::{
     invalidate_query,
     library::Library,
+    location::ScanState,
     old_job::{
         CurrentStep, JobError, JobInitOutput, JobReportUpdate, JobResult, JobStepOutput,
         StatefulJob, WorkerContext,
@@ -420,6 +421,17 @@
         invalidate_query!(ctx.library, "search.paths");
     }
 
+    ctx.library
+        .db
+        .location()
+        .update(
+            location::id::equals(self.location.id),
+            vec![location::scan_state::set(ScanState::Completed as i32)],
+        )
+        .exec()
+        .await
+        .map_err(MediaProcessorError::from)?;
+
     Ok(Some(json!({"init: ": self, "run_metadata": run_metadata})))
 }
diff --git a/core/src/object/old_file_identifier/old_file_identifier_job.rs b/core/src/object/old_file_identifier/old_file_identifier_job.rs
index f862a49a4..3a1afa3f6 100644
--- a/core/src/object/old_file_identifier/old_file_identifier_job.rs
+++ b/core/src/object/old_file_identifier/old_file_identifier_job.rs
@@ -1,5 +1,6 @@
 use crate::{
     library::Library,
+    location::ScanState,
     old_job::{
         CurrentStep, JobError, JobInitOutput, JobReportUpdate, JobResult, JobRunMetadata,
         JobStepOutput, StatefulJob, WorkerContext,
@@ -239,13 +240,24 @@ impl StatefulJob for OldFileIdentifierJobInit {
     async fn finalize(
         &self,
-        _: &WorkerContext,
+        ctx: &WorkerContext,
         _data: &Option<Self::Data>,
         run_metadata: &Self::RunMetadata,
     ) -> JobResult {
         let init = self;
 
         info!("Finalizing identifier job: {:?}", &run_metadata);
 
+        ctx.library
+            .db
+            .location()
+            .update(
+                location::id::equals(init.location.id),
+                vec![location::scan_state::set(ScanState::FilesIdentified as i32)],
+            )
+            .exec()
+            .await
+            .map_err(FileIdentifierJobError::from)?;
+
         Ok(Some(json!({"init: ": init, "run_metadata": run_metadata})))
     }
 }
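With the indexer writing `Indexed`, the file identifier `FilesIdentified`, and the media processor `Completed`, a single scan pass only ever moves the stored state forward. A tiny sketch of that invariant (pure logic, no database):

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum ScanState { Pending, Indexed, FilesIdentified, Completed }

fn main() {
    // Indexer -> FileIdentifier -> MediaProcessor each persist the next state,
    // so within one scan pass the stored value is strictly increasing.
    let pass = [
        ScanState::Pending,         // row default (`DEFAULT 0` in the migration)
        ScanState::Indexed,         // written by the old indexer job's finalize
        ScanState::FilesIdentified, // written by the old file-identifier job's finalize
        ScanState::Completed,       // written by the old media-processor job's finalize
    ];
    assert!(pass.windows(2).all(|w| w[0] < w[1]));
}
```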
diff --git a/core/src/util/debug_initializer.rs b/core/src/util/debug_initializer.rs
index 0169164ad..8d796cc3d 100644
--- a/core/src/util/debug_initializer.rs
+++ b/core/src/util/debug_initializer.rs
@@ -1,10 +1,10 @@
 // ! A system for loading a default set of data on startup. This is ONLY enabled in development builds.
 
 use crate::{
-    library::Libraries,
-    library::{LibraryManagerError, LibraryName},
+    library::{Libraries, LibraryManagerError, LibraryName},
     location::{
         delete_location, scan_location, LocationCreateArgs, LocationError, LocationManagerError,
+        ScanState,
     },
     old_job::JobManagerError,
     util::AbortOnDrop,
@@ -178,7 +178,7 @@
             .create(node, &library)
             .await?
         {
-            scan_location(node, &library, location).await?;
+            scan_location(node, &library, location, ScanState::Pending).await?;
         } else {
             warn!(
                 "Debug init error: location '{}' was not found after being created!",
diff --git a/packages/client/src/core.ts b/packages/client/src/core.ts
index 0bf612eb9..d8930ecbc 100644
--- a/packages/client/src/core.ts
+++ b/packages/client/src/core.ts
@@ -416,7 +416,7 @@ export type LightScanArgs = { location_id: number; sub_path: string }
 
 export type Listener2 = { id: string; name: string; addrs: string[] }
 
-export type Location = { id: number; pub_id: number[]; name: string | null; path: string | null; total_capacity: number | null; available_capacity: number | null; size_in_bytes: number[] | null; is_archived: boolean | null; generate_preview_media: boolean | null; sync_preview_media: boolean | null; hidden: boolean | null; date_created: string | null; instance_id: number | null }
+export type Location = { id: number; pub_id: number[]; name: string | null; path: string | null; total_capacity: number | null; available_capacity: number | null; size_in_bytes: number[] | null; is_archived: boolean | null; generate_preview_media: boolean | null; sync_preview_media: boolean | null; hidden: boolean | null; date_created: string | null; scan_state: number; instance_id: number | null }
 
 /**
  * `LocationCreateArgs` is the argument received from the client using `rspc` to create a new location.