From e70fd4eba52886de8f67a804e9e55381ae3ffd1a Mon Sep 17 00:00:00 2001 From: Brendan Allan Date: Tue, 30 May 2023 19:07:41 +0200 Subject: [PATCH] [ENG-662] Don't use job system for shallow operations (#886) * make it work * use functions instead of jobs for shallow stuff * use generalised AbortOnDrop * simplify naming * error stuff + alphabetical rspc types * log errors * don't run CI on push * remove ephemeral * private Worker::new * remove comments --- .github/workflows/ci.yml | 16 - Cargo.lock | 2 +- core/src/api/locations.rs | 28 +- core/src/job/job_manager.rs | 25 +- core/src/job/worker.rs | 1 + core/src/location/indexer/indexer_job.rs | 25 +- core/src/location/indexer/mod.rs | 26 +- core/src/location/indexer/shallow.rs | 107 +++++ .../location/indexer/shallow_indexer_job.rs | 202 --------- core/src/location/mod.rs | 41 +- .../file_identifier/file_identifier_job.rs | 55 ++- core/src/object/file_identifier/mod.rs | 53 +-- core/src/object/file_identifier/shallow.rs | 174 ++++++++ .../shallow_file_identifier_job.rs | 259 ------------ core/src/object/preview/thumbnail/mod.rs | 80 ++-- core/src/object/preview/thumbnail/shallow.rs | 150 +++++++ .../thumbnail/shallow_thumbnailer_job.rs | 215 ---------- core/src/util/debug_initializer.rs | 35 +- interface/app/$libraryId/location/$id.tsx | 18 +- interface/hooks/useExplorerTopBarOptions.tsx | 25 +- packages/client/src/core.ts | 384 +++++++++--------- 21 files changed, 833 insertions(+), 1088 deletions(-) create mode 100644 core/src/location/indexer/shallow.rs delete mode 100644 core/src/location/indexer/shallow_indexer_job.rs create mode 100644 core/src/object/file_identifier/shallow.rs delete mode 100644 core/src/object/file_identifier/shallow_file_identifier_job.rs create mode 100644 core/src/object/preview/thumbnail/shallow.rs delete mode 100644 core/src/object/preview/thumbnail/shallow_thumbnailer_job.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9ebbfd65f..7abcabb7f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,22 +1,6 @@ name: CI on: - push: - branches: - - main - paths-ignore: - - '**.md' - - 'docs/**' - - 'LICENSE' - - '.cspell/**' - - '.vscode/**' - - '.github/CODEOWNERS' - - '.github/FUNDING.yml' - - '.github/ISSUE_TEMPLATE/**' - - '.github/scripts/osxcross/**' - - '.github/scripts/ffmpeg-macos/**' - - '.gitattributes' - - 'cspell.config.yaml' pull_request: paths-ignore: - '**.md' diff --git a/Cargo.lock b/Cargo.lock index e7b0476eb..1a5507bc8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6634,7 +6634,7 @@ dependencies = [ [[package]] name = "rspc" version = "0.1.4" -source = "git+https://github.com/oscartbeaumont/rspc?branch=specta-v2#927e951fa7bae9d8ad87d2c084a508870f43e788" +source = "git+https://github.com/oscartbeaumont/rspc?branch=specta-v2#2aec493e6f9259e67f04866e250e1690f073def9" dependencies = [ "futures", "futures-channel", diff --git a/core/src/api/locations.rs b/core/src/api/locations.rs index a412b4c44..99736d4c2 100644 --- a/core/src/api/locations.rs +++ b/core/src/api/locations.rs @@ -6,13 +6,13 @@ use crate::{ LocationError, LocationUpdateArgs, }, prisma::{file_path, indexer_rule, indexer_rules_in_location, location, object, tag}, + util::debug_initializer::AbortOnDrop, }; -use std::path::PathBuf; - use rspc::{self, alpha::AlphaRouter, ErrorCode}; use serde::{Deserialize, Serialize}; use specta::Type; +use std::path::PathBuf; use super::{utils::library, Ctx, R}; @@ -151,19 +151,17 @@ pub(crate) fn mount() -> AlphaRouter { } R.with2(library()) - .mutation(|(_, library), args: LightScanArgs| async move { - // light rescan location - light_scan_location( - &library, - find_location(&library, args.location_id) - .include(location_with_indexer_rules::include()) - .exec() - .await? - .ok_or(LocationError::IdNotFound(args.location_id))?, - &args.sub_path, - ) - .await - .map_err(Into::into) + .subscription(|(_, library), args: LightScanArgs| async move { + let location = find_location(&library, args.location_id) + .include(location_with_indexer_rules::include()) + .exec() + .await? + .ok_or(LocationError::IdNotFound(args.location_id))?; + + let handle = + tokio::spawn(light_scan_location(library, location, args.sub_path)); + + Ok(AbortOnDrop(handle)) }) }) .procedure( diff --git a/core/src/job/job_manager.rs b/core/src/job/job_manager.rs index 7c2db88ad..3948c89a4 100644 --- a/core/src/job/job_manager.rs +++ b/core/src/job/job_manager.rs @@ -2,19 +2,14 @@ use crate::{ invalidate_query, job::{worker::Worker, DynJob, Job, JobError, StatefulJob}, library::Library, - location::indexer::{indexer_job::IndexerJob, shallow_indexer_job::ShallowIndexerJob}, + location::indexer::indexer_job::IndexerJob, object::{ - file_identifier::{ - file_identifier_job::FileIdentifierJob, - shallow_file_identifier_job::ShallowFileIdentifierJob, - }, + file_identifier::file_identifier_job::FileIdentifierJob, fs::{ copy::FileCopierJob, cut::FileCutterJob, decrypt::FileDecryptorJob, delete::FileDeleterJob, encrypt::FileEncryptorJob, erase::FileEraserJob, }, - preview::{ - shallow_thumbnailer_job::ShallowThumbnailerJob, thumbnailer_job::ThumbnailerJob, - }, + preview::thumbnailer_job::ThumbnailerJob, validation::validator_job::ObjectValidatorJob, }, prisma::{job, node, SortOrder}, @@ -281,12 +276,8 @@ impl JobManager { let wrapped_worker = Arc::new(Mutex::new(worker)); - if let Err(e) = Worker::spawn( - Arc::clone(&self), - Arc::clone(&wrapped_worker), - library.clone(), - ) - .await + if let Err(e) = + Worker::spawn(self.clone(), Arc::clone(&wrapped_worker), library.clone()).await { error!("Error spawning worker: {:?}", e); } else { @@ -532,11 +523,8 @@ fn get_background_info_by_job_name(name: &str) -> bool { }, jobs = [ ThumbnailerJob, - ShallowThumbnailerJob, IndexerJob, - ShallowIndexerJob, FileIdentifierJob, - ShallowFileIdentifierJob, ObjectValidatorJob, FileCutterJob, FileCopierJob, @@ -564,11 +552,8 @@ fn get_resumable_job( }, jobs = [ ThumbnailerJob, - ShallowThumbnailerJob, IndexerJob, - ShallowIndexerJob, FileIdentifierJob, - ShallowFileIdentifierJob, ObjectValidatorJob, FileCutterJob, FileCopierJob, diff --git a/core/src/job/worker.rs b/core/src/job/worker.rs index 06fe622f0..623a0133b 100644 --- a/core/src/job/worker.rs +++ b/core/src/job/worker.rs @@ -122,6 +122,7 @@ impl Worker { // Otherwise it can be a job being resumed or a children job that was already been created worker.report.update(&library).await?; } + drop(worker); job.register_children(&library).await?; diff --git a/core/src/location/indexer/indexer_job.rs b/core/src/location/indexer/indexer_job.rs index 600042c7e..74c9c4b7b 100644 --- a/core/src/location/indexer/indexer_job.rs +++ b/core/src/location/indexer/indexer_job.rs @@ -184,12 +184,25 @@ impl StatefulJob for IndexerJob { match &state.steps[0] { IndexerJobStepInput::Save(step) => { - execute_indexer_save_step(&state.init.location, step, data, &mut ctx) - .await - .map(|(indexed_count, elapsed_time)| { - data.indexed_count += indexed_count; - data.db_write_time += elapsed_time; - })? + let start_time = Instant::now(); + + IndexerJobData::on_scan_progress( + &mut ctx, + vec![ + ScanProgress::SavedChunks(step.chunk_idx), + ScanProgress::Message(format!( + "Writing {}/{} to db", + step.chunk_idx, data.total_save_steps + )), + ], + ); + + let count = + execute_indexer_save_step(&state.init.location, step, &ctx.library.clone()) + .await?; + + data.indexed_count += count as u64; + data.db_write_time += start_time.elapsed(); } IndexerJobStepInput::Walk(to_walk_entry) => { let location_id = state.init.location.id; diff --git a/core/src/location/indexer/mod.rs b/core/src/location/indexer/mod.rs index 7b6f65dc1..a89aafbe4 100644 --- a/core/src/location/indexer/mod.rs +++ b/core/src/location/indexer/mod.rs @@ -17,7 +17,6 @@ use rspc::ErrorCode; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use serde_json::json; use thiserror::Error; -use tokio::time::Instant; use tracing::info; use super::{ @@ -27,12 +26,14 @@ use super::{ pub mod indexer_job; pub mod rules; -pub mod shallow_indexer_job; +mod shallow; mod walk; use rules::IndexerRuleError; use walk::WalkedEntry; +pub use shallow::*; + /// `IndexerJobInit` receives a `location::Data` object to be indexed /// and possibly a `sub_path` to be indexed. The `sub_path` is used when /// we want do index just a part of a location. @@ -132,22 +133,9 @@ impl From for rspc::Error { async fn execute_indexer_save_step( location: &location_with_indexer_rules::Data, save_step: &IndexerJobSaveStep, - data: &IndexerJobData, - ctx: &mut WorkerContext, -) -> Result<(u64, Duration), IndexerError> { - let start_time = Instant::now(); - - IndexerJobData::on_scan_progress( - ctx, - vec![ - ScanProgress::SavedChunks(save_step.chunk_idx), - ScanProgress::Message(format!( - "Writing {}/{} to db", - save_step.chunk_idx, data.total_save_steps - )), - ], - ); - let Library { sync, db, .. } = &ctx.library; + library: &Library, +) -> Result { + let Library { sync, db, .. } = &library; let (sync_stuff, paths): (Vec<_>, Vec<_>) = save_step .walked @@ -214,7 +202,7 @@ async fn execute_indexer_save_step( info!("Inserted {count} records"); - Ok((count as u64, start_time.elapsed())) + Ok(count) } fn finalize_indexer( diff --git a/core/src/location/indexer/shallow.rs b/core/src/location/indexer/shallow.rs new file mode 100644 index 000000000..d927bdc99 --- /dev/null +++ b/core/src/location/indexer/shallow.rs @@ -0,0 +1,107 @@ +use crate::{ + file_paths_db_fetcher_fn, + job::JobError, + library::Library, + location::file_path_helper::{ + check_file_path_exists, ensure_sub_path_is_directory, ensure_sub_path_is_in_location, + IsolatedFilePathData, + }, + to_remove_db_fetcher_fn, +}; +use tracing::error; + +use std::path::{Path, PathBuf}; + +use itertools::Itertools; + +use super::{ + execute_indexer_save_step, iso_file_path_factory, location_with_indexer_rules, + remove_non_existing_file_paths, rules::IndexerRule, walk::walk_single_dir, IndexerError, + IndexerJobSaveStep, +}; + +/// BATCH_SIZE is the number of files to index at each step, writing the chunk of files metadata in the database. +const BATCH_SIZE: usize = 1000; + +pub async fn shallow( + location: &location_with_indexer_rules::Data, + sub_path: &PathBuf, + library: &Library, +) -> Result<(), JobError> { + let location_id = location.id; + let location_path = Path::new(&location.path); + + let db = library.db.clone(); + + let indexer_rules = location + .indexer_rules + .iter() + .map(|rule| IndexerRule::try_from(&rule.indexer_rule)) + .collect::, _>>() + .map_err(IndexerError::from)?; + + let (add_root, to_walk_path) = if sub_path != Path::new("") { + let full_path = ensure_sub_path_is_in_location(location_path, &sub_path) + .await + .map_err(IndexerError::from)?; + ensure_sub_path_is_directory(location_path, &sub_path) + .await + .map_err(IndexerError::from)?; + + ( + !check_file_path_exists::( + &IsolatedFilePathData::new(location_id, location_path, &full_path, true) + .map_err(IndexerError::from)?, + &db, + ) + .await?, + full_path, + ) + } else { + (false, location_path.to_path_buf()) + }; + + let (walked, to_remove, errors) = { + walk_single_dir( + &to_walk_path, + &indexer_rules, + |_, _| {}, + file_paths_db_fetcher_fn!(&db), + to_remove_db_fetcher_fn!(location_id, location_path, &db), + iso_file_path_factory(location_id, location_path), + add_root, + ) + .await? + }; + + errors.into_iter().for_each(|e| error!("{e}")); + + // TODO pass these uuids to sync system + remove_non_existing_file_paths(to_remove, &db).await?; + + let total_paths = &mut 0; + + let steps = walked + .chunks(BATCH_SIZE) + .into_iter() + .enumerate() + .map(|(i, chunk)| { + let chunk_steps = chunk.collect::>(); + + *total_paths += chunk_steps.len() as u64; + + IndexerJobSaveStep { + chunk_idx: i, + walked: chunk_steps, + } + }) + .collect::>(); + + for step in steps { + execute_indexer_save_step(&location, &step, &library).await?; + } + + library.orphan_remover.invoke().await; + + Ok(()) +} diff --git a/core/src/location/indexer/shallow_indexer_job.rs b/core/src/location/indexer/shallow_indexer_job.rs deleted file mode 100644 index 31999d138..000000000 --- a/core/src/location/indexer/shallow_indexer_job.rs +++ /dev/null @@ -1,202 +0,0 @@ -use crate::{ - file_paths_db_fetcher_fn, - job::{JobError, JobInitData, JobResult, JobState, StatefulJob, WorkerContext}, - location::file_path_helper::{ - check_file_path_exists, ensure_sub_path_is_directory, ensure_sub_path_is_in_location, - IsolatedFilePathData, - }, - to_remove_db_fetcher_fn, -}; - -use std::{ - hash::{Hash, Hasher}, - path::{Path, PathBuf}, - sync::Arc, -}; - -use itertools::Itertools; -use serde::{Deserialize, Serialize}; -use tokio::time::Instant; - -use super::{ - execute_indexer_save_step, finalize_indexer, iso_file_path_factory, - location_with_indexer_rules, remove_non_existing_file_paths, rules::IndexerRule, - update_notifier_fn, walk::walk_single_dir, IndexerError, IndexerJobData, IndexerJobSaveStep, - ScanProgress, -}; - -/// BATCH_SIZE is the number of files to index at each step, writing the chunk of files metadata in the database. -const BATCH_SIZE: usize = 1000; - -/// `ShallowIndexerJobInit` receives a `location::Data` object to be indexed -/// and possibly a `sub_path` to be indexed. The `sub_path` is used when -/// we want do index just a part of a location. -#[derive(Serialize, Deserialize)] -pub struct ShallowIndexerJobInit { - pub location: location_with_indexer_rules::Data, - pub sub_path: PathBuf, -} - -impl Hash for ShallowIndexerJobInit { - fn hash(&self, state: &mut H) { - self.location.id.hash(state); - self.sub_path.hash(state); - } -} - -/// A `ShallowIndexerJob` is a stateful job that indexes all files in a directory, without checking inner directories. -/// First it checks the directory and generates a list of files to index, chunked into -/// batches of [`BATCH_SIZE`]. Then for each chunk it write the file metadata to the database. -pub struct ShallowIndexerJob; - -impl JobInitData for ShallowIndexerJobInit { - type Job = ShallowIndexerJob; -} - -#[async_trait::async_trait] -impl StatefulJob for ShallowIndexerJob { - type Init = ShallowIndexerJobInit; - type Data = IndexerJobData; - type Step = IndexerJobSaveStep; - - const NAME: &'static str = "shallow_indexer"; - const IS_BACKGROUND: bool = true; - - fn new() -> Self { - Self {} - } - - /// Creates a vector of valid path buffers from a directory, chunked into batches of `BATCH_SIZE`. - async fn init( - &self, - mut ctx: WorkerContext, - state: &mut JobState, - ) -> Result<(), JobError> { - let location_id = state.init.location.id; - let location_path = Path::new(&state.init.location.path); - - let db = Arc::clone(&ctx.library.db); - - let indexer_rules = state - .init - .location - .indexer_rules - .iter() - .map(|rule| IndexerRule::try_from(&rule.indexer_rule)) - .collect::, _>>() - .map_err(IndexerError::from)?; - - let (add_root, to_walk_path) = if state.init.sub_path != Path::new("") { - let full_path = ensure_sub_path_is_in_location(location_path, &state.init.sub_path) - .await - .map_err(IndexerError::from)?; - ensure_sub_path_is_directory(location_path, &state.init.sub_path) - .await - .map_err(IndexerError::from)?; - - ( - !check_file_path_exists::( - &IsolatedFilePathData::new(location_id, location_path, &full_path, true) - .map_err(IndexerError::from)?, - &db, - ) - .await?, - full_path, - ) - } else { - (false, location_path.to_path_buf()) - }; - - let scan_start = Instant::now(); - let (walked, to_remove, errors) = { - let ctx = &mut ctx; - walk_single_dir( - &to_walk_path, - &indexer_rules, - update_notifier_fn(BATCH_SIZE, ctx), - file_paths_db_fetcher_fn!(&db), - to_remove_db_fetcher_fn!(location_id, location_path, &db), - iso_file_path_factory(location_id, location_path), - add_root, - ) - .await? - }; - - let db_delete_start = Instant::now(); - // TODO pass these uuids to sync system - let removed_count = remove_non_existing_file_paths(to_remove, &db).await?; - let db_delete_time = db_delete_start.elapsed(); - - let total_paths = &mut 0; - - state.steps.extend( - walked - .chunks(BATCH_SIZE) - .into_iter() - .enumerate() - .map(|(i, chunk)| { - let chunk_steps = chunk.collect::>(); - - *total_paths += chunk_steps.len() as u64; - - IndexerJobSaveStep { - chunk_idx: i, - walked: chunk_steps, - } - }), - ); - - ctx.library.orphan_remover.invoke().await; - - IndexerJobData::on_scan_progress( - &mut ctx, - vec![ScanProgress::Message(format!( - "Saving {total_paths} files or directories" - ))], - ); - - state.data = Some(IndexerJobData { - indexed_path: to_walk_path, - indexer_rules, - db_write_time: db_delete_time, - scan_read_time: scan_start.elapsed(), - total_paths: *total_paths, - indexed_count: 0, - removed_count, - total_save_steps: state.steps.len() as u64, - }); - - if !errors.is_empty() { - Err(JobError::StepCompletedWithErrors( - errors.into_iter().map(|e| format!("{e}")).collect(), - )) - } else { - Ok(()) - } - } - - /// Process each chunk of entries in the indexer job, writing to the `file_path` table - async fn execute_step( - &self, - mut ctx: WorkerContext, - state: &mut JobState, - ) -> Result<(), JobError> { - let data = state - .data - .as_mut() - .expect("critical error: missing data on job state"); - - execute_indexer_save_step(&state.init.location, &state.steps[0], data, &mut ctx) - .await - .map(|(indexed_paths, elapsed_time)| { - data.indexed_count += indexed_paths; - data.db_write_time += elapsed_time; - }) - .map_err(Into::into) - } - - /// Logs some metadata about the indexer job - async fn finalize(&mut self, ctx: WorkerContext, state: &mut JobState) -> JobResult { - finalize_indexer(&state.init.location.path, state, ctx) - } -} diff --git a/core/src/location/mod.rs b/core/src/location/mod.rs index 23bef9885..57aff7907 100644 --- a/core/src/location/mod.rs +++ b/core/src/location/mod.rs @@ -3,13 +3,8 @@ use crate::{ job::{Job, JobManagerError}, library::Library, object::{ - file_identifier::{ - file_identifier_job::FileIdentifierJobInit, - shallow_file_identifier_job::ShallowFileIdentifierJobInit, - }, - preview::{ - shallow_thumbnailer_job::ShallowThumbnailerJobInit, thumbnailer_job::ThumbnailerJobInit, - }, + file_identifier::{self, file_identifier_job::FileIdentifierJobInit}, + preview::{shallow_thumbnailer, thumbnailer_job::ThumbnailerJobInit}, }, prisma::{file_path, indexer_rules_in_location, location, node, object, PrismaClient}, sync, @@ -38,7 +33,7 @@ mod manager; mod metadata; pub use error::LocationError; -use indexer::{shallow_indexer_job::ShallowIndexerJobInit, IndexerJobInit}; +use indexer::IndexerJobInit; pub use manager::{LocationManager, LocationManagerError}; use metadata::SpacedriveLocationMetadataFile; @@ -423,37 +418,23 @@ pub async fn scan_location_sub_path( } pub async fn light_scan_location( - library: &Library, + library: Library, location: location_with_indexer_rules::Data, sub_path: impl AsRef, ) -> Result<(), JobManagerError> { let sub_path = sub_path.as_ref().to_path_buf(); + if location.node_id != library.node_local_id { return Ok(()); } let location_base_data = location::Data::from(&location); - // removed grouping for background jobs, they don't need to be grouped as only running ones are shown to the user - library - .spawn_job(ShallowIndexerJobInit { - location, - sub_path: sub_path.clone(), - }) - .await - .unwrap_or(()); - library - .spawn_job(ShallowFileIdentifierJobInit { - location: location_base_data.clone(), - sub_path: sub_path.clone(), - }) - .await - .unwrap_or(()); - library - .spawn_job(ShallowThumbnailerJobInit { - location: location_base_data.clone(), - sub_path: sub_path.clone(), - }) - .await + + indexer::shallow(&location, &sub_path, &library).await?; + file_identifier::shallow(&location_base_data, &sub_path, &library).await?; + shallow_thumbnailer(&location_base_data, &sub_path, &library).await?; + + Ok(()) } pub async fn relink_location( diff --git a/core/src/object/file_identifier/file_identifier_job.rs b/core/src/object/file_identifier/file_identifier_job.rs index 43825cf79..761af1f97 100644 --- a/core/src/object/file_identifier/file_identifier_job.rs +++ b/core/src/object/file_identifier/file_identifier_job.rs @@ -20,8 +20,7 @@ use serde::{Deserialize, Serialize}; use tracing::info; use super::{ - finalize_file_identifier, process_identifier_file_paths, FileIdentifierJobError, - FileIdentifierReport, CHUNK_SIZE, + process_identifier_file_paths, FileIdentifierJobError, FileIdentifierReport, CHUNK_SIZE, }; pub struct FileIdentifierJob {} @@ -184,27 +183,51 @@ impl StatefulJob for FileIdentifierJob { ) .await?; - process_identifier_file_paths( - ::NAME, + // if no file paths found, abort entire job early, there is nothing to do + // if we hit this error, there is something wrong with the data/query + if file_paths.is_empty() { + return Err(JobError::EarlyFinish { + name: ::NAME.to_string(), + reason: "Expected orphan Paths not returned from database query for this chunk" + .to_string(), + }); + } + + let (total_objects_created, total_objects_linked) = process_identifier_file_paths( location, &file_paths, step_number, cursor, - report, - ctx, + &ctx.library, + report.total_orphan_paths, ) - .await + .await?; + + report.total_objects_created += total_objects_created; + report.total_objects_linked += total_objects_linked; + + ctx.progress(vec![ + JobReportUpdate::CompletedTaskCount(step_number), + JobReportUpdate::Message(format!( + "Processed {} of {} orphan Paths", + step_number * CHUNK_SIZE, + report.total_orphan_paths + )), + ]); + + Ok(()) } - async fn finalize(&mut self, ctx: WorkerContext, state: &mut JobState) -> JobResult { - finalize_file_identifier( - &state - .data - .as_ref() - .expect("critical error: missing data on job state") - .report, - ctx, - ) + async fn finalize(&mut self, _: WorkerContext, state: &mut JobState) -> JobResult { + let report = &state + .data + .as_ref() + .expect("critical error: missing data on job state") + .report; + + info!("Finalizing identifier job: {report:?}"); + + Ok(Some(serde_json::to_value(report)?)) } } diff --git a/core/src/object/file_identifier/mod.rs b/core/src/object/file_identifier/mod.rs index ab6618738..d6a8ad660 100644 --- a/core/src/object/file_identifier/mod.rs +++ b/core/src/object/file_identifier/mod.rs @@ -1,6 +1,5 @@ use crate::{ - invalidate_query, - job::{JobError, JobReportUpdate, JobResult, WorkerContext}, + job::JobError, library::Library, location::file_path_helper::{ file_path_for_file_identifier, FilePathError, IsolatedFilePathData, @@ -28,7 +27,9 @@ use tracing::{error, info}; use uuid::Uuid; pub mod file_identifier_job; -pub mod shallow_file_identifier_job; +mod shallow; + +pub use shallow::*; // we break these jobs into chunks of 100 to improve performance const CHUNK_SIZE: usize = 100; @@ -336,60 +337,26 @@ fn file_path_object_connect_ops<'db>( } async fn process_identifier_file_paths( - job_name: &str, location: &location::Data, file_paths: &[file_path_for_file_identifier::Data], step_number: usize, cursor: &mut i32, - report: &mut FileIdentifierReport, - ctx: WorkerContext, -) -> Result<(), JobError> { - // if no file paths found, abort entire job early, there is nothing to do - // if we hit this error, there is something wrong with the data/query - if file_paths.is_empty() { - return Err(JobError::EarlyFinish { - name: job_name.to_string(), - reason: "Expected orphan Paths not returned from database query for this chunk" - .to_string(), - }); - } - + library: &Library, + orphan_count: usize, +) -> Result<(usize, usize), JobError> { info!( "Processing {:?} orphan Paths. ({} completed of {})", file_paths.len(), step_number, - report.total_orphan_paths + orphan_count ); - let (total_objects_created, total_objects_linked) = - identifier_job_step(&ctx.library, location, file_paths).await?; - - report.total_objects_created += total_objects_created; - report.total_objects_linked += total_objects_linked; + let counts = identifier_job_step(&library, location, file_paths).await?; // set the step data cursor to the last row of this chunk if let Some(last_row) = file_paths.last() { *cursor = last_row.id; } - ctx.progress(vec![ - JobReportUpdate::CompletedTaskCount(step_number), - JobReportUpdate::Message(format!( - "Processed {} of {} orphan Paths", - step_number * CHUNK_SIZE, - report.total_orphan_paths - )), - ]); - - Ok(()) -} - -fn finalize_file_identifier(report: &FileIdentifierReport, ctx: WorkerContext) -> JobResult { - info!("Finalizing identifier job: {report:?}"); - - if report.total_orphan_paths > 0 { - invalidate_query!(ctx.library, "search.paths"); - } - - Ok(Some(serde_json::to_value(report)?)) + Ok(counts) } diff --git a/core/src/object/file_identifier/shallow.rs b/core/src/object/file_identifier/shallow.rs new file mode 100644 index 000000000..c2bbaddcb --- /dev/null +++ b/core/src/object/file_identifier/shallow.rs @@ -0,0 +1,174 @@ +use crate::{ + invalidate_query, + job::JobError, + library::Library, + location::file_path_helper::{ + ensure_file_path_exists, ensure_sub_path_is_directory, ensure_sub_path_is_in_location, + file_path_for_file_identifier, IsolatedFilePathData, + }, + prisma::{file_path, location, PrismaClient, SortOrder}, + util::db::chain_optional_iter, +}; + +use std::path::{Path, PathBuf}; + +use serde::{Deserialize, Serialize}; +use tracing::info; + +use super::{process_identifier_file_paths, FileIdentifierJobError, CHUNK_SIZE}; + +#[derive(Serialize, Deserialize)] +pub struct ShallowFileIdentifierJobState { + cursor: i32, + sub_iso_file_path: IsolatedFilePathData<'static>, +} + +pub async fn shallow( + location: &location::Data, + sub_path: &PathBuf, + library: &Library, +) -> Result<(), JobError> { + let Library { db, .. } = &library; + + info!("Identifying orphan File Paths..."); + + let location_id = location.id; + let location_path = Path::new(&location.path); + + let sub_iso_file_path = if sub_path != Path::new("") { + let full_path = ensure_sub_path_is_in_location(location_path, &sub_path) + .await + .map_err(FileIdentifierJobError::from)?; + ensure_sub_path_is_directory(location_path, &sub_path) + .await + .map_err(FileIdentifierJobError::from)?; + + let sub_iso_file_path = + IsolatedFilePathData::new(location_id, location_path, &full_path, true) + .map_err(FileIdentifierJobError::from)?; + + ensure_file_path_exists( + &sub_path, + &sub_iso_file_path, + db, + FileIdentifierJobError::SubPathNotFound, + ) + .await?; + + sub_iso_file_path + } else { + IsolatedFilePathData::new(location_id, location_path, location_path, true) + .map_err(FileIdentifierJobError::from)? + }; + + let orphan_count = count_orphan_file_paths(db, location_id, &sub_iso_file_path).await?; + + if orphan_count == 0 { + return Ok(()); + } + + let task_count = (orphan_count as f64 / CHUNK_SIZE as f64).ceil() as usize; + info!( + "Found {} orphan Paths. Will execute {} tasks...", + orphan_count, task_count + ); + + let first_path = db + .file_path() + .find_first(orphan_path_filters(location_id, None, &sub_iso_file_path)) + // .order_by(file_path::id::order(Direction::Asc)) + .select(file_path::select!({ id })) + .exec() + .await? + .unwrap(); // SAFETY: We already validated before that there are orphans `file_path`s + + // Initializing `state.data` here because we need a complete state in case of early finish + let mut data = ShallowFileIdentifierJobState { + cursor: first_path.id, + sub_iso_file_path, + }; + + for step_number in 0..task_count { + let ShallowFileIdentifierJobState { + cursor, + sub_iso_file_path, + } = &mut data; + + // get chunk of orphans to process + let file_paths = + get_orphan_file_paths(&library.db, location.id, *cursor, sub_iso_file_path).await?; + + process_identifier_file_paths( + location, + &file_paths, + step_number, + cursor, + &library, + orphan_count, + ) + .await?; + } + + if orphan_count > 0 { + invalidate_query!(library, "search.paths"); + } + + Ok(()) +} + +fn orphan_path_filters( + location_id: i32, + file_path_id: Option, + sub_iso_file_path: &IsolatedFilePathData<'_>, +) -> Vec { + chain_optional_iter( + [ + file_path::object_id::equals(None), + file_path::is_dir::equals(false), + file_path::location_id::equals(location_id), + file_path::materialized_path::equals( + sub_iso_file_path + .materialized_path_for_children() + .expect("sub path for shallow identifier must be a directory"), + ), + ], + [file_path_id.map(file_path::id::gte)], + ) +} + +async fn count_orphan_file_paths( + db: &PrismaClient, + location_id: i32, + sub_iso_file_path: &IsolatedFilePathData<'_>, +) -> Result { + db.file_path() + .count(orphan_path_filters(location_id, None, sub_iso_file_path)) + .exec() + .await + .map(|c| c as usize) +} + +async fn get_orphan_file_paths( + db: &PrismaClient, + location_id: i32, + file_path_id_cursor: i32, + sub_iso_file_path: &IsolatedFilePathData<'_>, +) -> Result, prisma_client_rust::QueryError> { + info!( + "Querying {} orphan Paths at cursor: {:?}", + CHUNK_SIZE, file_path_id_cursor + ); + db.file_path() + .find_many(orphan_path_filters( + location_id, + Some(file_path_id_cursor), + sub_iso_file_path, + )) + .order_by(file_path::id::order(SortOrder::Asc)) + // .cursor(cursor.into()) + .take(CHUNK_SIZE as i64) + // .skip(1) + .select(file_path_for_file_identifier::select()) + .exec() + .await +} diff --git a/core/src/object/file_identifier/shallow_file_identifier_job.rs b/core/src/object/file_identifier/shallow_file_identifier_job.rs deleted file mode 100644 index 2b36086fb..000000000 --- a/core/src/object/file_identifier/shallow_file_identifier_job.rs +++ /dev/null @@ -1,259 +0,0 @@ -use crate::{ - job::{ - JobError, JobInitData, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext, - }, - library::Library, - location::file_path_helper::{ - ensure_file_path_exists, ensure_sub_path_is_directory, ensure_sub_path_is_in_location, - file_path_for_file_identifier, IsolatedFilePathData, - }, - prisma::{file_path, location, PrismaClient, SortOrder}, - util::db::chain_optional_iter, -}; - -use std::{ - hash::{Hash, Hasher}, - path::{Path, PathBuf}, -}; - -use serde::{Deserialize, Serialize}; -use tracing::info; - -use super::{ - finalize_file_identifier, process_identifier_file_paths, FileIdentifierJobError, - FileIdentifierReport, CHUNK_SIZE, -}; - -pub struct ShallowFileIdentifierJob {} - -/// `ShallowFileIdentifierJobInit` takes file_paths without a file_id from a specific path -/// (just direct children of this path) and uniquely identifies them: -/// - first: generating the cas_id and extracting metadata -/// - finally: creating unique file records, and linking them to their file_paths -#[derive(Serialize, Deserialize, Clone)] -pub struct ShallowFileIdentifierJobInit { - pub location: location::Data, - pub sub_path: PathBuf, -} - -impl Hash for ShallowFileIdentifierJobInit { - fn hash(&self, state: &mut H) { - self.location.id.hash(state); - self.sub_path.hash(state); - } -} - -#[derive(Serialize, Deserialize)] -pub struct ShallowFileIdentifierJobState { - cursor: i32, - report: FileIdentifierReport, - sub_iso_file_path: IsolatedFilePathData<'static>, -} - -impl JobInitData for ShallowFileIdentifierJobInit { - type Job = ShallowFileIdentifierJob; -} - -#[async_trait::async_trait] -impl StatefulJob for ShallowFileIdentifierJob { - type Init = ShallowFileIdentifierJobInit; - type Data = ShallowFileIdentifierJobState; - type Step = (); - - const NAME: &'static str = "shallow_file_identifier"; - const IS_BACKGROUND: bool = true; - - fn new() -> Self { - Self {} - } - - async fn init(&self, ctx: WorkerContext, state: &mut JobState) -> Result<(), JobError> { - let Library { db, .. } = &ctx.library; - - info!("Identifying orphan File Paths..."); - - let location_id = state.init.location.id; - let location_path = Path::new(&state.init.location.path); - - let sub_iso_file_path = if state.init.sub_path != Path::new("") { - let full_path = ensure_sub_path_is_in_location(location_path, &state.init.sub_path) - .await - .map_err(FileIdentifierJobError::from)?; - ensure_sub_path_is_directory(location_path, &state.init.sub_path) - .await - .map_err(FileIdentifierJobError::from)?; - - let sub_iso_file_path = - IsolatedFilePathData::new(location_id, location_path, &full_path, true) - .map_err(FileIdentifierJobError::from)?; - - ensure_file_path_exists( - &state.init.sub_path, - &sub_iso_file_path, - db, - FileIdentifierJobError::SubPathNotFound, - ) - .await?; - - sub_iso_file_path - } else { - IsolatedFilePathData::new(location_id, location_path, location_path, true) - .map_err(FileIdentifierJobError::from)? - }; - - let orphan_count = count_orphan_file_paths(db, location_id, &sub_iso_file_path).await?; - - // Initializing `state.data` here because we need a complete state in case of early finish - state.data = Some(ShallowFileIdentifierJobState { - report: FileIdentifierReport { - location_path: location_path.to_path_buf(), - total_orphan_paths: orphan_count, - ..Default::default() - }, - cursor: 0, - sub_iso_file_path, - }); - - if orphan_count == 0 { - return Err(JobError::EarlyFinish { - name: ::NAME.to_string(), - reason: "Found no orphan file paths to process".to_string(), - }); - } - - info!("Found {} orphan file paths", orphan_count); - - let task_count = (orphan_count as f64 / CHUNK_SIZE as f64).ceil() as usize; - info!( - "Found {} orphan Paths. Will execute {} tasks...", - orphan_count, task_count - ); - - // update job with total task count based on orphan file_paths count - ctx.progress(vec![JobReportUpdate::TaskCount(task_count)]); - - let mut data = state - .data - .as_mut() - .expect("critical error: missing data on job state"); - - let first_path = db - .file_path() - .find_first(orphan_path_filters( - location_id, - None, - &data.sub_iso_file_path, - )) - // .order_by(file_path::id::order(Direction::Asc)) - .select(file_path::select!({ id })) - .exec() - .await? - .expect("We already validated before that there are orphans `file_path`s"); // SAFETY: We already validated before that there are orphans `file_path`s - - data.cursor = first_path.id; - - state.steps.extend((0..task_count).map(|_| ())); - - Ok(()) - } - - async fn execute_step( - &self, - ctx: WorkerContext, - state: &mut JobState, - ) -> Result<(), JobError> { - let ShallowFileIdentifierJobState { - ref mut cursor, - ref mut report, - ref sub_iso_file_path, - } = state - .data - .as_mut() - .expect("critical error: missing data on job state"); - - let location = &state.init.location; - - // get chunk of orphans to process - let file_paths = - get_orphan_file_paths(&ctx.library.db, location.id, *cursor, sub_iso_file_path).await?; - - process_identifier_file_paths( - ::NAME, - location, - &file_paths, - state.step_number, - cursor, - report, - ctx, - ) - .await - } - - async fn finalize(&mut self, ctx: WorkerContext, state: &mut JobState) -> JobResult { - finalize_file_identifier( - &state - .data - .as_ref() - .expect("critical error: missing data on job state") - .report, - ctx, - ) - } -} - -fn orphan_path_filters( - location_id: i32, - file_path_id: Option, - sub_iso_file_path: &IsolatedFilePathData<'_>, -) -> Vec { - chain_optional_iter( - [ - file_path::object_id::equals(None), - file_path::is_dir::equals(false), - file_path::location_id::equals(location_id), - file_path::materialized_path::equals( - sub_iso_file_path - .materialized_path_for_children() - .expect("sub path for shallow identifier must be a directory"), - ), - ], - [file_path_id.map(file_path::id::gte)], - ) -} - -async fn count_orphan_file_paths( - db: &PrismaClient, - location_id: i32, - sub_iso_file_path: &IsolatedFilePathData<'_>, -) -> Result { - db.file_path() - .count(orphan_path_filters(location_id, None, sub_iso_file_path)) - .exec() - .await - .map(|c| c as usize) -} - -async fn get_orphan_file_paths( - db: &PrismaClient, - location_id: i32, - file_path_id_cursor: i32, - sub_iso_file_path: &IsolatedFilePathData<'_>, -) -> Result, prisma_client_rust::QueryError> { - info!( - "Querying {} orphan Paths at cursor: {:?}", - CHUNK_SIZE, file_path_id_cursor - ); - db.file_path() - .find_many(orphan_path_filters( - location_id, - Some(file_path_id_cursor), - sub_iso_file_path, - )) - .order_by(file_path::id::order(SortOrder::Asc)) - // .cursor(cursor.into()) - .take(CHUNK_SIZE as i64) - // .skip(1) - .select(file_path_for_file_identifier::select()) - .exec() - .await -} diff --git a/core/src/object/preview/thumbnail/mod.rs b/core/src/object/preview/thumbnail/mod.rs index 9f94109f2..9bd4289b0 100644 --- a/core/src/object/preview/thumbnail/mod.rs +++ b/core/src/object/preview/thumbnail/mod.rs @@ -1,14 +1,13 @@ use crate::{ api::CoreEvent, invalidate_query, - job::{ - JobError, JobInitData, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext, - }, + job::{JobError, JobReportUpdate, JobResult, JobState, WorkerContext}, library::Library, location::{ file_path_helper::{file_path_for_thumbnailer, FilePathError, IsolatedFilePathData}, LocationId, }, + prisma::location, util::error::FileIOError, }; @@ -27,17 +26,17 @@ use image::{self, imageops, DynamicImage, GenericImageView}; use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; use thiserror::Error; -use tokio::{ - fs::{self}, - io::{self}, - task::block_in_place, -}; +use tokio::{fs, io, task::block_in_place}; use tracing::{error, info, trace, warn}; use webp::Encoder; -pub mod shallow_thumbnailer_job; +use self::thumbnailer_job::ThumbnailerJob; + +mod shallow; pub mod thumbnailer_job; +pub use shallow::*; + const THUMBNAIL_SIZE_FACTOR: f32 = 0.2; const THUMBNAIL_QUALITY: f32 = 30.0; pub const THUMBNAIL_CACHE_DIR_NAME: &str = "thumbnails"; @@ -213,14 +212,10 @@ fn finalize_thumbnailer(data: &ThumbnailerJobState, ctx: WorkerContext) -> JobRe Ok(Some(serde_json::to_value(&data.report)?)) } -async fn process_step( - state: &mut JobState, +async fn process_step( + state: &mut JobState, ctx: WorkerContext, -) -> Result<(), JobError> -where - SJob: StatefulJob, - Init: JobInitData, -{ +) -> Result<(), JobError> { let step = &state.steps[0]; ctx.progress(vec![JobReportUpdate::Message(format!( @@ -228,7 +223,21 @@ where step.file_path.materialized_path ))]); - let step_result = inner_process_step(state, &ctx).await; + let data = state + .data + .as_mut() + .expect("critical error: missing data on job state"); + + let step_result = inner_process_step( + &step, + &data.location_path, + &data.thumbnail_dir, + &state.init.location, + &ctx.library, + ) + .await; + + data.report.thumbnails_created += 1; ctx.progress(vec![JobReportUpdate::CompletedTaskCount( state.step_number + 1, @@ -237,25 +246,17 @@ where step_result } -async fn inner_process_step( - state: &mut JobState, - ctx: &WorkerContext, -) -> Result<(), JobError> -where - SJob: StatefulJob, - Init: JobInitData, -{ - let ThumbnailerJobStep { file_path, kind } = &state.steps[0]; - let data = state - .data - .as_ref() - .expect("critical error: missing data on job state"); +pub async fn inner_process_step( + step: &ThumbnailerJobStep, + location_path: &PathBuf, + thumbnail_dir: &PathBuf, + location: &location::Data, + library: &Library, +) -> Result<(), JobError> { + let ThumbnailerJobStep { file_path, kind } = step; // assemble the file path - let path = data.location_path.join(IsolatedFilePathData::from(( - data.report.location_id, - file_path, - ))); + let path = location_path.join(IsolatedFilePathData::from((location.id, file_path))); trace!("image_file {:?}", file_path); // get cas_id, if none found skip @@ -269,7 +270,7 @@ where }; // Define and write the WebP-encoded file to a given path - let output_path = data.thumbnail_dir.join(format!("{cas_id}.webp")); + let output_path = thumbnail_dir.join(format!("{cas_id}.webp")); match fs::metadata(&output_path).await { Ok(_) => { @@ -293,16 +294,9 @@ where } println!("emitting new thumbnail event"); - ctx.library.emit(CoreEvent::NewThumbnail { + library.emit(CoreEvent::NewThumbnail { cas_id: cas_id.clone(), }); - - state - .data - .as_mut() - .expect("critical error: missing data on job state") - .report - .thumbnails_created += 1; } Err(e) => return Err(ThumbnailerError::from(FileIOError::from((output_path, e))).into()), } diff --git a/core/src/object/preview/thumbnail/shallow.rs b/core/src/object/preview/thumbnail/shallow.rs new file mode 100644 index 000000000..755721464 --- /dev/null +++ b/core/src/object/preview/thumbnail/shallow.rs @@ -0,0 +1,150 @@ +use super::{ + ThumbnailerError, ThumbnailerJobStep, ThumbnailerJobStepKind, FILTERED_IMAGE_EXTENSIONS, + THUMBNAIL_CACHE_DIR_NAME, +}; +use crate::{ + job::JobError, + library::Library, + location::{ + file_path_helper::{ + ensure_file_path_exists, ensure_sub_path_is_directory, ensure_sub_path_is_in_location, + file_path_for_thumbnailer, IsolatedFilePathData, + }, + LocationId, + }, + object::preview::thumbnail, + prisma::{file_path, location, PrismaClient}, + util::error::FileIOError, +}; +use sd_file_ext::extensions::Extension; +use std::path::{Path, PathBuf}; +use tokio::fs; +use tracing::info; + +#[cfg(feature = "ffmpeg")] +use super::FILTERED_VIDEO_EXTENSIONS; + +pub async fn shallow_thumbnailer( + location: &location::Data, + sub_path: &PathBuf, + library: &Library, +) -> Result<(), JobError> { + let Library { db, .. } = &library; + + let thumbnail_dir = library + .config() + .data_directory() + .join(THUMBNAIL_CACHE_DIR_NAME); + + let location_id = location.id; + let location_path = PathBuf::from(&location.path); + + let (path, iso_file_path) = if sub_path != Path::new("") { + let full_path = ensure_sub_path_is_in_location(&location_path, &sub_path) + .await + .map_err(ThumbnailerError::from)?; + ensure_sub_path_is_directory(&location_path, &sub_path) + .await + .map_err(ThumbnailerError::from)?; + + let sub_iso_file_path = + IsolatedFilePathData::new(location_id, &location_path, &full_path, true) + .map_err(ThumbnailerError::from)?; + + ensure_file_path_exists( + &sub_path, + &sub_iso_file_path, + db, + ThumbnailerError::SubPathNotFound, + ) + .await?; + + (full_path, sub_iso_file_path) + } else { + ( + location_path.to_path_buf(), + IsolatedFilePathData::new(location_id, &location_path, &location_path, true) + .map_err(ThumbnailerError::from)?, + ) + }; + + info!( + "Searching for images in location {location_id} at path {}", + path.display() + ); + + // create all necessary directories if they don't exist + fs::create_dir_all(&thumbnail_dir) + .await + .map_err(|e| FileIOError::from((&thumbnail_dir, e)))?; + + // query database for all image files in this location that need thumbnails + let image_files = get_files_by_extensions( + &library.db, + location_id, + &iso_file_path, + &FILTERED_IMAGE_EXTENSIONS, + ThumbnailerJobStepKind::Image, + ) + .await?; + + info!("Found {:?} image files", image_files.len()); + + #[cfg(feature = "ffmpeg")] + let video_files = { + // query database for all video files in this location that need thumbnails + let video_files = get_files_by_extensions( + &library.db, + location_id, + &iso_file_path, + &FILTERED_VIDEO_EXTENSIONS, + ThumbnailerJobStepKind::Video, + ) + .await?; + + info!("Found {:?} video files", video_files.len()); + + video_files + }; + + let all_files = [ + image_files, + #[cfg(feature = "ffmpeg")] + video_files, + ] + .into_iter() + .flatten(); + + for file in all_files { + thumbnail::inner_process_step(&file, &location_path, &thumbnail_dir, location, &library) + .await?; + } + + Ok(()) +} + +async fn get_files_by_extensions( + db: &PrismaClient, + location_id: LocationId, + parent_isolated_file_path_data: &IsolatedFilePathData<'_>, + extensions: &[Extension], + kind: ThumbnailerJobStepKind, +) -> Result, JobError> { + Ok(db + .file_path() + .find_many(vec![ + file_path::location_id::equals(location_id), + file_path::extension::in_vec(extensions.iter().map(ToString::to_string).collect()), + file_path::materialized_path::equals( + parent_isolated_file_path_data + .materialized_path_for_children() + .expect("sub path iso_file_path must be a directory"), + ), + ]) + .select(file_path_for_thumbnailer::select()) + .exec() + .await? + .into_iter() + .map(|file_path| ThumbnailerJobStep { file_path, kind }) + .collect()) +} diff --git a/core/src/object/preview/thumbnail/shallow_thumbnailer_job.rs b/core/src/object/preview/thumbnail/shallow_thumbnailer_job.rs deleted file mode 100644 index 5669cc251..000000000 --- a/core/src/object/preview/thumbnail/shallow_thumbnailer_job.rs +++ /dev/null @@ -1,215 +0,0 @@ -use crate::{ - job::{ - JobError, JobInitData, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext, - }, - library::Library, - location::{ - file_path_helper::{ - ensure_file_path_exists, ensure_sub_path_is_directory, ensure_sub_path_is_in_location, - file_path_for_thumbnailer, IsolatedFilePathData, - }, - LocationId, - }, - prisma::{file_path, location, PrismaClient}, - util::error::FileIOError, -}; - -use std::{ - collections::VecDeque, - hash::Hash, - path::{Path, PathBuf}, -}; - -use sd_file_ext::extensions::Extension; - -use serde::{Deserialize, Serialize}; -use tokio::fs; -use tracing::info; - -use super::{ - finalize_thumbnailer, process_step, ThumbnailerError, ThumbnailerJobReport, - ThumbnailerJobState, ThumbnailerJobStep, ThumbnailerJobStepKind, FILTERED_IMAGE_EXTENSIONS, - THUMBNAIL_CACHE_DIR_NAME, -}; - -#[cfg(feature = "ffmpeg")] -use super::FILTERED_VIDEO_EXTENSIONS; - -pub struct ShallowThumbnailerJob {} - -#[derive(Serialize, Deserialize, Clone)] -pub struct ShallowThumbnailerJobInit { - pub location: location::Data, - pub sub_path: PathBuf, -} - -impl Hash for ShallowThumbnailerJobInit { - fn hash(&self, state: &mut H) { - self.location.id.hash(state); - self.sub_path.hash(state); - } -} - -impl JobInitData for ShallowThumbnailerJobInit { - type Job = ShallowThumbnailerJob; -} - -#[async_trait::async_trait] -impl StatefulJob for ShallowThumbnailerJob { - type Init = ShallowThumbnailerJobInit; - type Data = ThumbnailerJobState; - type Step = ThumbnailerJobStep; - - const NAME: &'static str = "shallow_thumbnailer"; - const IS_BACKGROUND: bool = true; - - fn new() -> Self { - Self {} - } - - async fn init(&self, ctx: WorkerContext, state: &mut JobState) -> Result<(), JobError> { - let Library { db, .. } = &ctx.library; - - let thumbnail_dir = ctx - .library - .config() - .data_directory() - .join(THUMBNAIL_CACHE_DIR_NAME); - - let location_id = state.init.location.id; - let location_path = PathBuf::from(&state.init.location.path); - - let (path, iso_file_path) = if state.init.sub_path != Path::new("") { - let full_path = ensure_sub_path_is_in_location(&location_path, &state.init.sub_path) - .await - .map_err(ThumbnailerError::from)?; - ensure_sub_path_is_directory(&location_path, &state.init.sub_path) - .await - .map_err(ThumbnailerError::from)?; - - let sub_iso_file_path = - IsolatedFilePathData::new(location_id, &location_path, &full_path, true) - .map_err(ThumbnailerError::from)?; - - ensure_file_path_exists( - &state.init.sub_path, - &sub_iso_file_path, - db, - ThumbnailerError::SubPathNotFound, - ) - .await?; - - (full_path, sub_iso_file_path) - } else { - ( - location_path.to_path_buf(), - IsolatedFilePathData::new(location_id, &location_path, &location_path, true) - .map_err(ThumbnailerError::from)?, - ) - }; - - info!( - "Searching for images in location {location_id} at path {}", - path.display() - ); - - // create all necessary directories if they don't exist - fs::create_dir_all(&thumbnail_dir) - .await - .map_err(|e| FileIOError::from((&thumbnail_dir, e)))?; - - // query database for all image files in this location that need thumbnails - let image_files = get_files_by_extensions( - db, - location_id, - &iso_file_path, - &FILTERED_IMAGE_EXTENSIONS, - ThumbnailerJobStepKind::Image, - ) - .await?; - info!("Found {:?} image files", image_files.len()); - - #[cfg(feature = "ffmpeg")] - let all_files = { - // query database for all video files in this location that need thumbnails - let video_files = get_files_by_extensions( - db, - location_id, - &iso_file_path, - &FILTERED_VIDEO_EXTENSIONS, - ThumbnailerJobStepKind::Video, - ) - .await?; - info!("Found {:?} video files", video_files.len()); - - image_files - .into_iter() - .chain(video_files.into_iter()) - .collect::>() - }; - #[cfg(not(feature = "ffmpeg"))] - let all_files = { image_files.into_iter().collect::>() }; - - ctx.progress(vec![ - JobReportUpdate::TaskCount(all_files.len()), - JobReportUpdate::Message(format!("Preparing to process {} files", all_files.len())), - ]); - - state.data = Some(ThumbnailerJobState { - thumbnail_dir, - location_path, - report: ThumbnailerJobReport { - location_id, - path, - thumbnails_created: 0, - }, - }); - state.steps.extend(all_files); - - Ok(()) - } - - async fn execute_step( - &self, - ctx: WorkerContext, - state: &mut JobState, - ) -> Result<(), JobError> { - process_step(state, ctx).await - } - - async fn finalize(&mut self, ctx: WorkerContext, state: &mut JobState) -> JobResult { - finalize_thumbnailer( - state - .data - .as_ref() - .expect("critical error: missing data on job state"), - ctx, - ) - } -} - -async fn get_files_by_extensions( - db: &PrismaClient, - location_id: LocationId, - parent_isolated_file_path_data: &IsolatedFilePathData<'_>, - extensions: &[Extension], - kind: ThumbnailerJobStepKind, -) -> Result, JobError> { - Ok(db - .file_path() - .find_many(vec![ - file_path::location_id::equals(location_id), - file_path::extension::in_vec(extensions.iter().map(ToString::to_string).collect()), - file_path::materialized_path::equals( - parent_isolated_file_path_data - .materialized_path_for_children() - .expect("sub path iso_file_path must be a directory"), - ), - ]) - .select(file_path_for_thumbnailer::select()) - .exec() - .await? - .into_iter() - .map(|file_path| ThumbnailerJobStep { file_path, kind }) - .collect()) -} diff --git a/core/src/util/debug_initializer.rs b/core/src/util/debug_initializer.rs index 591c79d8a..0f3639f84 100644 --- a/core/src/util/debug_initializer.rs +++ b/core/src/util/debug_initializer.rs @@ -14,6 +14,7 @@ use crate::{ }, prisma::location, }; +use futures::{pin_mut, Future, Stream}; use prisma_client_rust::QueryError; use serde::Deserialize; use thiserror::Error; @@ -182,10 +183,40 @@ impl InitConfig { } } -pub struct AbortOnDrop(tokio::task::JoinHandle); +pub struct AbortOnDrop(pub tokio::task::JoinHandle); impl Drop for AbortOnDrop { fn drop(&mut self) { - self.0.abort(); + self.0.abort() + } +} + +impl Future for AbortOnDrop { + type Output = Result; + + fn poll( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + let handle = &mut self.0; + + pin_mut!(handle); + + handle.poll(cx) + } +} + +impl Stream for AbortOnDrop { + type Item = (); + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let handle = &mut self.0; + + pin_mut!(handle); + + handle.poll(cx).map(|_| None) } } diff --git a/interface/app/$libraryId/location/$id.tsx b/interface/app/$libraryId/location/$id.tsx index 6b23ac655..103933a62 100644 --- a/interface/app/$libraryId/location/$id.tsx +++ b/interface/app/$libraryId/location/$id.tsx @@ -4,8 +4,8 @@ import { z } from 'zod'; import { ExplorerItem, useLibraryContext, - useLibraryMutation, useLibraryQuery, + useLibrarySubscription, useRspcLibraryContext } from '@sd/client'; import { Folder } from '~/components/Folder'; @@ -33,14 +33,22 @@ export const Component = () => { const location = useLibraryQuery(['locations.get', location_id]); - // we destructure this since `mutate` is a stable reference but the object it's in is not - const { mutate: quickRescan } = useLibraryMutation('locations.quickRescan'); + useLibrarySubscription( + [ + 'locations.quickRescan', + { + location_id, + sub_path: path ?? '' + } + ], + { onData() {} } + ); const explorerStore = getExplorerStore(); + useEffect(() => { explorerStore.locationId = location_id; - if (location_id !== null) quickRescan({ location_id, sub_path: path ?? '' }); - }, [explorerStore, location_id, path, quickRescan]); + }, [explorerStore, location_id, path]); const { query, items } = useItems(); const file = explorerStore.selectedRowIndex !== null && items?.[explorerStore.selectedRowIndex]; diff --git a/interface/hooks/useExplorerTopBarOptions.tsx b/interface/hooks/useExplorerTopBarOptions.tsx index 64287e89c..8038b7b70 100644 --- a/interface/hooks/useExplorerTopBarOptions.tsx +++ b/interface/hooks/useExplorerTopBarOptions.tsx @@ -10,7 +10,8 @@ import { SquaresFour, Tag } from 'phosphor-react'; -import { useLibraryMutation } from '@sd/client'; +import { useEffect, useRef } from 'react'; +import { useRspcLibraryContext } from '@sd/client'; import OptionsPanel from '~/app/$libraryId/Explorer/OptionsPanel'; import { TOP_BAR_ICON_STYLE, ToolOption } from '~/app/$libraryId/TopBar/TopBarOptions'; import { KeyManager } from '../app/$libraryId/KeyManager'; @@ -19,8 +20,6 @@ import { getExplorerStore, useExplorerStore } from './useExplorerStore'; export const useExplorerTopBarOptions = () => { const explorerStore = useExplorerStore(); - const reload = useLibraryMutation('locations.quickRescan'); - const explorerViewOptions: ToolOption[] = [ { toolTipLabel: 'Grid view', @@ -75,6 +74,14 @@ export const useExplorerTopBarOptions = () => { } ]; + // subscription so that we can cancel it if in progress + const quickRescanSubscription = useRef<() => void | undefined>(); + + // gotta clean up any rescan subscriptions if the exist + useEffect(() => () => quickRescanSubscription.current?.(), []); + + const { client } = useRspcLibraryContext(); + const explorerToolOptions: ToolOption[] = [ { toolTipLabel: 'Key Manager', @@ -100,7 +107,17 @@ export const useExplorerTopBarOptions = () => { toolTipLabel: 'Reload', onClick: () => { if (explorerStore.locationId) { - reload.mutate({ location_id: explorerStore.locationId, sub_path: '' }); + quickRescanSubscription.current?.(); + quickRescanSubscription.current = client.addSubscription( + [ + 'locations.quickRescan', + { + location_id: explorerStore.locationId, + sub_path: '' + } + ], + { onData() {} } + ); } }, icon: , diff --git a/packages/client/src/core.ts b/packages/client/src/core.ts index e5c8e7ceb..95f7e170b 100644 --- a/packages/client/src/core.ts +++ b/packages/client/src/core.ts @@ -74,7 +74,6 @@ export type Procedures = { { key: "locations.fullRescan", input: LibraryArgs, result: null } | { key: "locations.indexer_rules.create", input: LibraryArgs, result: null } | { key: "locations.indexer_rules.delete", input: LibraryArgs, result: null } | - { key: "locations.quickRescan", input: LibraryArgs, result: null } | { key: "locations.relink", input: LibraryArgs, result: null } | { key: "locations.update", input: LibraryArgs, result: null } | { key: "nodes.changeNodeName", input: ChangeNodeNameArgs, result: NodeConfig } | @@ -88,23 +87,36 @@ export type Procedures = { { key: "invalidation.listen", input: never, result: InvalidateOperationEvent[] } | { key: "jobs.newThumbnail", input: LibraryArgs, result: string } | { key: "locations.online", input: never, result: number[][] } | + { key: "locations.quickRescan", input: LibraryArgs, result: null } | { key: "p2p.events", input: never, result: P2PEvent } | { key: "sync.newMessage", input: LibraryArgs, result: CRDTOperation } }; -export type FilePathSearchArgs = { take?: number | null; order?: FilePathSearchOrdering | null; cursor?: number[] | null; filter?: FilePathFilterArgs } +/** + * These are all possible algorithms that can be used for encryption and decryption + */ +export type Algorithm = "XChaCha20Poly1305" | "Aes256Gcm" -export type PeerMetadata = { name: string; operating_system: OperatingSystem | null; version: string | null; email: string | null; img_url: string | null } +export type AutomountUpdateArgs = { uuid: string; status: boolean } + +export type BuildInfo = { version: string; commit: string } + +export type CRDTOperation = { node: string; timestamp: number; id: string; typ: CRDTOperationType } + +export type CRDTOperationType = SharedOperation | RelationOperation | OwnedOperation /** - * NodeConfig is the configuration for a node. This is shared between all libraries and is stored in a JSON file on disk. + * Meow */ -export type NodeConfig = { id: string; name: string; p2p_port: number | null; p2p_email: string | null; p2p_img_url: string | null } +export type Category = "Recents" | "Favorites" | "Photos" | "Videos" | "Movies" | "Music" | "Documents" | "Downloads" | "Encrypted" | "Projects" | "Applications" | "Archives" | "Databases" | "Games" | "Books" | "Contacts" | "Trash" -/** - * This denotes the `StoredKey` version. - */ -export type StoredKeyVersion = "V1" +export type ChangeNodeNameArgs = { name: string } + +export type CreateLibraryArgs = { name: string } + +export type DiskType = "SSD" | "HDD" | "Removable" + +export type EditLibraryArgs = { id: string; name: string | null; description: string | null } /** * This should be used for passing an encrypted key around. @@ -113,152 +125,42 @@ export type StoredKeyVersion = "V1" */ export type EncryptedKey = number[] -export type PeerId = string +export type ExplorerItem = { type: "Path"; has_thumbnail: boolean; item: FilePathWithObject } | { type: "Object"; has_thumbnail: boolean; item: ObjectWithFilePaths } + +export type FileCopierJobInit = { source_location_id: number; source_path_id: number; target_location_id: number; target_path: string; target_file_name_suffix: string | null } + +export type FileCutterJobInit = { source_location_id: number; source_path_id: number; target_location_id: number; target_path: string } + +export type FileDecryptorJobInit = { location_id: number; path_id: number; mount_associated_key: boolean; output_path: string | null; password: string | null; save_to_library: boolean | null } + +export type FileDeleterJobInit = { location_id: number; path_id: number } + +export type FileEncryptorJobInit = { location_id: number; path_id: number; key_uuid: string; algorithm: Algorithm; metadata: boolean; preview_media: boolean; output_path: string | null } + +export type FileEraserJobInit = { location_id: number; path_id: number; passes: string } + +export type FilePath = { id: number; pub_id: number[]; is_dir: boolean; cas_id: string | null; integrity_checksum: string | null; location_id: number; materialized_path: string; name: string; extension: string; size_in_bytes: string; inode: number[]; device: number[]; object_id: number | null; key_id: number | null; date_created: string; date_modified: string; date_indexed: string } + +export type FilePathFilterArgs = { locationId?: number | null; search?: string; extension?: string | null; createdAt?: OptionalRange; path?: string | null; object?: ObjectFilterArgs | null } + +export type FilePathSearchArgs = { take?: number | null; order?: FilePathSearchOrdering | null; cursor?: number[] | null; filter?: FilePathFilterArgs } + +export type FilePathSearchOrdering = { name: SortOrder } | { sizeInBytes: SortOrder } | { dateCreated: SortOrder } | { dateModified: SortOrder } | { dateIndexed: SortOrder } | { object: ObjectSearchOrdering } + +export type FilePathWithObject = { id: number; pub_id: number[]; is_dir: boolean; cas_id: string | null; integrity_checksum: string | null; location_id: number; materialized_path: string; name: string; extension: string; size_in_bytes: string; inode: number[]; device: number[]; object_id: number | null; key_id: number | null; date_created: string; date_modified: string; date_indexed: string; object: Object | null } export type GenerateThumbsForLocationArgs = { id: number; path: string } -export type LibraryConfigWrapped = { uuid: string; config: LibraryConfig } - -/** - * These parameters define the password-hashing level. - * - * The greater the parameter, the longer the password will take to hash. - */ -export type Params = "Standard" | "Hardened" | "Paranoid" - -export type Location = { id: number; pub_id: number[]; node_id: number; name: string; path: string; total_capacity: number | null; available_capacity: number | null; is_archived: boolean; generate_preview_media: boolean; sync_preview_media: boolean; hidden: boolean; date_created: string } - -export type SortOrder = "Asc" | "Desc" - -export type KeyAddArgs = { algorithm: Algorithm; hashing_algorithm: HashingAlgorithm; key: Protected; library_sync: boolean; automount: boolean } - -/** - * Represents the operating system which the remote peer is running. - * This is not used internally and predominantly is designed to be used for display purposes by the embedding application. - */ -export type OperatingSystem = "Windows" | "Linux" | "MacOS" | "Ios" | "Android" | { Other: string } - export type GetArgs = { id: number } /** - * This is a stored key, and can be freely written to the database. - * - * It contains no sensitive information that is not encrypted. + * This defines all available password hashing algorithms. */ -export type StoredKey = { uuid: string; version: StoredKeyVersion; key_type: StoredKeyType; algorithm: Algorithm; hashing_algorithm: HashingAlgorithm; content_salt: Salt; master_key: EncryptedKey; master_key_nonce: Nonce; key_nonce: Nonce; key: number[]; salt: Salt; memory_only: boolean; automount: boolean } - -export type OnboardingConfig = { password: Protected; algorithm: Algorithm; hashing_algorithm: HashingAlgorithm } - -export type FileDecryptorJobInit = { location_id: number; path_id: number; mount_associated_key: boolean; output_path: string | null; password: string | null; save_to_library: boolean | null } - -export type TagCreateArgs = { name: string; color: string } - -export type LightScanArgs = { location_id: number; sub_path: string } - -export type FileEraserJobInit = { location_id: number; path_id: number; passes: string } - -/** - * This should be used for providing a nonce to encrypt/decrypt functions. - * - * You may also generate a nonce for a given algorithm with `Nonce::generate()` - */ -export type Nonce = { XChaCha20Poly1305: number[] } | { Aes256Gcm: number[] } - -export type AutomountUpdateArgs = { uuid: string; status: boolean } - -export type NodeState = ({ id: string; name: string; p2p_port: number | null; p2p_email: string | null; p2p_img_url: string | null }) & { data_path: string } - -/** - * `LocationUpdateArgs` is the argument received from the client using `rspc` to update a location. - * It contains the id of the location to be updated, possible a name to change the current location's name - * and a vector of indexer rules ids to add or remove from the location. - * - * It is important to note that only the indexer rule ids in this vector will be used from now on. - * Old rules that aren't in this vector will be purged. - */ -export type LocationUpdateArgs = { id: number; name: string | null; generate_preview_media: boolean | null; sync_preview_media: boolean | null; hidden: boolean | null; indexer_rules_ids: number[] } - -export type EditLibraryArgs = { id: string; name: string | null; description: string | null } - -export type SetNoteArgs = { id: number; note: string | null } - -export type InvalidateOperationEvent = { key: string; arg: any; result: any | null } - -export type ObjectSearchArgs = { take?: number | null; order?: ObjectSearchOrdering | null; cursor?: number[] | null; filter?: ObjectFilterArgs } - -export type CRDTOperation = { node: string; timestamp: number; id: string; typ: CRDTOperationType } - -/** - * This should be used for passing a salt around. - * - * You may also generate a salt with `Salt::generate()` - */ -export type Salt = number[] - -/** - * Meow - */ -export type Category = "Recents" | "Favorites" | "Photos" | "Videos" | "Movies" | "Music" | "Documents" | "Downloads" | "Encrypted" | "Projects" | "Applications" | "Archives" | "Databases" | "Games" | "Books" | "Contacts" | "Trash" - -/** - * TODO: P2P event for the frontend - */ -export type P2PEvent = { type: "DiscoveredPeer"; peer_id: PeerId; metadata: PeerMetadata } | { type: "SpacedropRequest"; id: string; peer_id: PeerId; name: string } - -export type FileCopierJobInit = { source_location_id: number; source_path_id: number; target_location_id: number; target_path: string; target_file_name_suffix: string | null } - -export type DiskType = "SSD" | "HDD" | "Removable" - -export type RestoreBackupArgs = { password: Protected; secret_key: Protected; path: string } - -export type SetFavoriteArgs = { id: number; favorite: boolean } - -export type FilePathFilterArgs = { locationId?: number | null; search?: string; extension?: string | null; createdAt?: OptionalRange; path?: string | null; object?: ObjectFilterArgs | null } - -export type RuleKind = "AcceptFilesByGlob" | "RejectFilesByGlob" | "AcceptIfChildrenDirectoriesArePresent" | "RejectIfChildrenDirectoriesArePresent" - -export type MediaData = { id: number; pixel_width: number | null; pixel_height: number | null; longitude: number | null; latitude: number | null; fps: number | null; capture_device_make: string | null; capture_device_model: string | null; capture_device_software: string | null; duration_seconds: number | null; codecs: string | null; streams: number | null } - -export type FilePathSearchOrdering = { name: SortOrder } | { sizeInBytes: SortOrder } | { dateCreated: SortOrder } | { dateModified: SortOrder } | { dateIndexed: SortOrder } | { object: ObjectSearchOrdering } - -export type IndexerRule = { id: number; name: string; default: boolean; rules_per_kind: number[]; date_created: string; date_modified: string } - -export type BuildInfo = { version: string; commit: string } +export type HashingAlgorithm = { name: "Argon2id"; params: Params } | { name: "BalloonBlake3"; params: Params } export type IdentifyUniqueFilesArgs = { id: number; path: string } -/** - * These are all possible algorithms that can be used for encryption and decryption - */ -export type Algorithm = "XChaCha20Poly1305" | "Aes256Gcm" - -export type Tag = { id: number; pub_id: number[]; name: string | null; color: string | null; total_objects: number | null; redundancy_goal: number | null; date_created: string; date_modified: string } - -export type OwnedOperationItem = { id: any; data: OwnedOperationData } - -export type ObjectSearchOrdering = { dateAccessed: SortOrder } - -export type CRDTOperationType = SharedOperation | RelationOperation | OwnedOperation - -export type MaybeNot = T | { not: T } - -export type SpacedropArgs = { peer_id: PeerId; file_path: string[] } - -export type JobReport = { id: string; name: string; action: string | null; data: number[] | null; metadata: any | null; is_background: boolean; errors_text: string[]; created_at: string | null; started_at: string | null; completed_at: string | null; parent_id: string | null; status: JobStatus; task_count: number; completed_task_count: number; message: string; estimated_completion: string } - -export type ObjectFilterArgs = { favorite?: boolean | null; hidden?: boolean | null; dateAccessed?: MaybeNot | null; kind?: number[]; tags?: number[] } - -export type OwnedOperation = { model: string; items: OwnedOperationItem[] } - -export type ObjectWithFilePaths = { id: number; pub_id: number[]; kind: number; key_id: number | null; hidden: boolean; favorite: boolean; important: boolean; has_thumbnail: boolean; has_thumbstrip: boolean; has_video_preview: boolean; ipfs_id: string | null; note: string | null; date_created: string; date_accessed: string | null; file_paths: FilePath[] } - -export type SharedOperation = { record_id: any; model: string; data: SharedOperationData } - -export type RelationOperationData = "Create" | { Update: { field: string; value: any } } | "Delete" - -export type FileDeleterJobInit = { location_id: number; path_id: number } - -export type CreateLibraryArgs = { name: string } +export type IndexerRule = { id: number; name: string; default: boolean; rules_per_kind: number[]; date_created: string; date_modified: string } /** * `IndexerRuleCreateArgs` is the argument received from the client using rspc to create a new indexer rule. @@ -272,11 +174,29 @@ export type CreateLibraryArgs = { name: string } */ export type IndexerRuleCreateArgs = { name: string; dry_run: boolean; rules: ([RuleKind, string[]])[] } -export type SharedOperationCreateData = { u: { [key: string]: any } } | "a" +export type InvalidateOperationEvent = { key: string; arg: any; result: any | null } -export type OptionalRange = { from: T | null; to: T | null } +export type JobReport = { id: string; name: string; action: string | null; data: number[] | null; metadata: any | null; is_background: boolean; errors_text: string[]; created_at: string | null; started_at: string | null; completed_at: string | null; parent_id: string | null; status: JobStatus; task_count: number; completed_task_count: number; message: string; estimated_completion: string } -export type FileEncryptorJobInit = { location_id: number; path_id: number; key_uuid: string; algorithm: Algorithm; metadata: boolean; preview_media: boolean; output_path: string | null } +export type JobStatus = "Queued" | "Running" | "Completed" | "Canceled" | "Failed" | "Paused" | "CompletedWithErrors" + +export type KeyAddArgs = { algorithm: Algorithm; hashing_algorithm: HashingAlgorithm; key: Protected; library_sync: boolean; automount: boolean } + +/** + * Can wrap a query argument to require it to contain a `library_id` and provide helpers for working with libraries. + */ +export type LibraryArgs = { library_id: string; arg: T } + +/** + * LibraryConfig holds the configuration for a specific library. This is stored as a '{uuid}.sdlibrary' file. + */ +export type LibraryConfig = { name: string; description: string } + +export type LibraryConfigWrapped = { uuid: string; config: LibraryConfig } + +export type LightScanArgs = { location_id: number; sub_path: string } + +export type Location = { id: number; pub_id: number[]; node_id: number; name: string; path: string; total_capacity: number | null; available_capacity: number | null; is_archived: boolean; generate_preview_media: boolean; sync_preview_media: boolean; hidden: boolean; date_created: string } /** * `LocationCreateArgs` is the argument received from the client using `rspc` to create a new location. @@ -285,66 +205,146 @@ export type FileEncryptorJobInit = { location_id: number; path_id: number; key_u */ export type LocationCreateArgs = { path: string; dry_run: boolean; indexer_rules_ids: number[] } -export type ExplorerItem = { type: "Path"; has_thumbnail: boolean; item: FilePathWithObject } | { type: "Object"; has_thumbnail: boolean; item: ObjectWithFilePaths } - /** - * Can wrap a query argument to require it to contain a `library_id` and provide helpers for working with libraries. + * `LocationUpdateArgs` is the argument received from the client using `rspc` to update a location. + * It contains the id of the location to be updated, possible a name to change the current location's name + * and a vector of indexer rules ids to add or remove from the location. + * + * It is important to note that only the indexer rule ids in this vector will be used from now on. + * Old rules that aren't in this vector will be purged. */ -export type LibraryArgs = { library_id: string; arg: T } - -export type UnlockKeyManagerArgs = { password: Protected; secret_key: Protected } - -export type FileCutterJobInit = { source_location_id: number; source_path_id: number; target_location_id: number; target_path: string } - -export type OwnedOperationData = { Create: { [key: string]: any } } | { CreateMany: { values: ([any, { [key: string]: any }])[]; skip_duplicates: boolean } } | { Update: { [key: string]: any } } | "Delete" - -export type SharedOperationData = SharedOperationCreateData | { field: string; value: any } | null - -export type Node = { id: number; pub_id: number[]; name: string; platform: number; version: string | null; last_seen: string; timezone: string | null; date_created: string } - -export type Volume = { name: string; mount_point: string; total_capacity: string; available_capacity: string; is_removable: boolean; disk_type: DiskType | null; file_system: string | null; is_root_filesystem: boolean } - -export type TagUpdateArgs = { id: number; name: string | null; color: string | null } - -export type MasterPasswordChangeArgs = { password: Protected; algorithm: Algorithm; hashing_algorithm: HashingAlgorithm } - -export type ObjectValidatorArgs = { id: number; path: string } - -export type TagAssignArgs = { object_id: number; tag_id: number; unassign: boolean } - -/** - * This defines all available password hashing algorithms. - */ -export type HashingAlgorithm = { name: "Argon2id"; params: Params } | { name: "BalloonBlake3"; params: Params } - -export type JobStatus = "Queued" | "Running" | "Completed" | "Canceled" | "Failed" | "Paused" | "CompletedWithErrors" - -export type FilePathWithObject = { id: number; pub_id: number[]; is_dir: boolean; cas_id: string | null; integrity_checksum: string | null; location_id: number; materialized_path: string; name: string; extension: string; size_in_bytes: string; inode: number[]; device: number[]; object_id: number | null; key_id: number | null; date_created: string; date_modified: string; date_indexed: string; object: Object | null } +export type LocationUpdateArgs = { id: number; name: string | null; generate_preview_media: boolean | null; sync_preview_media: boolean | null; hidden: boolean | null; indexer_rules_ids: number[] } export type LocationWithIndexerRules = { id: number; pub_id: number[]; node_id: number; name: string; path: string; total_capacity: number | null; available_capacity: number | null; is_archived: boolean; generate_preview_media: boolean; sync_preview_media: boolean; hidden: boolean; date_created: string; indexer_rules: { indexer_rule: IndexerRule }[] } +export type MasterPasswordChangeArgs = { password: Protected; algorithm: Algorithm; hashing_algorithm: HashingAlgorithm } + +export type MaybeNot = T | { not: T } + +export type MediaData = { id: number; pixel_width: number | null; pixel_height: number | null; longitude: number | null; latitude: number | null; fps: number | null; capture_device_make: string | null; capture_device_model: string | null; capture_device_software: string | null; duration_seconds: number | null; codecs: string | null; streams: number | null } + +export type Node = { id: number; pub_id: number[]; name: string; platform: number; version: string | null; last_seen: string; timezone: string | null; date_created: string } + /** - * LibraryConfig holds the configuration for a specific library. This is stored as a '{uuid}.sdlibrary' file. + * NodeConfig is the configuration for a node. This is shared between all libraries and is stored in a JSON file on disk. */ -export type LibraryConfig = { name: string; description: string } +export type NodeConfig = { id: string; name: string; p2p_port: number | null; p2p_email: string | null; p2p_img_url: string | null } -export type SearchData = { cursor: number[] | null; items: T[] } +export type NodeState = ({ id: string; name: string; p2p_port: number | null; p2p_email: string | null; p2p_img_url: string | null }) & { data_path: string } -export type FilePath = { id: number; pub_id: number[]; is_dir: boolean; cas_id: string | null; integrity_checksum: string | null; location_id: number; materialized_path: string; name: string; extension: string; size_in_bytes: string; inode: number[]; device: number[]; object_id: number | null; key_id: number | null; date_created: string; date_modified: string; date_indexed: string } - -export type Statistics = { id: number; date_captured: string; total_object_count: number; library_db_size: string; total_bytes_used: string; total_bytes_capacity: string; total_unique_bytes: string; total_bytes_free: string; preview_media_bytes: string } - -export type Protected = T - -export type ChangeNodeNameArgs = { name: string } +/** + * This should be used for providing a nonce to encrypt/decrypt functions. + * + * You may also generate a nonce for a given algorithm with `Nonce::generate()` + */ +export type Nonce = { XChaCha20Poly1305: number[] } | { Aes256Gcm: number[] } export type Object = { id: number; pub_id: number[]; kind: number; key_id: number | null; hidden: boolean; favorite: boolean; important: boolean; has_thumbnail: boolean; has_thumbstrip: boolean; has_video_preview: boolean; ipfs_id: string | null; note: string | null; date_created: string; date_accessed: string | null } -export type RenameFileArgs = { location_id: number; file_name: string; new_file_name: string } +export type ObjectFilterArgs = { favorite?: boolean | null; hidden?: boolean | null; dateAccessed?: MaybeNot | null; kind?: number[]; tags?: number[] } + +export type ObjectSearchArgs = { take?: number | null; order?: ObjectSearchOrdering | null; cursor?: number[] | null; filter?: ObjectFilterArgs } + +export type ObjectSearchOrdering = { dateAccessed: SortOrder } + +export type ObjectValidatorArgs = { id: number; path: string } + +export type ObjectWithFilePaths = { id: number; pub_id: number[]; kind: number; key_id: number | null; hidden: boolean; favorite: boolean; important: boolean; has_thumbnail: boolean; has_thumbstrip: boolean; has_video_preview: boolean; ipfs_id: string | null; note: string | null; date_created: string; date_accessed: string | null; file_paths: FilePath[] } + +export type OnboardingConfig = { password: Protected; algorithm: Algorithm; hashing_algorithm: HashingAlgorithm } + +/** + * Represents the operating system which the remote peer is running. + * This is not used internally and predominantly is designed to be used for display purposes by the embedding application. + */ +export type OperatingSystem = "Windows" | "Linux" | "MacOS" | "Ios" | "Android" | { Other: string } + +export type OptionalRange = { from: T | null; to: T | null } + +export type OwnedOperation = { model: string; items: OwnedOperationItem[] } + +export type OwnedOperationData = { Create: { [key: string]: any } } | { CreateMany: { values: ([any, { [key: string]: any }])[]; skip_duplicates: boolean } } | { Update: { [key: string]: any } } | "Delete" + +export type OwnedOperationItem = { id: any; data: OwnedOperationData } + +/** + * TODO: P2P event for the frontend + */ +export type P2PEvent = { type: "DiscoveredPeer"; peer_id: PeerId; metadata: PeerMetadata } | { type: "SpacedropRequest"; id: string; peer_id: PeerId; name: string } + +/** + * These parameters define the password-hashing level. + * + * The greater the parameter, the longer the password will take to hash. + */ +export type Params = "Standard" | "Hardened" | "Paranoid" + +export type PeerId = string + +export type PeerMetadata = { name: string; operating_system: OperatingSystem | null; version: string | null; email: string | null; img_url: string | null } + +export type Protected = T export type RelationOperation = { relation_item: string; relation_group: string; relation: string; data: RelationOperationData } +export type RelationOperationData = "Create" | { Update: { field: string; value: any } } | "Delete" + +export type RenameFileArgs = { location_id: number; file_name: string; new_file_name: string } + +export type RestoreBackupArgs = { password: Protected; secret_key: Protected; path: string } + +export type RuleKind = "AcceptFilesByGlob" | "RejectFilesByGlob" | "AcceptIfChildrenDirectoriesArePresent" | "RejectIfChildrenDirectoriesArePresent" + +/** + * This should be used for passing a salt around. + * + * You may also generate a salt with `Salt::generate()` + */ +export type Salt = number[] + +export type SearchData = { cursor: number[] | null; items: T[] } + +export type SetFavoriteArgs = { id: number; favorite: boolean } + +export type SetNoteArgs = { id: number; note: string | null } + +export type SharedOperation = { record_id: any; model: string; data: SharedOperationData } + +export type SharedOperationCreateData = { u: { [key: string]: any } } | "a" + +export type SharedOperationData = SharedOperationCreateData | { field: string; value: any } | null + +export type SortOrder = "Asc" | "Desc" + +export type SpacedropArgs = { peer_id: PeerId; file_path: string[] } + +export type Statistics = { id: number; date_captured: string; total_object_count: number; library_db_size: string; total_bytes_used: string; total_bytes_capacity: string; total_unique_bytes: string; total_bytes_free: string; preview_media_bytes: string } + +/** + * This is a stored key, and can be freely written to the database. + * + * It contains no sensitive information that is not encrypted. + */ +export type StoredKey = { uuid: string; version: StoredKeyVersion; key_type: StoredKeyType; algorithm: Algorithm; hashing_algorithm: HashingAlgorithm; content_salt: Salt; master_key: EncryptedKey; master_key_nonce: Nonce; key_nonce: Nonce; key: number[]; salt: Salt; memory_only: boolean; automount: boolean } + /** * This denotes the type of key. `Root` keys can be used to unlock the key manager, and `User` keys are ordinary keys. */ export type StoredKeyType = "User" | "Root" + +/** + * This denotes the `StoredKey` version. + */ +export type StoredKeyVersion = "V1" + +export type Tag = { id: number; pub_id: number[]; name: string | null; color: string | null; total_objects: number | null; redundancy_goal: number | null; date_created: string; date_modified: string } + +export type TagAssignArgs = { object_id: number; tag_id: number; unassign: boolean } + +export type TagCreateArgs = { name: string; color: string } + +export type TagUpdateArgs = { id: number; name: string | null; color: string | null } + +export type UnlockKeyManagerArgs = { password: Protected; secret_key: Protected } + +export type Volume = { name: string; mount_point: string; total_capacity: string; available_capacity: string; is_removable: boolean; disk_type: DiskType | null; file_system: string | null; is_root_filesystem: boolean }