[ENG-245] Object Validator Job (#414)

* use `.to_hex()` for blake3 hashes

* move file hashing function+fix large stack alloc

* init validator job

* change branch name

* add object validator job

* formatting

Co-authored-by: Jamie Pine <ijamespine@me.com>
This commit is contained in:
jake 2022-10-18 13:39:33 +01:00 committed by GitHub
parent 5fcc6c4f31
commit 3f2b4cda38
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 258 additions and 21 deletions

View file

@ -26,6 +26,7 @@ export type Procedures = {
{ key: "files.setNote", input: LibraryArgs<SetNoteArgs>, result: null } |
{ key: "jobs.generateThumbsForLocation", input: LibraryArgs<GenerateThumbsForLocationArgs>, result: null } |
{ key: "jobs.identifyUniqueFiles", input: LibraryArgs<IdentifyUniqueFilesArgs>, result: null } |
{ key: "jobs.objectValidator", input: LibraryArgs<ObjectValidatorArgs>, result: null } |
{ key: "library.create", input: string, result: LibraryConfigWrapped } |
{ key: "library.delete", input: string, result: null } |
{ key: "library.edit", input: EditLibraryArgs, result: null } |
@ -93,6 +94,8 @@ export interface NodeState { version: string | null, id: string, name: string, p
export interface Object { id: number, cas_id: string, integrity_checksum: string | null, name: string | null, extension: string | null, kind: number, size_in_bytes: string, key_id: number | null, hidden: boolean, favorite: boolean, important: boolean, has_thumbnail: boolean, has_thumbstrip: boolean, has_video_preview: boolean, ipfs_id: string | null, note: string | null, date_created: string, date_modified: string, date_indexed: string }
export interface ObjectValidatorArgs { id: number, path: string }
export type RuleKind = "AcceptFilesByGlob" | "RejectFilesByGlob" | "AcceptIfChildrenDirectoriesArePresent" | "RejectIfChildrenDirectoriesArePresent"
export interface SetFavoriteArgs { id: number, favorite: boolean }

View file

@ -4,6 +4,7 @@ use crate::{
object::{
identifier_job::{FileIdentifierJob, FileIdentifierJobInit},
preview::{ThumbnailJob, ThumbnailJobInit},
validation::validator_job::{ObjectValidatorJob, ObjectValidatorJobInit},
},
prisma::location,
};
@ -56,6 +57,35 @@ pub(crate) fn mount() -> RouterBuilder {
},
)
})
.library_mutation("objectValidator", |t| {
#[derive(Type, Deserialize)]
pub struct ObjectValidatorArgs {
pub id: i32,
pub path: PathBuf,
}
t(|_, args: ObjectValidatorArgs, library| async move {
if fetch_location(&library, args.id).exec().await?.is_none() {
return Err(rspc::Error::new(
ErrorCode::NotFound,
"Location not found".into(),
));
}
library
.spawn_job(Job::new(
ObjectValidatorJobInit {
location_id: args.id,
path: args.path,
background: true,
},
Box::new(ObjectValidatorJob {}),
))
.await;
Ok(())
})
})
.library_mutation("identifyUniqueFiles", |t| {
#[derive(Type, Deserialize)]
pub struct IdentifyUniqueFilesArgs {

View file

@ -5,6 +5,7 @@ use crate::{
object::{
identifier_job::{FileIdentifierJob, FileIdentifierJobInit},
preview::{ThumbnailJob, ThumbnailJobInit},
validation::validator_job::{ObjectValidatorJob, ObjectValidatorJobInit},
},
prisma::{indexer_rules_in_location, location, node},
};
@ -264,6 +265,15 @@ pub async fn scan_location(
Box::new(ThumbnailJob {}),
))
.await;
ctx.queue_job(Job::new(
ObjectValidatorJobInit {
location_id,
path: PathBuf::new(),
background: true,
},
Box::new(ObjectValidatorJob {}),
))
.await;
Ok(())
}

View file

@ -17,10 +17,6 @@ async fn read_at(file: &mut File, offset: u64, size: u64) -> Result<Vec<u8>, io:
Ok(buf)
}
/// Render a byte slice as lowercase hexadecimal, two digits per byte.
fn to_hex_string(b: &[u8]) -> String {
    let mut out = String::with_capacity(b.len() * 2);
    for byte in b {
        out.push_str(&format!("{:02x}", byte));
    }
    out
}
pub async fn generate_cas_id(path: PathBuf, size: u64) -> Result<String, io::Error> {
// open file reference
let mut file = File::open(path).await?;
@ -46,25 +42,25 @@ pub async fn generate_cas_id(path: PathBuf, size: u64) -> Result<String, io::Err
hasher.update(&buf);
}
let hex = to_hex_string(hasher.finalize().as_bytes());
let hex = hasher.finalize().to_hex();
Ok(hex)
}
// pub async fn full_checksum(path: &str) -> Result<String, io::Error> {
// const BLOCK_SIZE: usize = 1048576;
// //read file as buffer and convert to digest
// let mut reader = File::open(path).await?;
// let mut context = Hasher::new();
// let mut buffer = [0; 1048576];
// loop {
// let read_count = reader.read(&mut buffer).await?;
// context.update(&buffer[..read_count]);
// if read_count != BLOCK_SIZE {
// break;
// }
// }
// let hex = to_hex_string(context.finalize().as_bytes());
pub async fn full_checksum(path: &str) -> Result<String, io::Error> {
const BLOCK_SIZE: usize = 1048576;
//read file as buffer and convert to digest
let mut reader = File::open(path).await?;
let mut context = Hasher::new();
let mut buffer = [0; 1048576];
loop {
let read_count = reader.read(&mut buffer).await?;
context.update(&buffer[..read_count]);
if read_count != BLOCK_SIZE {
break;
}
}
let hex = to_hex_string(context.finalize().as_bytes());
// Ok(hex)
// }
Ok(hex)
}

