From 3f2b4cda3841a06fd9846bd17f42bd8a3d04f515 Mon Sep 17 00:00:00 2001 From: jake <77554505+brxken128@users.noreply.github.com> Date: Tue, 18 Oct 2022 13:39:33 +0100 Subject: [PATCH] [ENG-245] Object Validator Job (#414) * use `.to_hex()` for blake3 hashes * move file hashing function+fix large stack alloc * init validator job * change branch name * add object validator job * formatting Co-authored-by: Jamie Pine --- apps/mobile/src/types/bindings.ts | 3 + core/src/api/jobs.rs | 30 ++++ core/src/location/mod.rs | 10 ++ core/src/object/cas.rs | 38 ++--- core/src/object/mod.rs | 1 + core/src/object/validation/hash.rs | 24 +++ core/src/object/validation/mod.rs | 2 + core/src/object/validation/validator_job.rs | 153 ++++++++++++++++++ packages/client/src/core.ts | 3 + .../components/explorer/ExplorerTopBar.tsx | 15 ++ 10 files changed, 258 insertions(+), 21 deletions(-) diff --git a/apps/mobile/src/types/bindings.ts b/apps/mobile/src/types/bindings.ts index 5ad4ffda2..f9a387595 100644 --- a/apps/mobile/src/types/bindings.ts +++ b/apps/mobile/src/types/bindings.ts @@ -26,6 +26,7 @@ export type Procedures = { { key: "files.setNote", input: LibraryArgs, result: null } | { key: "jobs.generateThumbsForLocation", input: LibraryArgs, result: null } | { key: "jobs.identifyUniqueFiles", input: LibraryArgs, result: null } | + { key: "jobs.objectValidator", input: LibraryArgs, result: null } | { key: "library.create", input: string, result: LibraryConfigWrapped } | { key: "library.delete", input: string, result: null } | { key: "library.edit", input: EditLibraryArgs, result: null } | @@ -93,6 +94,8 @@ export interface NodeState { version: string | null, id: string, name: string, p export interface Object { id: number, cas_id: string, integrity_checksum: string | null, name: string | null, extension: string | null, kind: number, size_in_bytes: string, key_id: number | null, hidden: boolean, favorite: boolean, important: boolean, has_thumbnail: boolean, has_thumbstrip: boolean, has_video_preview: boolean, ipfs_id: string | null, note: string | null, date_created: string, date_modified: string, date_indexed: string } +export interface ObjectValidatorArgs { id: number, path: string } + export type RuleKind = "AcceptFilesByGlob" | "RejectFilesByGlob" | "AcceptIfChildrenDirectoriesArePresent" | "RejectIfChildrenDirectoriesArePresent" export interface SetFavoriteArgs { id: number, favorite: boolean } diff --git a/core/src/api/jobs.rs b/core/src/api/jobs.rs index c27092f47..3455850e8 100644 --- a/core/src/api/jobs.rs +++ b/core/src/api/jobs.rs @@ -4,6 +4,7 @@ use crate::{ object::{ identifier_job::{FileIdentifierJob, FileIdentifierJobInit}, preview::{ThumbnailJob, ThumbnailJobInit}, + validation::validator_job::{ObjectValidatorJob, ObjectValidatorJobInit}, }, prisma::location, }; @@ -56,6 +57,35 @@ pub(crate) fn mount() -> RouterBuilder { }, ) }) + .library_mutation("objectValidator", |t| { + #[derive(Type, Deserialize)] + pub struct ObjectValidatorArgs { + pub id: i32, + pub path: PathBuf, + } + + t(|_, args: ObjectValidatorArgs, library| async move { + if fetch_location(&library, args.id).exec().await?.is_none() { + return Err(rspc::Error::new( + ErrorCode::NotFound, + "Location not found".into(), + )); + } + + library + .spawn_job(Job::new( + ObjectValidatorJobInit { + location_id: args.id, + path: args.path, + background: true, + }, + Box::new(ObjectValidatorJob {}), + )) + .await; + + Ok(()) + }) + }) .library_mutation("identifyUniqueFiles", |t| { #[derive(Type, Deserialize)] pub struct IdentifyUniqueFilesArgs { diff --git a/core/src/location/mod.rs b/core/src/location/mod.rs index 8621942d4..4a7100213 100644 --- a/core/src/location/mod.rs +++ b/core/src/location/mod.rs @@ -5,6 +5,7 @@ use crate::{ object::{ identifier_job::{FileIdentifierJob, FileIdentifierJobInit}, preview::{ThumbnailJob, ThumbnailJobInit}, + validation::validator_job::{ObjectValidatorJob, ObjectValidatorJobInit}, }, prisma::{indexer_rules_in_location, location, node}, }; @@ -264,6 +265,15 @@ pub async fn scan_location( Box::new(ThumbnailJob {}), )) .await; + ctx.queue_job(Job::new( + ObjectValidatorJobInit { + location_id, + path: PathBuf::new(), + background: true, + }, + Box::new(ObjectValidatorJob {}), + )) + .await; Ok(()) } diff --git a/core/src/object/cas.rs b/core/src/object/cas.rs index 3c558b3f1..005d495c7 100644 --- a/core/src/object/cas.rs +++ b/core/src/object/cas.rs @@ -17,10 +17,6 @@ async fn read_at(file: &mut File, offset: u64, size: u64) -> Result, io: Ok(buf) } -fn to_hex_string(b: &[u8]) -> String { - b.iter().map(|c| format!("{:02x}", c)).collect::() -} - pub async fn generate_cas_id(path: PathBuf, size: u64) -> Result { // open file reference let mut file = File::open(path).await?; @@ -46,25 +42,25 @@ pub async fn generate_cas_id(path: PathBuf, size: u64) -> Result Result { -// const BLOCK_SIZE: usize = 1048576; -// //read file as buffer and convert to digest -// let mut reader = File::open(path).await?; -// let mut context = Hasher::new(); -// let mut buffer = [0; 1048576]; -// loop { -// let read_count = reader.read(&mut buffer).await?; -// context.update(&buffer[..read_count]); -// if read_count != BLOCK_SIZE { -// break; -// } -// } -// let hex = to_hex_string(context.finalize().as_bytes()); +pub async fn full_checksum(path: &str) -> Result { + const BLOCK_SIZE: usize = 1048576; + //read file as buffer and convert to digest + let mut reader = File::open(path).await?; + let mut context = Hasher::new(); + let mut buffer = [0; 1048576]; + loop { + let read_count = reader.read(&mut buffer).await?; + context.update(&buffer[..read_count]); + if read_count != BLOCK_SIZE { + break; + } + } + let hex = to_hex_string(context.finalize().as_bytes()); -// Ok(hex) -// } + Ok(hex) +} diff --git a/core/src/object/mod.rs b/core/src/object/mod.rs index 926889a69..c5890d1a2 100644 --- a/core/src/object/mod.rs +++ b/core/src/object/mod.rs @@ -1,6 +1,7 @@ pub mod cas; pub mod identifier_job; pub mod preview; +pub mod validation; // Objects are primarily created by the identifier from Paths // Some Objects are purely virtual, unless they have one or more associated Paths, which refer to a file found in a Location diff --git a/core/src/object/validation/hash.rs b/core/src/object/validation/hash.rs index e69de29bb..8a0f3ecbc 100644 --- a/core/src/object/validation/hash.rs +++ b/core/src/object/validation/hash.rs @@ -0,0 +1,24 @@ +use blake3::Hasher; +use std::path::PathBuf; +use tokio::{ + fs::File, + io::{self, AsyncReadExt}, +}; + +const BLOCK_SIZE: usize = 1048576; + +pub async fn file_checksum(path: PathBuf) -> Result { + let mut reader = File::open(path).await?; + let mut context = Hasher::new(); + let mut buffer = vec![0; BLOCK_SIZE].into_boxed_slice(); + loop { + let read_count = reader.read(&mut buffer).await?; + context.update(&buffer[..read_count]); + if read_count != BLOCK_SIZE { + break; + } + } + let hex = context.finalize().to_hex(); + + Ok(hex.to_string()) +} diff --git a/core/src/object/validation/mod.rs b/core/src/object/validation/mod.rs index e69de29bb..372e0c428 100644 --- a/core/src/object/validation/mod.rs +++ b/core/src/object/validation/mod.rs @@ -0,0 +1,2 @@ +pub mod hash; +pub mod validator_job; diff --git a/core/src/object/validation/validator_job.rs b/core/src/object/validation/validator_job.rs index e69de29bb..403e85791 100644 --- a/core/src/object/validation/validator_job.rs +++ b/core/src/object/validation/validator_job.rs @@ -0,0 +1,153 @@ +use serde::{Deserialize, Serialize}; + +use std::{collections::VecDeque, path::PathBuf}; + +use crate::{ + job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext}, + prisma::{self, file_path, location, object}, +}; + +use tracing::info; + +use super::hash::file_checksum; + +// The Validator is able to: +// - generate a full byte checksum for Objects in a Location +// - generate checksums for all Objects missing without one +// - compare two objects and return true if they are the same +pub struct ObjectValidatorJob {} + +#[derive(Serialize, Deserialize, Debug)] +pub struct ObjectValidatorJobState { + pub root_path: PathBuf, + pub task_count: usize, +} + +// The validator can +#[derive(Serialize, Deserialize, Debug)] +pub struct ObjectValidatorJobInit { + pub location_id: i32, + pub path: PathBuf, + pub background: bool, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct ObjectValidatorJobStep { + pub path: file_path::Data, +} + +#[async_trait::async_trait] +impl StatefulJob for ObjectValidatorJob { + type Data = ObjectValidatorJobState; + type Init = ObjectValidatorJobInit; + type Step = ObjectValidatorJobStep; + + fn name(&self) -> &'static str { + "object_validator" + } + + async fn init( + &self, + ctx: WorkerContext, + state: &mut JobState, + ) -> Result<(), JobError> { + let library_ctx = ctx.library_ctx(); + + state.steps = library_ctx + .db + .file_path() + .find_many(vec![file_path::location_id::equals(state.init.location_id)]) + .exec() + .await? + .into_iter() + .map(|path| ObjectValidatorJobStep { path }) + .collect::>(); + + let location = library_ctx + .db + .location() + .find_unique(location::id::equals(state.init.location_id)) + .exec() + .await? + .unwrap(); + + state.data = Some(ObjectValidatorJobState { + root_path: location.local_path.as_ref().map(PathBuf::from).unwrap(), + task_count: state.steps.len(), + }); + + ctx.progress(vec![JobReportUpdate::TaskCount(state.steps.len())]); + + Ok(()) + } + + async fn execute_step( + &self, + ctx: WorkerContext, + state: &mut JobState, + ) -> Result<(), JobError> { + let step = &state.steps[0]; + let library_ctx = ctx.library_ctx(); + + let data = state.data.as_ref().expect("fatal: missing job state"); + + let path = data.root_path.join(&step.path.materialized_path); + + // skip directories + if path.is_dir() { + return Ok(()); + } + + if let Some(object_id) = step.path.object_id { + // this is to skip files that already have checksums + // i'm unsure what the desired behaviour is in this case + // we can also compare old and new checksums here + let object = library_ctx + .db + .object() + .find_unique(object::id::equals(object_id)) + .exec() + .await? + .unwrap(); + if object.integrity_checksum.is_some() { + return Ok(()); + } + + let hash = file_checksum(path).await?; + + library_ctx + .db + .object() + .update( + object::id::equals(object_id), + vec![prisma::object::SetParam::SetIntegrityChecksum(Some(hash))], + ) + .exec() + .await?; + } + + ctx.progress(vec![JobReportUpdate::CompletedTaskCount( + state.step_number + 1, + )]); + + Ok(()) + } + + async fn finalize( + &self, + _ctx: WorkerContext, + state: &mut JobState, + ) -> JobResult { + let data = state + .data + .as_ref() + .expect("critical error: missing data on job state"); + info!( + "finalizing validator job at {}: {} tasks", + data.root_path.display(), + data.task_count + ); + + Ok(Some(serde_json::to_value(&state.init)?)) + } +} diff --git a/packages/client/src/core.ts b/packages/client/src/core.ts index 5ad4ffda2..f9a387595 100644 --- a/packages/client/src/core.ts +++ b/packages/client/src/core.ts @@ -26,6 +26,7 @@ export type Procedures = { { key: "files.setNote", input: LibraryArgs, result: null } | { key: "jobs.generateThumbsForLocation", input: LibraryArgs, result: null } | { key: "jobs.identifyUniqueFiles", input: LibraryArgs, result: null } | + { key: "jobs.objectValidator", input: LibraryArgs, result: null } | { key: "library.create", input: string, result: LibraryConfigWrapped } | { key: "library.delete", input: string, result: null } | { key: "library.edit", input: EditLibraryArgs, result: null } | @@ -93,6 +94,8 @@ export interface NodeState { version: string | null, id: string, name: string, p export interface Object { id: number, cas_id: string, integrity_checksum: string | null, name: string | null, extension: string | null, kind: number, size_in_bytes: string, key_id: number | null, hidden: boolean, favorite: boolean, important: boolean, has_thumbnail: boolean, has_thumbstrip: boolean, has_video_preview: boolean, ipfs_id: string | null, note: string | null, date_created: string, date_modified: string, date_indexed: string } +export interface ObjectValidatorArgs { id: number, path: string } + export type RuleKind = "AcceptFilesByGlob" | "RejectFilesByGlob" | "AcceptIfChildrenDirectoriesArePresent" | "RejectIfChildrenDirectoriesArePresent" export interface SetFavoriteArgs { id: number, favorite: boolean } diff --git a/packages/interface/src/components/explorer/ExplorerTopBar.tsx b/packages/interface/src/components/explorer/ExplorerTopBar.tsx index ce647d6e6..c3283fee8 100644 --- a/packages/interface/src/components/explorer/ExplorerTopBar.tsx +++ b/packages/interface/src/components/explorer/ExplorerTopBar.tsx @@ -142,6 +142,15 @@ export const TopBar: React.FC = (props) => { } }); + const { mutate: objectValidator } = useLibraryMutation( + 'jobs.objectValidator', + { + onMutate: (data) => { + // console.log('ObjectValidator', data); + } + } + ); + const navigate = useNavigate(); //create function to focus on search box when cmd+k is pressed @@ -312,6 +321,12 @@ export const TopBar: React.FC = (props) => { icon: ArrowsClockwise, onPress: () => store.locationId && identifyUniqueFiles({ id: store.locationId, path: '' }) + }, + { + name: 'Validate Objects', + icon: ArrowsClockwise, + onPress: () => + store.locationId && objectValidator({ id: store.locationId, path: '' }) } ] ]}