View file

@ -1,6 +1,7 @@
pub mod cas;
pub mod identifier_job;
pub mod preview;
pub mod validation;
// Objects are primarily created by the identifier from Paths
// Some Objects are purely virtual, unless they have one or more associated Paths, which refer to a file found in a Location

View file

@ -0,0 +1,24 @@
use blake3::Hasher;
use std::path::PathBuf;
use tokio::{
fs::File,
io::{self, AsyncReadExt},
};
const BLOCK_SIZE: usize = 1048576; // 1 MiB read chunk

/// Compute the blake3 checksum of the whole file at `path`, returned as a
/// lowercase hex string.
///
/// The file is streamed through a heap-allocated `BLOCK_SIZE` buffer, so
/// memory use stays constant regardless of file size.
///
/// # Errors
/// Propagates any `io::Error` from opening or reading the file.
pub async fn file_checksum(path: PathBuf) -> Result<String, io::Error> {
    let mut reader = File::open(path).await?;
    let mut context = Hasher::new();
    // Boxed slice keeps the 1 MiB buffer off the stack.
    let mut buffer = vec![0; BLOCK_SIZE].into_boxed_slice();
    loop {
        let read_count = reader.read(&mut buffer).await?;
        if read_count == 0 {
            // Zero bytes read is the only guaranteed EOF signal. The previous
            // `read_count != BLOCK_SIZE` check would stop early on a legal
            // short read mid-file, producing a checksum of a truncated prefix.
            break;
        }
        context.update(&buffer[..read_count]);
    }
    Ok(context.finalize().to_hex().to_string())
}

View file

@ -0,0 +1,2 @@
// Object validation: `hash` streams files through blake3 to produce a
// full-file checksum; `validator_job` is the background job that writes
// those checksums onto Objects as `integrity_checksum`.
pub mod hash;
pub mod validator_job;

View file

@ -0,0 +1,153 @@
use serde::{Deserialize, Serialize};
use std::{collections::VecDeque, path::PathBuf};
use crate::{
job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext},
prisma::{self, file_path, location, object},
};
use tracing::info;
use super::hash::file_checksum;
// The Validator is able to:
// - generate a full byte checksum for Objects in a Location
// - generate checksums for all Objects missing one
// - compare two objects and return true if they are the same
// Stateless marker type for the job; all working state lives in the
// `JobState` passed to the `StatefulJob` methods.
pub struct ObjectValidatorJob {}

#[derive(Serialize, Deserialize, Debug)]
pub struct ObjectValidatorJobState {
    // Local filesystem root of the location being validated; joined with
    // each step's materialized_path to form absolute file paths.
    pub root_path: PathBuf,
    // Number of steps queued at init time, kept for the final report.
    pub task_count: usize,
}

// Arguments the validator job is created with.
#[derive(Serialize, Deserialize, Debug)]
pub struct ObjectValidatorJobInit {
    // Location whose file_paths will be checksummed.
    pub location_id: i32,
    // NOTE(review): this sub-path is never consulted by the job steps in
    // this file — confirm whether filtering by it was intended.
    pub path: PathBuf,
    // NOTE(review): set by callers but not read anywhere in this file.
    pub background: bool,
}

// One unit of work: a single file_path row to (maybe) checksum.
#[derive(Serialize, Deserialize, Debug)]
pub struct ObjectValidatorJobStep {
    pub path: file_path::Data,
}
#[async_trait::async_trait]
impl StatefulJob for ObjectValidatorJob {
    type Data = ObjectValidatorJobState;
    type Init = ObjectValidatorJobInit;
    type Step = ObjectValidatorJobStep;

    fn name(&self) -> &'static str {
        "object_validator"
    }

    /// Build the work queue: one step per file_path row in the target
    /// location, and record the location's root path for later joins.
    async fn init(
        &self,
        ctx: WorkerContext,
        state: &mut JobState<Self::Init, Self::Data, Self::Step>,
    ) -> Result<(), JobError> {
        let library_ctx = ctx.library_ctx();

        // Every file_path in the location becomes a candidate step;
        // directories and already-checksummed objects are skipped later,
        // inside execute_step.
        state.steps = library_ctx
            .db
            .file_path()
            .find_many(vec![file_path::location_id::equals(state.init.location_id)])
            .exec()
            .await?
            .into_iter()
            .map(|path| ObjectValidatorJobStep { path })
            .collect::<VecDeque<_>>();

        let location = library_ctx
            .db
            .location()
            .find_unique(location::id::equals(state.init.location_id))
            .exec()
            .await?
            // NOTE(review): panics if the location row was deleted between
            // queueing and running the job.
            .unwrap();

        state.data = Some(ObjectValidatorJobState {
            // NOTE(review): also panics when the location has no local_path.
            root_path: location.local_path.as_ref().map(PathBuf::from).unwrap(),
            task_count: state.steps.len(),
        });

        ctx.progress(vec![JobReportUpdate::TaskCount(state.steps.len())]);

        Ok(())
    }

    /// Checksum the current step's file and store it on its Object,
    /// unless the path is a directory, the file_path has no Object, or
    /// the Object already carries an integrity_checksum.
    async fn execute_step(
        &self,
        ctx: WorkerContext,
        state: &mut JobState<Self::Init, Self::Data, Self::Step>,
    ) -> Result<(), JobError> {
        let step = &state.steps[0];
        let library_ctx = ctx.library_ctx();
        let data = state.data.as_ref().expect("fatal: missing job state");

        // Absolute path of the file this step refers to.
        let path = data.root_path.join(&step.path.materialized_path);

        // skip directories
        if path.is_dir() {
            return Ok(());
        }

        // file_paths without an attached Object have nowhere to store a
        // checksum, so they are skipped entirely.
        if let Some(object_id) = step.path.object_id {
            // this is to skip files that already have checksums
            // i'm unsure what the desired behaviour is in this case
            // we can also compare old and new checksums here
            let object = library_ctx
                .db
                .object()
                .find_unique(object::id::equals(object_id))
                .exec()
                .await?
                // NOTE(review): panics if the referenced Object row is gone.
                .unwrap();

            if object.integrity_checksum.is_some() {
                return Ok(());
            }

            // Full-file blake3 checksum, streamed (see validation::hash).
            let hash = file_checksum(path).await?;

            library_ctx
                .db
                .object()
                .update(
                    object::id::equals(object_id),
                    vec![prisma::object::SetParam::SetIntegrityChecksum(Some(hash))],
                )
                .exec()
                .await?;
        }

        ctx.progress(vec![JobReportUpdate::CompletedTaskCount(
            state.step_number + 1,
        )]);

        Ok(())
    }

    /// Log a summary and persist the job's init args as its report payload.
    async fn finalize(
        &self,
        _ctx: WorkerContext,
        state: &mut JobState<Self::Init, Self::Data, Self::Step>,
    ) -> JobResult {
        let data = state
            .data
            .as_ref()
            .expect("critical error: missing data on job state");
        info!(
            "finalizing validator job at {}: {} tasks",
            data.root_path.display(),
            data.task_count
        );

        Ok(Some(serde_json::to_value(&state.init)?))
    }
}

View file

@ -26,6 +26,7 @@ export type Procedures = {
{ key: "files.setNote", input: LibraryArgs<SetNoteArgs>, result: null } |
{ key: "jobs.generateThumbsForLocation", input: LibraryArgs<GenerateThumbsForLocationArgs>, result: null } |
{ key: "jobs.identifyUniqueFiles", input: LibraryArgs<IdentifyUniqueFilesArgs>, result: null } |
{ key: "jobs.objectValidator", input: LibraryArgs<ObjectValidatorArgs>, result: null } |
{ key: "library.create", input: string, result: LibraryConfigWrapped } |
{ key: "library.delete", input: string, result: null } |
{ key: "library.edit", input: EditLibraryArgs, result: null } |
@ -93,6 +94,8 @@ export interface NodeState { version: string | null, id: string, name: string, p
export interface Object { id: number, cas_id: string, integrity_checksum: string | null, name: string | null, extension: string | null, kind: number, size_in_bytes: string, key_id: number | null, hidden: boolean, favorite: boolean, important: boolean, has_thumbnail: boolean, has_thumbstrip: boolean, has_video_preview: boolean, ipfs_id: string | null, note: string | null, date_created: string, date_modified: string, date_indexed: string }
export interface ObjectValidatorArgs { id: number, path: string }
export type RuleKind = "AcceptFilesByGlob" | "RejectFilesByGlob" | "AcceptIfChildrenDirectoriesArePresent" | "RejectIfChildrenDirectoriesArePresent"
export interface SetFavoriteArgs { id: number, favorite: boolean }

View file

@ -142,6 +142,15 @@ export const TopBar: React.FC<TopBarProps> = (props) => {
}
});
const { mutate: objectValidator } = useLibraryMutation(
'jobs.objectValidator',
{
onMutate: (data) => {
// console.log('ObjectValidator', data);
}
}
);
const navigate = useNavigate();
//create function to focus on search box when cmd+k is pressed
@ -312,6 +321,12 @@ export const TopBar: React.FC<TopBarProps> = (props) => {
icon: ArrowsClockwise,
onPress: () =>
store.locationId && identifyUniqueFiles({ id: store.locationId, path: '' })
},
{
name: 'Validate Objects',
icon: ArrowsClockwise,
onPress: () =>
store.locationId && objectValidator({ id: store.locationId, path: '' })
}
]
]}