Job System and Watcher improvements (#1030)

* Trying to make watcher more resilient between creates and updates

* More checks on create vs update

* Rust fmt

* PCR rev

* Windows CI fix

* Some indexer improvements

* Builder pattern for jobs and reports

* Fix list view invalid date

* Fix progress bar

* Normalizing job metadata output

* Fixed job metadata

* Removing inner report struct from identifier job

---------

Co-authored-by: Jamie Pine <32987599+jamiepine@users.noreply.github.com>
Co-authored-by: Utku <74243531+utkubakir@users.noreply.github.com>
Co-authored-by: Jamie Pine <ijamespine@me.com>
Ericson "Fogo" Soares 2023-06-28 13:30:19 -03:00 committed by GitHub
parent d8210f13f6
commit 1dce6380fe
27 changed files with 674 additions and 435 deletions

View file

@ -11,7 +11,7 @@ use std::{
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use tokio::{select, sync::mpsc};
use tracing::{debug, info, warn};
use tracing::{debug, info, trace, warn};
use uuid::Uuid;
mod error;
@ -116,6 +116,54 @@ pub trait DynJob: Send + Sync {
async fn cancel_children(&mut self, library: &Library) -> Result<(), JobError>;
}
pub struct JobBuilder<SJob: StatefulJob> {
id: Uuid,
init: SJob,
report_builder: JobReportBuilder,
}
impl<SJob: StatefulJob> JobBuilder<SJob> {
pub fn build(self) -> Box<Job<SJob>> {
Box::new(Job::<SJob> {
id: self.id,
hash: <SJob as StatefulJob>::hash(&self.init),
report: Some(self.report_builder.build()),
state: Some(JobState {
init: self.init,
data: None,
steps: VecDeque::new(),
step_number: 0,
run_metadata: Default::default(),
}),
next_jobs: VecDeque::new(),
})
}
pub fn new(init: SJob) -> Self {
let id = Uuid::new_v4();
Self {
id,
init,
report_builder: JobReportBuilder::new(id, SJob::NAME.to_string()),
}
}
pub fn with_action(mut self, action: impl AsRef<str>) -> Self {
self.report_builder = self.report_builder.with_action(action);
self
}
pub fn with_parent_id(mut self, parent_id: Uuid) -> Self {
self.report_builder = self.report_builder.with_parent_id(parent_id);
self
}
pub fn with_metadata(mut self, metadata: serde_json::Value) -> Self {
self.report_builder = self.report_builder.with_metadata(metadata);
self
}
}
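For orientation, here is a minimal sketch of how the new builder reads at a call site, mirroring the `scan_location` change further down in this diff (the `IndexerJobInit`, `FileIdentifierJobInit`, `location` and `location_base_data` values are borrowed from that hunk and assume the surrounding crate is in scope):

let job = JobBuilder::new(IndexerJobInit { location, sub_path: None })
	.with_action("scan_location")
	.with_metadata(serde_json::json!({ "location": location_base_data.clone() }))
	.build() // Box<Job<IndexerJobInit>> carrying a JobReport assembled by JobReportBuilder
	.queue_next(FileIdentifierJobInit { location: location_base_data, sub_path: None });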
pub struct Job<SJob: StatefulJob> {
id: Uuid,
hash: u64,
@ -127,41 +175,7 @@ pub struct Job<SJob: StatefulJob> {
impl<SJob: StatefulJob> Job<SJob> {
pub fn new(init: SJob) -> Box<Self> {
let id = Uuid::new_v4();
Box::new(Self {
id,
hash: <SJob as StatefulJob>::hash(&init),
report: Some(JobReport::new(id, SJob::NAME.to_string())),
state: Some(JobState {
init,
data: None,
steps: VecDeque::new(),
step_number: 0,
run_metadata: Default::default(),
}),
next_jobs: VecDeque::new(),
})
}
pub fn new_with_action(init: SJob, action: impl AsRef<str>) -> Box<Self> {
let id = Uuid::new_v4();
Box::new(Self {
id,
hash: <SJob as StatefulJob>::hash(&init),
report: Some(JobReport::new_with_action(
id,
SJob::NAME.to_string(),
action,
)),
state: Some(JobState {
init,
data: None,
steps: VecDeque::new(),
step_number: 0,
run_metadata: Default::default(),
}),
next_jobs: VecDeque::new(),
})
JobBuilder::new(init).build()
}
pub fn queue_next<NextSJob>(mut self: Box<Self>, init: NextSJob) -> Box<Self>
@ -169,17 +183,17 @@ impl<SJob: StatefulJob> Job<SJob> {
NextSJob: StatefulJob + 'static,
{
let next_job_order = self.next_jobs.len() + 1;
self.next_jobs.push_back(Job::new_dependent(
init,
self.id,
// SAFETY: If we're queueing a next job then we should still have a report
self.report().as_ref().and_then(|parent_report| {
parent_report
.action
.as_ref()
.map(|parent_action| format!("{parent_action}-{next_job_order}"))
}),
));
let mut child_job_builder = JobBuilder::new(init).with_parent_id(self.id);
if let Some(parent_report) = self.report() {
if let Some(parent_action) = &parent_report.action {
child_job_builder =
child_job_builder.with_action(format!("{parent_action}-{next_job_order}"));
}
}
self.next_jobs.push_back(child_job_builder.build());
self
}
@ -205,28 +219,6 @@ impl<SJob: StatefulJob> Job<SJob> {
}))
}
fn new_dependent(init: SJob, parent_id: Uuid, parent_action: Option<String>) -> Box<Self> {
let id = Uuid::new_v4();
Box::new(Self {
id,
hash: <SJob as StatefulJob>::hash(&init),
report: Some(JobReport::new_with_parent(
id,
SJob::NAME.to_string(),
parent_id,
parent_action,
)),
state: Some(JobState {
init,
data: None,
steps: VecDeque::new(),
step_number: 0,
run_metadata: Default::default(),
}),
next_jobs: VecDeque::new(),
})
}
pub async fn spawn(self, library: &Library) -> Result<(), JobManagerError> {
library
.node_context
@ -613,7 +605,7 @@ impl<SJob: StatefulJob> DynJob for Job<SJob> {
let inner_step = Arc::clone(&step_arc);
let inner_stateful_job = Arc::clone(&stateful_job);
let _step_time = Instant::now();
let step_time = Instant::now();
let mut job_step_handle = tokio::spawn(async move {
inner_stateful_job
@ -658,6 +650,7 @@ impl<SJob: StatefulJob> DynJob for Job<SJob> {
// The job can also be shutdown or canceled while paused
WorkerCommand::Shutdown(when, signal_tx) => {
job_step_handle.abort();
let _ = job_step_handle.await;
debug!(
"Shuting down Job <id='{job_id}', name='{job_name}'> took {:?} \
@ -698,6 +691,7 @@ impl<SJob: StatefulJob> DynJob for Job<SJob> {
}
WorkerCommand::Cancel(when, signal_tx) => {
job_step_handle.abort();
let _ = job_step_handle.await;
debug!(
"Canceling Job <id='{job_id}', name='{job_name}'> \
took {:?} after running for {:?}",
@ -722,6 +716,7 @@ impl<SJob: StatefulJob> DynJob for Job<SJob> {
WorkerCommand::Shutdown(when, signal_tx) => {
job_step_handle.abort();
let _ = job_step_handle.await;
debug!(
"Shuting down Job <id='{job_id}', name='{job_name}'> took {:?} \
@ -758,6 +753,7 @@ impl<SJob: StatefulJob> DynJob for Job<SJob> {
}
WorkerCommand::Cancel(when, signal_tx) => {
job_step_handle.abort();
let _ = job_step_handle.await;
debug!(
"Canceling Job <id='{job_id}', name='{job_name}'> took {:?} \
after running for {:?}",
@ -771,10 +767,10 @@ impl<SJob: StatefulJob> DynJob for Job<SJob> {
// Here we actually run the job, step by step
step_result = &mut job_step_handle => {
// debug!(
// "Step finished in {:?} Job <id='{job_id}', name='{job_name}'>",
// _step_time.elapsed(),
// );
trace!(
"Step finished in {:?} Job <id='{job_id}', name='{job_name}'>",
step_time.elapsed(),
);
run_metadata = Arc::try_unwrap(run_metadata_arc)
.expect("step already ran, no more refs");
@ -807,9 +803,10 @@ impl<SJob: StatefulJob> DynJob for Job<SJob> {
if !new_errors.is_empty() {
warn!("Job<id='{job_id}', name='{job_name}'> had a step with errors");
for err in &new_errors {
new_errors.iter().for_each(|err| {
warn!("Job<id='{job_id}', name='{job_name}'> error: {:?}", err);
}
});
errors.extend(new_errors);
}
}

View file

@ -174,24 +174,6 @@ impl JobReport {
}
}
pub fn new_with_action(uuid: Uuid, name: String, action: impl AsRef<str>) -> Self {
let mut report = Self::new(uuid, name);
report.action = Some(action.as_ref().to_string());
report
}
pub fn new_with_parent(
uuid: Uuid,
name: String,
parent_id: Uuid,
action: Option<String>,
) -> Self {
let mut report = Self::new(uuid, name);
report.parent_id = Some(parent_id);
report.action = action;
report
}
pub fn get_meta(&self) -> (String, Option<String>) {
// actions are formatted like "added_location" or "added_location-1"
let Some(action_name) = self.action
@ -313,3 +295,59 @@ impl TryFrom<i32> for JobStatus {
Ok(s)
}
}
pub struct JobReportBuilder {
pub id: Uuid,
pub name: String,
pub action: Option<String>,
pub metadata: Option<serde_json::Value>,
pub parent_id: Option<Uuid>,
}
impl JobReportBuilder {
pub fn build(self) -> JobReport {
JobReport {
id: self.id,
is_background: false, // deprecated
name: self.name,
action: self.action,
created_at: None,
started_at: None,
completed_at: None,
status: JobStatus::Queued,
errors_text: vec![],
task_count: 0,
data: None,
metadata: self.metadata,
parent_id: self.parent_id,
completed_task_count: 0,
message: String::new(),
estimated_completion: Utc::now(),
}
}
pub fn new(id: Uuid, name: String) -> Self {
Self {
id,
name,
action: None,
metadata: None,
parent_id: None,
}
}
pub fn with_action(mut self, action: impl AsRef<str>) -> Self {
self.action = Some(action.as_ref().to_string());
self
}
pub fn with_metadata(mut self, metadata: serde_json::Value) -> Self {
self.metadata = Some(metadata);
self
}
pub fn with_parent_id(mut self, parent_id: Uuid) -> Self {
self.parent_id = Some(parent_id);
self
}
}

View file

@ -11,6 +11,7 @@ use std::{
use chrono::{DateTime, Utc};
use serde::Serialize;
use serde_json::json;
use specta::Type;
use tokio::{
select,
@ -384,7 +385,15 @@ impl Worker {
}) if errors.is_empty() => {
report.status = JobStatus::Completed;
report.data = None;
report.metadata = metadata;
report.metadata = match (report.metadata.take(), metadata) {
(Some(mut current_metadata), Some(new_metadata)) => {
current_metadata["output"] = new_metadata;
Some(current_metadata)
}
(None, Some(new_metadata)) => Some(json!({ "output": new_metadata })),
(Some(current_metadata), None) => Some(current_metadata),
_ => None,
};
report.completed_at = Some(Utc::now());
if let Err(e) = report.update(library).await {
error!("failed to update job report: {:#?}", e);

View file

@ -24,11 +24,15 @@ static FORBIDDEN_FILE_NAMES: OnceLock<RegexSet> = OnceLock::new();
#[derive(Serialize, Deserialize, Debug, Clone, Hash, Eq, PartialEq)]
#[non_exhaustive]
pub struct IsolatedFilePathData<'a> {
// WARN! These fields MUST NOT be changed outside the location module, that's why they have this visibility
// and are not public. They have some specific logic on them and should not be written to directly.
// If you want to access one of them from outside the location module, write yourself an accessor method
// to get read-only access to them.
pub(in crate::location) location_id: location::id::Type,
pub materialized_path: Cow<'a, str>,
pub(in crate::location) materialized_path: Cow<'a, str>,
pub(in crate::location) is_dir: bool,
pub name: Cow<'a, str>,
pub extension: Cow<'a, str>,
pub(in crate::location) name: Cow<'a, str>,
pub(in crate::location) extension: Cow<'a, str>,
relative_path: Cow<'a, str>,
}

View file

@ -5,6 +5,7 @@ use crate::{
use std::{
fs::Metadata,
hash::{Hash, Hasher},
path::{Path, PathBuf, MAIN_SEPARATOR_STR},
time::SystemTime,
};
@ -23,6 +24,7 @@ pub use isolated_file_path_data::{
};
// File Path selectables!
file_path::select!(file_path_just_pub_id { pub_id });
file_path::select!(file_path_just_pub_id_materialized_path {
pub_id
materialized_path
@ -36,12 +38,6 @@ file_path::select!(file_path_for_file_identifier {
name
extension
});
file_path::select!(file_path_for_indexer {
pub_id
materialized_path
name
extension
});
file_path::select!(file_path_for_object_validator {
pub_id
materialized_path
@ -97,6 +93,20 @@ file_path::select!(file_path_to_full_path {
// File Path includes!
file_path::include!(file_path_with_object { object });
impl Hash for file_path_just_pub_id::Data {
fn hash<H: Hasher>(&self, state: &mut H) {
self.pub_id.hash(state);
}
}
impl PartialEq for file_path_just_pub_id::Data {
fn eq(&self, other: &Self) -> bool {
self.pub_id == other.pub_id
}
}
impl Eq for file_path_just_pub_id::Data {}
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub struct FilePathMetadata {
pub inode: u64,

View file

@ -24,6 +24,7 @@ use std::{
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tokio::time::Instant;
use tracing::info;
@ -179,7 +180,7 @@ impl StatefulJob for IndexerJobInit {
&indexer_rules,
update_notifier_fn(ctx),
file_paths_db_fetcher_fn!(&db),
to_remove_db_fetcher_fn!(location_id, location_path, &db),
to_remove_db_fetcher_fn!(location_id, &db),
iso_file_path_factory(location_id, location_path),
50_000,
)
@ -298,7 +299,7 @@ impl StatefulJob for IndexerJobInit {
&data.indexer_rules,
update_notifier_fn(ctx),
file_paths_db_fetcher_fn!(&db),
to_remove_db_fetcher_fn!(location_id, location_path, &db),
to_remove_db_fetcher_fn!(location_id, &db),
iso_file_path_factory(location_id, location_path),
)
.await?;
@ -361,7 +362,7 @@ impl StatefulJob for IndexerJobInit {
) -> JobResult {
let init = self;
info!(
"scan of {} completed in {:?}. {} new files found, \
"Scan of {} completed in {:?}. {} new files found, \
indexed {} files in db. db write completed in {:?}",
maybe_missing(&init.location.path, "location.path")?,
run_metadata.scan_read_time,
@ -374,7 +375,7 @@ impl StatefulJob for IndexerJobInit {
invalidate_query!(ctx.library, "search.paths");
}
Ok(Some(serde_json::to_value(init)?))
Ok(Some(json!({"init: ": init, "run_metadata": run_metadata})))
}
}

View file

@ -13,10 +13,10 @@ use sd_prisma::prisma_sync;
use serde::{Deserialize, Serialize};
use serde_json::json;
use thiserror::Error;
use tracing::info;
use tracing::trace;
use super::{
file_path_helper::{file_path_for_indexer, FilePathError, IsolatedFilePathData},
file_path_helper::{file_path_just_pub_id, FilePathError, IsolatedFilePathData},
location_with_indexer_rules,
};
@ -171,7 +171,7 @@ async fn execute_indexer_save_step(
)
.await?;
info!("Inserted {count} records");
trace!("Inserted {count} records");
Ok(count)
}
@ -186,7 +186,7 @@ fn iso_file_path_factory(
}
async fn remove_non_existing_file_paths(
to_remove: impl IntoIterator<Item = file_path_for_indexer::Data>,
to_remove: impl IntoIterator<Item = file_path_just_pub_id::Data>,
db: &PrismaClient,
) -> Result<u64, IndexerError> {
db.file_path()
@ -206,11 +206,24 @@ async fn remove_non_existing_file_paths(
macro_rules! file_paths_db_fetcher_fn {
($db:expr) => {{
|found_paths| async {
$db.file_path()
.find_many(found_paths)
.select($crate::location::file_path_helper::file_path_to_isolate::select())
.exec()
// Each found path is an AND with 4 terms, and SQLite has an expression tree limit of 1000 terms,
// so we will use chunks of 200 just to be safe
// FIXME: Can't pass this chunks variable direct to _batch because of lifetime issues
let chunks = found_paths
.into_iter()
.chunks(200)
.into_iter()
.map(|founds| {
$db.file_path()
.find_many(founds.collect::<Vec<_>>())
.select($crate::location::file_path_helper::file_path_to_isolate::select())
})
.collect::<Vec<_>>();
$db._batch(chunks)
.await
.map(|fetched| fetched.into_iter().flatten().collect::<Vec<_>>())
.map_err(Into::into)
}
}};
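The chunking above exists because each found path contributes four AND terms and SQLite caps a single expression tree at roughly 1000 terms, so the params are split into groups of 200 and issued as separate queries inside one `_batch` call. A simplified, hypothetical illustration of the splitting step (the diff itself uses `Itertools::chunks` for the same idea):

// Hypothetical helper: split a large param list into chunks of 200 so each
// generated WHERE clause stays well under SQLite's expression tree limit.
fn chunk_params<T: Clone>(params: &[T]) -> Vec<Vec<T>> {
	params.chunks(200).map(|chunk| chunk.to_vec()).collect()
}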
@ -222,40 +235,70 @@ macro_rules! file_paths_db_fetcher_fn {
// FIXME: (fogodev) I was receiving this error here https://github.com/rust-lang/rust/issues/74497
#[macro_export]
macro_rules! to_remove_db_fetcher_fn {
($location_id:expr, $location_path:expr, $db:expr) => {{
|iso_file_path| async {
($location_id:expr, $db:expr) => {{
|iso_file_path, unique_location_id_materialized_path_name_extension_params| async {
let iso_file_path: $crate::location::file_path_helper::IsolatedFilePathData<'static> =
iso_file_path;
let mut data = Vec::new(); // one stupid large vec
loop {
let r = $db
.file_path()
.find_many(vec![
$crate::prisma::file_path::location_id::equals(Some($location_id)),
$crate::prisma::file_path::materialized_path::equals(Some(
iso_file_path
.materialized_path_for_children()
.expect("the received isolated file path must be from a directory"),
)),
])
.take(100)
.select($crate::location::file_path_helper::file_path_for_indexer::select())
.exec()
.await;
// FIXME: Can't pass this chunks variable direct to _batch because of lifetime issues
let chunks = unique_location_id_materialized_path_name_extension_params
.into_iter()
.chunks(200)
.into_iter()
.map(|unique_params| {
$db.file_path()
.find_many(vec![
$crate::prisma::file_path::location_id::equals(Some($location_id)),
$crate::prisma::file_path::materialized_path::equals(Some(
iso_file_path.materialized_path_for_children().expect(
"the received isolated file path must be from a directory",
),
)),
::prisma_client_rust::operator::not(vec![
::prisma_client_rust::operator::or(unique_params.collect()),
]),
])
.select($crate::location::file_path_helper::file_path_just_pub_id::select())
})
.collect::<::std::vec::Vec<_>>();
match r {
Ok(mut v) => {
data.append(&mut v);
if v.len() != 100 {
break Ok(data);
$db._batch(chunks)
.await
.map(|to_remove| {
// This is an intersection between all sets
let mut sets = to_remove
.into_iter()
.map(|fetched_vec| {
fetched_vec
.into_iter()
.map(|fetched| {
::uuid::Uuid::from_slice(&fetched.pub_id)
.expect("file_path.pub_id is invalid!")
})
.collect::<::std::collections::HashSet<_>>()
})
.collect::<Vec<_>>();
let mut intersection = ::std::collections::HashSet::new();
while let Some(set) = sets.pop() {
for pub_id in set {
// Remove returns true if the element was present in the set
if sets.iter_mut().all(|set| set.remove(&pub_id)) {
intersection.insert(pub_id);
}
}
}
Err(err) => {
break Err(err.into());
}
}
}
intersection
.into_iter()
.map(|pub_id| {
$crate::location::file_path_helper::file_path_just_pub_id::Data {
pub_id: pub_id.as_bytes().to_vec(),
}
})
.collect()
})
.map_err(::std::convert::Into::into)
}
}};
}
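Because the to-remove query above is chunked the same way, a db path that still exists on disk matches the params of exactly one chunk and drops out of that chunk's result set, so only the pub_ids returned by every chunk (the intersection) are truly gone and safe to delete. A compact sketch of that intersection over plain `Vec<Vec<Uuid>>` inputs, using a hypothetical helper that expresses the intent of the comment above rather than copying the macro line for line:

use std::collections::HashSet;
use uuid::Uuid;

// Keep only the ids that appear in every per-chunk result set.
fn intersect_chunk_results(chunks: Vec<Vec<Uuid>>) -> HashSet<Uuid> {
	let mut sets = chunks
		.into_iter()
		.map(|chunk| chunk.into_iter().collect::<HashSet<_>>());
	let Some(first) = sets.next() else {
		return HashSet::new();
	};
	sets.fold(first, |acc, set| acc.intersection(&set).copied().collect())
}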

View file

@ -72,7 +72,7 @@ pub async fn shallow(
&indexer_rules,
|_, _| {},
file_paths_db_fetcher_fn!(&db),
to_remove_db_fetcher_fn!(location_id, location_path, &db),
to_remove_db_fetcher_fn!(location_id, &db),
iso_file_path_factory(location_id, &location_path),
add_root,
)

View file

@ -1,6 +1,6 @@
use crate::{
location::file_path_helper::{
file_path_for_indexer, file_path_to_isolate, FilePathMetadata, IsolatedFilePathData,
file_path_just_pub_id, file_path_to_isolate, FilePathMetadata, IsolatedFilePathData,
MetadataExt,
},
prisma::file_path,
@ -14,7 +14,7 @@ use crate::location::file_path_helper::get_inode_and_device;
use crate::location::file_path_helper::get_inode_and_device_from_path;
use std::{
collections::{HashMap, HashSet, VecDeque},
collections::{HashSet, VecDeque},
future::Future,
hash::{Hash, Hasher},
path::{Path, PathBuf},
@ -71,7 +71,7 @@ impl Hash for WalkingEntry {
pub struct WalkResult<Walked, ToRemove>
where
Walked: Iterator<Item = WalkedEntry>,
ToRemove: Iterator<Item = file_path_for_indexer::Data>,
ToRemove: Iterator<Item = file_path_just_pub_id::Data>,
{
pub walked: Walked,
pub to_walk: VecDeque<ToWalkEntry>,
@ -87,19 +87,22 @@ pub(super) async fn walk<FilePathDBFetcherFut, ToRemoveDbFetcherFut>(
indexer_rules: &[IndexerRule],
mut update_notifier: impl FnMut(&Path, usize),
file_paths_db_fetcher: impl Fn(Vec<file_path::WhereParam>) -> FilePathDBFetcherFut,
to_remove_db_fetcher: impl Fn(IsolatedFilePathData<'static>) -> ToRemoveDbFetcherFut,
to_remove_db_fetcher: impl Fn(
IsolatedFilePathData<'static>,
Vec<file_path::WhereParam>,
) -> ToRemoveDbFetcherFut,
iso_file_path_factory: impl Fn(&Path, bool) -> Result<IsolatedFilePathData<'static>, IndexerError>,
limit: u64,
) -> Result<
WalkResult<
impl Iterator<Item = WalkedEntry>,
impl Iterator<Item = file_path_for_indexer::Data>,
impl Iterator<Item = file_path_just_pub_id::Data>,
>,
IndexerError,
>
where
FilePathDBFetcherFut: Future<Output = Result<Vec<file_path_to_isolate::Data>, IndexerError>>,
ToRemoveDbFetcherFut: Future<Output = Result<Vec<file_path_for_indexer::Data>, IndexerError>>,
ToRemoveDbFetcherFut: Future<Output = Result<Vec<file_path_just_pub_id::Data>, IndexerError>>,
{
let root = root.as_ref();
@ -149,18 +152,21 @@ pub(super) async fn keep_walking<FilePathDBFetcherFut, ToRemoveDbFetcherFut>(
indexer_rules: &[IndexerRule],
mut update_notifier: impl FnMut(&Path, usize),
file_paths_db_fetcher: impl Fn(Vec<file_path::WhereParam>) -> FilePathDBFetcherFut,
to_remove_db_fetcher: impl Fn(IsolatedFilePathData<'static>) -> ToRemoveDbFetcherFut,
to_remove_db_fetcher: impl Fn(
IsolatedFilePathData<'static>,
Vec<file_path::WhereParam>,
) -> ToRemoveDbFetcherFut,
iso_file_path_factory: impl Fn(&Path, bool) -> Result<IsolatedFilePathData<'static>, IndexerError>,
) -> Result<
WalkResult<
impl Iterator<Item = WalkedEntry>,
impl Iterator<Item = file_path_for_indexer::Data>,
impl Iterator<Item = file_path_just_pub_id::Data>,
>,
IndexerError,
>
where
FilePathDBFetcherFut: Future<Output = Result<Vec<file_path_to_isolate::Data>, IndexerError>>,
ToRemoveDbFetcherFut: Future<Output = Result<Vec<file_path_for_indexer::Data>, IndexerError>>,
ToRemoveDbFetcherFut: Future<Output = Result<Vec<file_path_just_pub_id::Data>, IndexerError>>,
{
let mut to_keep_walking = VecDeque::with_capacity(TO_WALK_QUEUE_INITIAL_CAPACITY);
let mut indexed_paths = HashSet::with_capacity(WALK_SINGLE_DIR_PATHS_BUFFER_INITIAL_CAPACITY);
@ -196,20 +202,23 @@ pub(super) async fn walk_single_dir<FilePathDBFetcherFut, ToRemoveDbFetcherFut>(
indexer_rules: &[IndexerRule],
mut update_notifier: impl FnMut(&Path, usize) + '_,
file_paths_db_fetcher: impl Fn(Vec<file_path::WhereParam>) -> FilePathDBFetcherFut,
to_remove_db_fetcher: impl Fn(IsolatedFilePathData<'static>) -> ToRemoveDbFetcherFut,
to_remove_db_fetcher: impl Fn(
IsolatedFilePathData<'static>,
Vec<file_path::WhereParam>,
) -> ToRemoveDbFetcherFut,
iso_file_path_factory: impl Fn(&Path, bool) -> Result<IsolatedFilePathData<'static>, IndexerError>,
add_root: bool,
) -> Result<
(
impl Iterator<Item = WalkedEntry>,
Vec<file_path_for_indexer::Data>,
Vec<file_path_just_pub_id::Data>,
Vec<IndexerError>,
),
IndexerError,
>
where
FilePathDBFetcherFut: Future<Output = Result<Vec<file_path_to_isolate::Data>, IndexerError>>,
ToRemoveDbFetcherFut: Future<Output = Result<Vec<file_path_for_indexer::Data>, IndexerError>>,
ToRemoveDbFetcherFut: Future<Output = Result<Vec<file_path_just_pub_id::Data>, IndexerError>>,
{
let root = root.as_ref();
@ -281,8 +290,6 @@ where
F: Future<Output = Result<Vec<file_path_to_isolate::Data>, IndexerError>>,
{
if !indexed_paths.is_empty() {
// TODO: Converting paths like this into PCR params will break if we pass in too many paths.
// TODO: I am not hitting errors here on my system so gonna leave this for now.
file_paths_db_fetcher(
indexed_paths
.iter()
@ -327,7 +334,10 @@ async fn inner_walk_single_dir<ToRemoveDbFetcherFut>(
}: &ToWalkEntry,
indexer_rules: &[IndexerRule],
update_notifier: &mut impl FnMut(&Path, usize),
to_remove_db_fetcher: &impl Fn(IsolatedFilePathData<'static>) -> ToRemoveDbFetcherFut,
to_remove_db_fetcher: impl Fn(
IsolatedFilePathData<'static>,
Vec<file_path::WhereParam>,
) -> ToRemoveDbFetcherFut,
iso_file_path_factory: &impl Fn(&Path, bool) -> Result<IsolatedFilePathData<'static>, IndexerError>,
WorkingTable {
indexed_paths,
@ -335,9 +345,9 @@ async fn inner_walk_single_dir<ToRemoveDbFetcherFut>(
mut maybe_to_walk,
errors,
}: WorkingTable<'_>,
) -> Vec<file_path_for_indexer::Data>
) -> Vec<file_path_just_pub_id::Data>
where
ToRemoveDbFetcherFut: Future<Output = Result<Vec<file_path_for_indexer::Data>, IndexerError>>,
ToRemoveDbFetcherFut: Future<Output = Result<Vec<file_path_just_pub_id::Data>, IndexerError>>,
{
let Ok(iso_file_path_to_walk) = iso_file_path_factory(path, true).map_err(|e| errors.push(e))
else {
@ -356,7 +366,6 @@ where
paths_buffer.clear();
let mut found_paths_counts = 0;
let mut to_remove = HashMap::new();
// Marking with a loop label here in case of rejection or errors, to continue with next entry
'entries: loop {
@ -496,51 +505,22 @@ where
}
if accept_by_children_dir.unwrap_or(true) {
// Fetch all files then as we index we remove them and anything left over must have been deleted from FS and can be removed from DB
match to_remove_db_fetcher(iso_file_path_to_walk.clone()).await {
Ok(v) => {
for e in v {
to_remove.insert(
(
e.materialized_path
.clone()
.expect("materialized_path should be set"),
e.name.clone().expect("name should be set"),
e.extension.clone().expect("extension should be set"),
),
e,
);
}
}
Err(e) => {
errors.push(e);
}
};
let Ok(iso_file_path) = iso_file_path_factory(&current_path, is_dir)
.map_err(|e| errors.push(e))
else {
continue 'entries;
};
{
// Remove file path as we don't want it to be removed from the DB
to_remove.remove(&(
iso_file_path.materialized_path.to_string(),
iso_file_path.name.to_string(),
iso_file_path.extension.to_string(),
));
paths_buffer.push(WalkingEntry {
iso_file_path,
maybe_metadata: Some(FilePathMetadata {
inode,
device,
size_in_bytes: metadata.len(),
created_at: metadata.created_or_now().into(),
modified_at: metadata.modified_or_now().into(),
}),
});
}
paths_buffer.push(WalkingEntry {
iso_file_path,
maybe_metadata: Some(FilePathMetadata {
inode,
device,
size_in_bytes: metadata.len(),
created_at: metadata.created_or_now().into(),
modified_at: metadata.modified_or_now().into(),
}),
});
// If the ancestor directories weren't indexed before, we do it now
for ancestor in current_path
@ -591,21 +571,7 @@ where
modified_at: metadata.modified_or_now().into(),
});
{
// Remove file path as we don't want it to be removed from the DB
to_remove.remove(&(
ancestor_iso_walking_entry
.iso_file_path
.materialized_path
.to_string(),
ancestor_iso_walking_entry.iso_file_path.name.to_string(),
ancestor_iso_walking_entry
.iso_file_path
.extension
.to_string(),
));
paths_buffer.push(ancestor_iso_walking_entry);
}
paths_buffer.push(ancestor_iso_walking_entry);
} else {
// If indexed_paths contains the current ancestors, then it will contain
// also all of its ancestors too, so we can stop here
@ -615,11 +581,28 @@ where
}
}
// We continue the function even if we fail to fetch `file_path`s to remove,
// the DB will have old `file_path`s but at least this is better than
// not adding the newly indexed paths
let to_remove = to_remove_db_fetcher(
iso_file_path_to_walk,
paths_buffer
.iter()
.map(|entry| &entry.iso_file_path)
.map(Into::into)
.collect(),
)
.await
.unwrap_or_else(|e| {
errors.push(e);
vec![]
});
// Just merging the `found_paths` with `indexed_paths` here in the end to avoid possibly
// multiple rehashes during function execution
indexed_paths.extend(paths_buffer.drain(..));
to_remove.into_values().collect()
to_remove
}
#[cfg(test)]
@ -753,7 +736,7 @@ mod tests {
&[],
|_, _| {},
|_| async { Ok(vec![]) },
|_| async { Ok(vec![]) },
|_, _| async { Ok(vec![]) },
|path, is_dir| {
IsolatedFilePathData::new(0, root_path, path, is_dir).map_err(Into::into)
},
@ -817,7 +800,7 @@ mod tests {
only_photos_rule,
|_, _| {},
|_| async { Ok(vec![]) },
|_| async { Ok(vec![]) },
|_, _| async { Ok(vec![]) },
|path, is_dir| {
IsolatedFilePathData::new(0, root_path, path, is_dir).map_err(Into::into)
},
@ -890,7 +873,7 @@ mod tests {
git_repos,
|_, _| {},
|_| async { Ok(vec![]) },
|_| async { Ok(vec![]) },
|_, _| async { Ok(vec![]) },
|path, is_dir| {
IsolatedFilePathData::new(0, root_path, path, is_dir).map_err(Into::into)
},
@ -981,7 +964,7 @@ mod tests {
git_repos_no_deps_no_build_dirs,
|_, _| {},
|_| async { Ok(vec![]) },
|_| async { Ok(vec![]) },
|_, _| async { Ok(vec![]) },
|path, is_dir| {
IsolatedFilePathData::new(0, root_path, path, is_dir).map_err(Into::into)
},

View file

@ -22,10 +22,7 @@ use crate::{
util::error::FileIOError,
};
use std::{
collections::{BTreeMap, HashMap},
path::PathBuf,
};
use std::{collections::HashMap, path::PathBuf};
use async_trait::async_trait;
use notify::{
@ -47,7 +44,8 @@ use super::{
pub(super) struct MacOsEventHandler<'lib> {
location_id: location::id::Type,
library: &'lib Library,
recently_created_files: BTreeMap<PathBuf, Instant>,
recently_created_files: HashMap<PathBuf, Instant>,
recently_created_files_buffer: Vec<(PathBuf, Instant)>,
last_check_created_files: Instant,
latest_created_dir: Option<PathBuf>,
last_check_rename: Instant,
@ -65,7 +63,8 @@ impl<'lib> EventHandler<'lib> for MacOsEventHandler<'lib> {
Self {
location_id,
library,
recently_created_files: BTreeMap::new(),
recently_created_files: HashMap::new(),
recently_created_files_buffer: Vec::new(),
last_check_created_files: Instant::now(),
latest_created_dir: None,
last_check_rename: Instant::now(),
@ -107,16 +106,6 @@ impl<'lib> EventHandler<'lib> for MacOsEventHandler<'lib> {
self.latest_created_dir = Some(paths.remove(0));
}
EventKind::Create(CreateKind::File) => {
let path = &paths[0];
create_file(
self.location_id,
path,
&fs::metadata(path)
.await
.map_err(|e| FileIOError::from((path, e)))?,
self.library,
)
.await?;
self.recently_created_files
.insert(paths.remove(0), Instant::now());
}
@ -143,66 +132,109 @@ impl<'lib> EventHandler<'lib> for MacOsEventHandler<'lib> {
}
async fn tick(&mut self) {
// Cleaning out recently created files that are older than 1 second
if self.last_check_created_files.elapsed() > ONE_SECOND {
// Every 200 milliseconds, evict recently created files that are old enough to be processed
if self.last_check_created_files.elapsed() > HUNDRED_MILLIS * 2 {
if let Err(e) = self.handle_recently_created_eviction().await {
error!("Error while handling recently created files eviction: {e:#?}");
}
self.last_check_created_files = Instant::now();
self.recently_created_files
.retain(|_, created_at| created_at.elapsed() < ONE_SECOND);
}
if self.last_check_rename.elapsed() > HUNDRED_MILLIS {
// Cleaning out recently renamed files that are older than 100 milliseconds
self.handle_create_eviction().await;
self.handle_remove_eviction().await;
if let Err(e) = self.handle_rename_create_eviction().await {
error!("Failed to create file_path on MacOS : {e:#?}");
}
if let Err(e) = self.handle_rename_remove_eviction().await {
error!("Failed to remove file_path: {e:#?}");
}
self.last_check_rename = Instant::now();
}
}
}
impl MacOsEventHandler<'_> {
async fn handle_create_eviction(&mut self) {
async fn handle_recently_created_eviction(&mut self) -> Result<(), LocationManagerError> {
self.recently_created_files_buffer.clear();
let mut should_invalidate = false;
for (path, created_at) in self.recently_created_files.drain() {
if created_at.elapsed() < ONE_SECOND {
self.recently_created_files_buffer.push((path, created_at));
} else {
create_file(
self.location_id,
&path,
&fs::metadata(&path)
.await
.map_err(|e| FileIOError::from((&path, e)))?,
self.library,
)
.await?;
should_invalidate = true;
}
}
if should_invalidate {
invalidate_query!(self.library, "search.paths");
}
self.recently_created_files
.extend(self.recently_created_files_buffer.drain(..));
Ok(())
}
async fn handle_rename_create_eviction(&mut self) -> Result<(), LocationManagerError> {
// Just to make sure that our buffer is clean
self.paths_map_buffer.clear();
let mut should_invalidate = false;
for (inode_and_device, (instant, path)) in self.new_paths_map.drain() {
if instant.elapsed() > HUNDRED_MILLIS {
if let Err(e) = create_dir_or_file(self.location_id, &path, self.library).await {
error!("Failed to create file_path on MacOS : {e}");
} else {
trace!("Created file_path due timeout: {}", path.display());
invalidate_query!(self.library, "search.paths");
}
create_dir_or_file(self.location_id, &path, self.library).await?;
trace!("Created file_path due timeout: {}", path.display());
should_invalidate = true;
} else {
self.paths_map_buffer
.push((inode_and_device, (instant, path)));
}
}
for (key, value) in self.paths_map_buffer.drain(..) {
self.new_paths_map.insert(key, value);
if should_invalidate {
invalidate_query!(self.library, "search.paths");
}
self.new_paths_map.extend(self.paths_map_buffer.drain(..));
Ok(())
}
async fn handle_remove_eviction(&mut self) {
async fn handle_rename_remove_eviction(&mut self) -> Result<(), LocationManagerError> {
// Just to make sure that our buffer is clean
self.paths_map_buffer.clear();
let mut should_invalidate = false;
for (inode_and_device, (instant, path)) in self.old_paths_map.drain() {
if instant.elapsed() > HUNDRED_MILLIS {
if let Err(e) = remove(self.location_id, &path, self.library).await {
error!("Failed to remove file_path: {e}");
} else {
trace!("Removed file_path due timeout: {}", path.display());
invalidate_query!(self.library, "search.paths");
}
remove(self.location_id, &path, self.library).await?;
trace!("Removed file_path due timeout: {}", path.display());
should_invalidate = true;
} else {
self.paths_map_buffer
.push((inode_and_device, (instant, path)));
}
}
for (key, value) in self.paths_map_buffer.drain(..) {
self.old_paths_map.insert(key, value);
if should_invalidate {
invalidate_query!(self.library, "search.paths");
}
self.old_paths_map.extend(self.paths_map_buffer.drain(..));
Ok(())
}
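All three eviction handlers above follow the same debounce shape: drain the pending map on every tick, act on entries that have aged past the timeout, and push the younger ones back through a reusable buffer, presumably so rapid create/update bursts settle before a file_path is written. A tiny generic sketch of that pattern (hypothetical names, not the handler itself):

use std::collections::HashMap;
use std::path::PathBuf;
use std::time::{Duration, Instant};

// Drain `pending`, return the paths whose entries have aged past `timeout`,
// and re-insert the rest so they are checked again on the next tick.
fn evict_older_than(pending: &mut HashMap<PathBuf, Instant>, timeout: Duration) -> Vec<PathBuf> {
	let mut ready = Vec::new();
	let mut keep = Vec::new();
	for (path, created_at) in pending.drain() {
		if created_at.elapsed() >= timeout {
			ready.push(path);
		} else {
			keep.push((path, created_at));
		}
	}
	pending.extend(keep);
	ready
}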
async fn handle_single_rename_event(

View file

@ -44,7 +44,7 @@ use notify::{Event, EventKind};
use prisma_client_rust::{raw, PrismaValue};
use serde_json::json;
use tokio::{fs, io::ErrorKind};
use tracing::{debug, error, info, trace, warn};
use tracing::{debug, error, trace, warn};
use uuid::Uuid;
use super::INodeAndDevice;
@ -94,7 +94,7 @@ pub(super) async fn create_dir(
{
// FIXME: This is a workaround for Windows, because we can't get the inode and device from the metadata
let _ = metadata; // To avoid unused variable warning
get_inode_and_device_from_path(&path).await?
get_inode_and_device_from_path(path).await?
}
};
@ -113,7 +113,7 @@ pub(super) async fn create_dir(
.materialized_path_for_children()
.expect("We're in the create dir function lol");
info!("Creating path: {}", iso_file_path);
debug!("Creating path: {}", iso_file_path);
create_file_path(
library,
@ -142,9 +142,26 @@ pub(super) async fn create_file(
path: impl AsRef<Path>,
metadata: &Metadata,
library: &Library,
) -> Result<(), LocationManagerError> {
inner_create_file(
location_id,
extract_location_path(location_id, library).await?,
path,
metadata,
library,
)
.await
}
async fn inner_create_file(
location_id: location::id::Type,
location_path: impl AsRef<Path>,
path: impl AsRef<Path>,
metadata: &Metadata,
library: &Library,
) -> Result<(), LocationManagerError> {
let path = path.as_ref();
let location_path = extract_location_path(location_id, library).await?;
let location_path = location_path.as_ref();
trace!(
"Location: <root_path ='{}'> creating file: {}",
@ -154,7 +171,7 @@ pub(super) async fn create_file(
let db = &library.db;
let iso_file_path = IsolatedFilePathData::new(location_id, &location_path, path, false)?;
let iso_file_path = IsolatedFilePathData::new(location_id, location_path, path, false)?;
let extension = iso_file_path.extension.to_string();
let (inode, device) = {
@ -171,6 +188,52 @@ pub(super) async fn create_file(
}
};
// First we check if a file already exists with these same inode and device numbers;
// if it does, we just update it
if let Some(file_path) = db
.file_path()
.find_unique(file_path::location_id_inode_device(
location_id,
inode.to_le_bytes().to_vec(),
device.to_le_bytes().to_vec(),
))
.include(file_path_with_object::include())
.exec()
.await?
{
trace!(
"File already exists with that inode and device: {}",
iso_file_path
);
return inner_update_file(location_path, &file_path, path, library, None).await;
// If we can't find an existing file with the same inode and device, we check if there is a file with the same path
} else if let Some(file_path) = db
.file_path()
.find_unique(file_path::location_id_materialized_path_name_extension(
location_id,
iso_file_path.materialized_path.to_string(),
iso_file_path.name.to_string(),
iso_file_path.extension.to_string(),
))
.include(file_path_with_object::include())
.exec()
.await?
{
trace!(
"File already exists with that iso_file_path: {}",
iso_file_path
);
return inner_update_file(
location_path,
&file_path,
path,
library,
Some((inode, device)),
)
.await;
}
let parent_iso_file_path = iso_file_path.parent();
if !parent_iso_file_path.is_root()
&& !check_file_path_exists::<FilePathError>(&parent_iso_file_path, &library.db).await?
@ -186,7 +249,7 @@ pub(super) async fn create_file(
fs_metadata,
} = FileMetadata::new(&location_path, &iso_file_path).await?;
info!("Creating path: {}", iso_file_path);
debug!("Creating path: {}", iso_file_path);
let created_file = create_file_path(
library,
@ -292,32 +355,44 @@ pub(super) async fn update_file(
.exec()
.await?
{
let ret = inner_update_file(location_id, file_path, full_path, library).await;
invalidate_query!(library, "search.paths");
ret
inner_update_file(location_path, file_path, full_path, library, None).await
} else {
// FIXME(fogodev): Have to handle files excluded by indexer rules
Err(LocationManagerError::UpdateNonExistingFile(
full_path.to_path_buf(),
))
inner_create_file(
location_id,
location_path,
full_path,
&fs::metadata(full_path)
.await
.map_err(|e| FileIOError::from((full_path, e)))?,
library,
)
.await
}
.map(|_| invalidate_query!(library, "search.paths"))
}
async fn inner_update_file(
location_id: location::id::Type,
location_path: impl AsRef<Path>,
file_path: &file_path_with_object::Data,
full_path: impl AsRef<Path>,
library @ Library { db, sync, .. }: &Library,
maybe_new_inode_and_device: Option<INodeAndDevice>,
) -> Result<(), LocationManagerError> {
let full_path = full_path.as_ref();
let location = db
.location()
.find_unique(location::id::equals(location_id))
.exec()
.await?
.ok_or_else(|| LocationManagerError::MissingLocation(location_id))?;
let location_path = location_path.as_ref();
let location_path = maybe_missing(location.path.map(PathBuf::from), "location.path")?;
let (current_inode, current_device) = (
u64::from_le_bytes(
maybe_missing(file_path.inode.as_ref(), "file_path.inode")?[0..8]
.try_into()
.map_err(|_| LocationManagerError::InvalidInode)?,
),
u64::from_le_bytes(
maybe_missing(file_path.device.as_ref(), "file_path.device")?[0..8]
.try_into()
.map_err(|_| LocationManagerError::InvalidDevice)?,
),
);
trace!(
"Location: <root_path ='{}'> updating file: {}",
@ -333,6 +408,29 @@ async fn inner_update_file(
kind,
} = FileMetadata::new(&location_path, &iso_file_path).await?;
let (inode, device) = if let Some((inode, device)) = maybe_new_inode_and_device {
(inode, device)
} else {
#[cfg(target_family = "unix")]
{
get_inode_and_device(&fs_metadata)?
}
#[cfg(target_family = "windows")]
{
// FIXME: This is a workaround for Windows, because we can't get the inode and device from the metadata
get_inode_and_device_from_path(full_path).await?
}
};
let (maybe_new_inode, maybe_new_device) =
match (current_inode == inode, current_device == device) {
(true, true) => (None, None),
(true, false) => (None, Some(device)),
(false, true) => (Some(inode), None),
(false, false) => (Some(inode), Some(device)),
};
if let Some(old_cas_id) = &file_path.cas_id {
if old_cas_id != &cas_id {
let (sync_params, db_params): (Vec<_>, Vec<_>) = {
@ -341,21 +439,23 @@ async fn inner_update_file(
[
(
(cas_id::NAME, json!(old_cas_id)),
cas_id::set(Some(old_cas_id.clone())),
Some(cas_id::set(Some(old_cas_id.clone()))),
),
(
(
size_in_bytes_bytes::NAME,
json!(fs_metadata.len().to_be_bytes().to_vec()),
),
size_in_bytes_bytes::set(Some(fs_metadata.len().to_be_bytes().to_vec())),
Some(size_in_bytes_bytes::set(Some(
fs_metadata.len().to_be_bytes().to_vec(),
))),
),
{
let date = DateTime::<Local>::from(fs_metadata.modified_or_now()).into();
(
(date_modified::NAME, json!(date)),
date_modified::set(Some(date)),
Some(date_modified::set(Some(date))),
)
},
{
@ -373,11 +473,34 @@ async fn inner_update_file(
(
(integrity_checksum::NAME, json!(checksum)),
integrity_checksum::set(checksum),
Some(integrity_checksum::set(checksum)),
)
},
{
if let Some(new_inode) = maybe_new_inode {
(
(inode::NAME, json!(new_inode)),
Some(inode::set(Some(new_inode.to_le_bytes().to_vec()))),
)
} else {
((inode::NAME, serde_json::Value::Null), None)
}
},
{
if let Some(new_device) = maybe_new_device {
(
(device::NAME, json!(new_device)),
Some(device::set(Some(new_device.to_le_bytes().to_vec()))),
)
} else {
((device::NAME, serde_json::Value::Null), None)
}
},
]
.into_iter()
.filter_map(|(sync_param, maybe_db_param)| {
maybe_db_param.map(|db_param| (sync_param, db_param))
})
.unzip()
};

View file

@ -1,6 +1,6 @@
use crate::{
invalidate_query,
job::{Job, JobError, JobManagerError},
job::{JobBuilder, JobError, JobManagerError},
library::Library,
location::file_path_helper::filter_existing_file_path_params,
object::{
@ -28,7 +28,7 @@ use serde::Deserialize;
use serde_json::json;
use specta::Type;
use tokio::{fs, io};
use tracing::{debug, info};
use tracing::{debug, info, warn};
use uuid::Uuid;
mod error;
@ -381,13 +381,13 @@ pub async fn scan_location(
let location_base_data = location::Data::from(&location);
Job::new_with_action(
IndexerJobInit {
location,
sub_path: None,
},
"scan_location",
)
JobBuilder::new(IndexerJobInit {
location,
sub_path: None,
})
.with_action("scan_location")
.with_metadata(json!({"location": location_base_data.clone()}))
.build()
.queue_next(FileIdentifierJobInit {
location: location_base_data.clone(),
sub_path: None,
@ -414,13 +414,16 @@ pub async fn scan_location_sub_path(
let location_base_data = location::Data::from(&location);
Job::new_with_action(
IndexerJobInit {
location,
sub_path: Some(sub_path.clone()),
},
"scan_location_sub_path",
)
JobBuilder::new(IndexerJobInit {
location,
sub_path: Some(sub_path.clone()),
})
.with_action("scan_location_sub_path")
.with_metadata(json!({
"location": location_base_data.clone(),
"sub_path": sub_path.clone(),
}))
.build()
.queue_next(FileIdentifierJobInit {
location: location_base_data.clone(),
sub_path: Some(sub_path.clone()),
@ -778,7 +781,11 @@ async fn check_nested_location(
let comps = location_path.components().collect::<Vec<_>>();
let is_a_child_location = potential_children.into_iter().any(|v| {
let comps2 = PathBuf::from(v.path.unwrap());
let Some(location_path) = v.path else {
warn!("Missing location path on location <id='{}'> at check nested location", v.id);
return false;
};
let comps2 = PathBuf::from(location_path);
let comps2 = comps2.components().collect::<Vec<_>>();
if comps.len() > comps2.len() {

View file

@ -18,7 +18,8 @@ use std::{
};
use serde::{Deserialize, Serialize};
use tracing::info;
use serde_json::json;
use tracing::{debug, info, trace};
use super::{process_identifier_file_paths, FileIdentifierJobError, CHUNK_SIZE};
@ -50,28 +51,23 @@ pub struct FileIdentifierJobData {
#[derive(Serialize, Deserialize, Default, Debug)]
pub struct FileIdentifierJobRunMetadata {
report: FileIdentifierReport,
cursor: file_path::id::Type,
}
impl JobRunMetadata for FileIdentifierJobRunMetadata {
fn update(&mut self, new_data: Self) {
self.report.total_orphan_paths += new_data.report.total_orphan_paths;
self.report.total_objects_created += new_data.report.total_objects_created;
self.report.total_objects_linked += new_data.report.total_objects_linked;
self.report.total_objects_ignored += new_data.report.total_objects_ignored;
self.cursor = new_data.cursor;
}
}
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct FileIdentifierReport {
total_orphan_paths: usize,
total_objects_created: usize,
total_objects_linked: usize,
total_objects_ignored: usize,
}
impl JobRunMetadata for FileIdentifierJobRunMetadata {
fn update(&mut self, new_data: Self) {
self.total_orphan_paths += new_data.total_orphan_paths;
self.total_objects_created += new_data.total_objects_created;
self.total_objects_linked += new_data.total_objects_linked;
self.total_objects_ignored += new_data.total_objects_ignored;
self.cursor = new_data.cursor;
}
}
#[async_trait::async_trait]
impl StatefulJob for FileIdentifierJobInit {
type Data = FileIdentifierJobData;
@ -88,7 +84,7 @@ impl StatefulJob for FileIdentifierJobInit {
let init = self;
let Library { db, .. } = &ctx.library;
info!("Identifying orphan File Paths...");
debug!("Identifying orphan File Paths...");
let location_id = init.location.id;
@ -138,10 +134,10 @@ impl StatefulJob for FileIdentifierJobInit {
});
}
info!("Found {} orphan file paths", orphan_count);
debug!("Found {} orphan file paths", orphan_count);
let task_count = (orphan_count as f64 / CHUNK_SIZE as f64).ceil() as usize;
info!(
debug!(
"Found {} orphan Paths. Will execute {} tasks...",
orphan_count, task_count
);
@ -160,11 +156,9 @@ impl StatefulJob for FileIdentifierJobInit {
Ok((
FileIdentifierJobRunMetadata {
report: FileIdentifierReport {
total_orphan_paths: orphan_count,
..Default::default()
},
total_orphan_paths: orphan_count,
cursor: first_path.id,
..Default::default()
},
vec![(); task_count],
)
@ -209,18 +203,18 @@ impl StatefulJob for FileIdentifierJobInit {
step_number,
run_metadata.cursor,
&ctx.library,
run_metadata.report.total_orphan_paths,
run_metadata.total_orphan_paths,
)
.await?;
new_metadata.report.total_objects_created = total_objects_created;
new_metadata.report.total_objects_linked = total_objects_linked;
new_metadata.total_objects_created = total_objects_created;
new_metadata.total_objects_linked = total_objects_linked;
new_metadata.cursor = new_cursor;
ctx.progress_msg(format!(
"Processed {} of {} orphan Paths",
step_number * CHUNK_SIZE,
run_metadata.report.total_orphan_paths
run_metadata.total_orphan_paths
));
Ok(new_metadata.into())
@ -233,9 +227,9 @@ impl StatefulJob for FileIdentifierJobInit {
run_metadata: &Self::RunMetadata,
) -> JobResult {
let init = self;
info!("Finalizing identifier job: {:?}", &run_metadata.report);
info!("Finalizing identifier job: {:?}", &run_metadata);
Ok(Some(serde_json::to_value(init)?))
Ok(Some(json!({"init: ": init, "run_metadata": run_metadata})))
}
}
@ -286,9 +280,10 @@ async fn get_orphan_file_paths(
file_path_id: file_path::id::Type,
maybe_sub_materialized_path: &Option<IsolatedFilePathData<'_>>,
) -> Result<Vec<file_path_for_file_identifier::Data>, prisma_client_rust::QueryError> {
info!(
trace!(
"Querying {} orphan Paths at cursor: {:?}",
CHUNK_SIZE, file_path_id
CHUNK_SIZE,
file_path_id
);
db.file_path()
.find_many(orphan_path_filters(

View file

@ -26,7 +26,7 @@ use futures::future::join_all;
use serde_json::json;
use thiserror::Error;
use tokio::fs;
use tracing::{error, info};
use tracing::{error, trace};
use uuid::Uuid;
pub mod file_identifier_job;
@ -83,8 +83,7 @@ impl FileMetadata {
.await
.map_err(|e| FileIOError::from((&path, e)))?;
#[cfg(debug_assertions)]
tracing::debug!("Analyzed file: {path:?} {cas_id:?} {kind:?}");
trace!("Analyzed file: {path:?} {cas_id:?} {kind:?}");
Ok(FileMetadata {
cas_id,
@ -204,7 +203,7 @@ async fn identifier_job_step(
)
.await?;
info!(
trace!(
"Found {} existing Objects in Library, linking file paths...",
existing_objects.len()
);
@ -221,7 +220,7 @@ async fn identifier_job_step(
.map(|(_, (meta, _))| &meta.cas_id)
.collect::<HashSet<_>>();
info!(
trace!(
"Creating {} new Objects in Library... {:#?}",
file_paths_requiring_new_object.len(),
new_objects_cas_ids
@ -283,10 +282,10 @@ async fn identifier_job_step(
0
});
info!("Created {} new Objects in Library", total_created_files);
trace!("Created {} new Objects in Library", total_created_files);
if total_created_files > 0 {
info!("Updating file paths with created objects");
trace!("Updating file paths with created objects");
sync.write_ops(db, {
let data: (Vec<_>, Vec<_>) = file_path_update_args.into_iter().unzip();
@ -295,7 +294,7 @@ async fn identifier_job_step(
})
.await?;
info!("Updated file paths with created objects");
trace!("Updated file paths with created objects");
}
total_created_files as usize
@ -313,7 +312,7 @@ fn file_path_object_connect_ops<'db>(
db: &'db PrismaClient,
) -> (CRDTOperation, file_path::UpdateQuery<'db>) {
#[cfg(debug_assertions)]
tracing::debug!("Connecting <FilePath id={file_path_id}> to <Object pub_id={object_id}'>");
trace!("Connecting <FilePath id={file_path_id}> to <Object pub_id={object_id}'>");
let vec_id = object_id.as_bytes().to_vec();
@ -342,7 +341,7 @@ async fn process_identifier_file_paths(
library: &Library,
orphan_count: usize,
) -> Result<(usize, usize, file_path::id::Type), JobError> {
info!(
trace!(
"Processing {:?} orphan Paths. ({} completed of {})",
file_paths.len(),
step_number,

View file

@ -13,7 +13,7 @@ use crate::{
use std::path::{Path, PathBuf};
use serde::{Deserialize, Serialize};
use tracing::info;
use tracing::{debug, trace};
use super::{process_identifier_file_paths, FileIdentifierJobError, CHUNK_SIZE};
@ -30,7 +30,7 @@ pub async fn shallow(
) -> Result<(), JobError> {
let Library { db, .. } = &library;
info!("Identifying orphan File Paths...");
debug!("Identifying orphan File Paths...");
let location_id = location.id;
let location_path = maybe_missing(&location.path, "location.path").map(Path::new)?;
@ -68,7 +68,7 @@ pub async fn shallow(
}
let task_count = (orphan_count as f64 / CHUNK_SIZE as f64).ceil() as usize;
info!(
debug!(
"Found {} orphan Paths. Will execute {} tasks...",
orphan_count, task_count
);
@ -153,9 +153,10 @@ async fn get_orphan_file_paths(
file_path_id_cursor: file_path::id::Type,
sub_iso_file_path: &IsolatedFilePathData<'_>,
) -> Result<Vec<file_path_for_file_identifier::Data>, prisma_client_rust::QueryError> {
info!(
trace!(
"Querying {} orphan Paths at cursor: {:?}",
CHUNK_SIZE, file_path_id_cursor
CHUNK_SIZE,
file_path_id_cursor
);
db.file_path()
.find_many(orphan_path_filters(

View file

@ -16,6 +16,7 @@ use crate::{
use std::{hash::Hash, path::PathBuf};
use serde::{Deserialize, Serialize};
use serde_json::json;
use specta::Type;
use tokio::{fs, io};
use tracing::{trace, warn};
@ -207,6 +208,6 @@ impl StatefulJob for FileCopierJobInit {
invalidate_query!(ctx.library, "search.paths");
Ok(Some(serde_json::to_value(init)?))
Ok(Some(json!({ "init": init })))
}
}

View file

@ -14,6 +14,7 @@ use crate::{
use std::{hash::Hash, path::PathBuf};
use serde::{Deserialize, Serialize};
use serde_json::json;
use specta::Type;
use tokio::{fs, io};
use tracing::{trace, warn};
@ -130,6 +131,6 @@ impl StatefulJob for FileCutterJobInit {
let init = self;
invalidate_query!(ctx.library, "search.paths");
Ok(Some(serde_json::to_value(init)?))
Ok(Some(json!({ "init": init })))
}
}

View file

@ -11,6 +11,7 @@ use crate::{
use std::hash::Hash;
use serde::{Deserialize, Serialize};
use serde_json::json;
use specta::Type;
use tokio::{fs, io};
use tracing::warn;
@ -97,6 +98,6 @@ impl StatefulJob for FileDeleterJobInit {
let init = self;
invalidate_query!(ctx.library, "search.paths");
Ok(Some(serde_json::to_value(init)?))
Ok(Some(json!({ "init": init })))
}
}

View file

@ -2,7 +2,7 @@ use std::path::PathBuf;
use tokio::fs as async_fs;
use int_enum::IntEnum;
use tracing::{error, info};
use tracing::{debug, error, trace};
use crate::util::{error::FileIOError, version_manager::VersionManager};
@ -17,14 +17,14 @@ pub enum ThumbnailVersion {
}
pub async fn init_thumbnail_dir(data_dir: PathBuf) -> Result<PathBuf, ThumbnailerError> {
info!("Initializing thumbnail directory");
debug!("Initializing thumbnail directory");
let thumbnail_dir = data_dir.join(THUMBNAIL_CACHE_DIR_NAME);
let version_file = thumbnail_dir.join("version.txt");
let version_manager =
VersionManager::<ThumbnailVersion>::new(version_file.to_str().expect("Invalid path"));
info!("Thumbnail directory: {:?}", thumbnail_dir);
debug!("Thumbnail directory: {:?}", thumbnail_dir);
// create all necessary directories if they don't exist
async_fs::create_dir_all(&thumbnail_dir)
@ -34,7 +34,7 @@ pub async fn init_thumbnail_dir(data_dir: PathBuf) -> Result<PathBuf, Thumbnaile
let mut current_version = match version_manager.get_version() {
Ok(version) => version,
Err(_) => {
info!("Thumbnail version file does not exist, starting fresh");
debug!("Thumbnail version file does not exist, starting fresh");
// Version file does not exist, start fresh
version_manager.set_version(ThumbnailVersion::V1)?;
ThumbnailVersion::V1
@ -94,7 +94,7 @@ async fn move_webp_files(dir: &PathBuf) -> Result<(), ThumbnailerError> {
}
}
}
info!(
trace!(
"Moved {} webp files to their respective shard folders.",
count
);

View file

@ -23,7 +23,7 @@ use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::{fs, io, task::block_in_place};
use tracing::{error, info, trace, warn};
use tracing::{error, trace, warn};
use webp::Encoder;
mod directory;
@ -226,14 +226,14 @@ pub async fn inner_process_step(
match fs::metadata(&output_path).await {
Ok(_) => {
info!(
trace!(
"Thumb already exists, skipping generation for {}",
output_path.display()
);
return Ok(false);
}
Err(e) if e.kind() == io::ErrorKind::NotFound => {
info!("Writing {:?} to {:?}", path, output_path);
trace!("Writing {:?} to {:?}", path, output_path);
match kind {
ThumbnailerJobStepKind::Image => {
@ -249,7 +249,7 @@ pub async fn inner_process_step(
}
}
info!("Emitting new thumbnail event");
trace!("Emitting new thumbnail event");
library.emit(CoreEvent::NewThumbnail {
thumb_key: get_thumb_key(cas_id),
});

View file

@ -17,7 +17,7 @@ use sd_file_ext::extensions::Extension;
use std::path::{Path, PathBuf};
use thumbnail::init_thumbnail_dir;
use tokio::fs;
use tracing::info;
use tracing::{debug, trace};
#[cfg(feature = "ffmpeg")]
use super::FILTERED_VIDEO_EXTENSIONS;
@ -66,7 +66,7 @@ pub async fn shallow_thumbnailer(
)
};
info!(
debug!(
"Searching for images in location {location_id} at path {}",
path.display()
);
@ -86,7 +86,7 @@ pub async fn shallow_thumbnailer(
)
.await?;
info!("Found {:?} image files", image_files.len());
trace!("Found {:?} image files", image_files.len());
#[cfg(feature = "ffmpeg")]
let video_files = {
@ -100,7 +100,7 @@ pub async fn shallow_thumbnailer(
)
.await?;
info!("Found {:?} video files", video_files.len());
trace!("Found {:?} video files", video_files.len());
video_files
};

View file

@ -23,7 +23,8 @@ use sd_file_ext::extensions::Extension;
use serde::{Deserialize, Serialize};
use tracing::info;
use serde_json::json;
use tracing::{debug, info, trace};
use super::{
inner_process_step, ThumbnailerError, ThumbnailerJobStep, ThumbnailerJobStepKind,
@ -120,7 +121,7 @@ impl StatefulJob for ThumbnailerJobInit {
),
};
info!("Searching for images in location {location_id} at directory {iso_file_path}");
debug!("Searching for images in location {location_id} at directory {iso_file_path}");
// query database for all image files in this location that need thumbnails
let image_files = get_files_by_extensions(
@ -130,7 +131,7 @@ impl StatefulJob for ThumbnailerJobInit {
ThumbnailerJobStepKind::Image,
)
.await?;
info!("Found {:?} image files", image_files.len());
trace!("Found {:?} image files", image_files.len());
#[cfg(feature = "ffmpeg")]
let all_files = {
@ -142,7 +143,7 @@ impl StatefulJob for ThumbnailerJobInit {
ThumbnailerJobStepKind::Video,
)
.await?;
info!("Found {:?} video files", video_files.len());
trace!("Found {:?} video files", video_files.len());
image_files
.into_iter()
@ -228,7 +229,7 @@ impl StatefulJob for ThumbnailerJobInit {
invalidate_query!(ctx.library, "search.paths");
}
Ok(Some(serde_json::to_value(run_metadata)?))
Ok(Some(json!({"init: ": init, "run_metadata": run_metadata})))
}
}

View file

@ -191,6 +191,6 @@ impl StatefulJob for ObjectValidatorJobInit {
data.task_count
);
Ok(Some(serde_json::to_value(init)?))
Ok(Some(json!({ "init": init })))
}
}

View file

@ -213,6 +213,7 @@ export default () => {
id: 'dateAccessed',
header: 'Date Accessed',
accessorFn: (file) =>
getItemObject(file)?.date_accessed &&
dayjs(getItemObject(file)?.date_accessed).format('MMM Do YYYY')
},
{

View file

@ -88,16 +88,15 @@ function Job({ job, className, isChild }: JobProps) {
// textItems={[[{ text: job.status }, { text: job.id, }]]}
isChild={isChild}
>
{isRunning ||
(isPaused && (
<div className="my-1 ml-1.5 w-[335px]">
<ProgressBar
pending={task_count == 0}
value={completed_task_count}
total={task_count}
/>
</div>
))}
{(isRunning || isPaused) && (
<div className="my-1 ml-1.5 w-[335px]">
<ProgressBar
pending={task_count == 0}
value={completed_task_count}
total={task_count}
/>
</div>
)}
</JobContainer>
);
}

View file

@ -103,12 +103,10 @@ function JobGroup({ data: { jobs, ...data }, clearJob }: JobGroupProps) {
</Fragment>
)}
{/* TODO: FIX THIS, why is this not working? */}
{!isJobsRunning && (
<Button
className="hidden cursor-pointer"
onClick={() => clearJob?.(data.id as string)}
className="cursor-pointer"
// onClick={() => clearJob?.(data.id as string)}
size="icon"
variant="outline"
>
@ -197,7 +195,7 @@ function totalTasks(jobs: JobReport[]) {
}
function niceActionName(action: string, completed: boolean, job?: JobReport) {
const name = job?.metadata?.init?.location?.name || 'Unknown';
const name = job?.metadata?.location?.name || 'Unknown';
switch (action) {
case 'scan_location':
return completed ? `Added location "${name}"` : `Adding location "${name}"`;

View file

@ -15,16 +15,16 @@ export default function useJobInfo(
const isRunning = job.status === 'Running',
isQueued = job.status === 'Queued',
isPaused = job.status === 'Paused',
indexedPath = job.metadata?.data?.indexed_path,
indexedPath = job.metadata?.data?.location.path,
taskCount = realtimeUpdate?.task_count || job.task_count,
completedTaskCount = realtimeUpdate?.completed_task_count || job.completed_task_count,
meta = job.metadata;
meta = job.metadata,
output = meta?.output?.run_metadata;
return {
indexer: {
name: `${isQueued ? 'Index' : isRunning ? 'Indexing' : 'Indexed'} files ${
indexedPath ? `at ${indexedPath}` : ``
}`,
name: `${isQueued ? 'Index' : isRunning ? 'Indexing' : 'Indexed'} files ${indexedPath ? `at ${indexedPath}` : ``
}`,
icon: Folder,
textItems: [
[
@ -32,11 +32,11 @@ export default function useJobInfo(
text: isPaused
? job.message
: isRunning && realtimeUpdate?.message
? realtimeUpdate.message
: `${comma(meta?.data?.total_paths)} ${plural(
meta?.data?.total_paths,
? realtimeUpdate.message
: `${comma(output?.total_paths)} ${plural(
output?.total_paths,
'path'
)}`
)} discovered`
}
]
]
@ -48,17 +48,16 @@ export default function useJobInfo(
[
{
text:
meta?.thumbnails_created === 0
output?.thumbnails_created === 0
? 'None generated'
: `${
completedTaskCount
? comma(completedTaskCount || 0)
: comma(meta?.thumbnails_created)
} of ${taskCount} ${plural(taskCount, 'thumbnail')} generated`
: `${completedTaskCount
? comma(completedTaskCount || 0)
: comma(output?.thumbnails_created)
} of ${taskCount} ${plural(taskCount, 'thumbnail')} generated`
},
{
text:
meta?.thumbnails_skipped && `${meta?.thumbnails_skipped} already exist`
output?.thumbnails_skipped && `${output?.thumbnails_skipped} already exist`
}
]
]
@ -68,56 +67,52 @@ export default function useJobInfo(
icon: Fingerprint,
textItems: [
!isRunning
? meta?.total_orphan_paths === 0
? output?.total_orphan_paths === 0
? [{ text: 'No files changed' }]
: [
{
text: `${comma(meta?.total_orphan_paths)} ${plural(
meta?.total_orphan_paths,
'file'
)}`
},
{
text: `${comma(meta?.total_objects_created)} ${plural(
meta?.total_objects_created,
'Object'
)} created`
},
{
text: `${comma(meta?.total_objects_linked)} ${plural(
meta?.total_objects_linked,
'Object'
)} linked`
}
]
{
text: `${comma(output?.total_orphan_paths)} ${plural(
output?.total_orphan_paths,
'file'
)}`
},
{
text: `${comma(output?.total_objects_created)} ${plural(
output?.total_objects_created,
'Object'
)} created`
},
{
text: `${comma(output?.total_objects_linked)} ${plural(
output?.total_objects_linked,
'Object'
)} linked`
}
]
: [{ text: realtimeUpdate?.message }]
]
},
file_copier: {
name: `${isQueued ? 'Copy' : isRunning ? 'Copying' : 'Copied'} ${
isRunning ? completedTaskCount + 1 : completedTaskCount
} ${isRunning ? `of ${job.task_count}` : ``} ${plural(job.task_count, 'file')}`,
name: `${isQueued ? 'Copy' : isRunning ? 'Copying' : 'Copied'} ${isRunning ? completedTaskCount + 1 : completedTaskCount
} ${isRunning ? `of ${job.task_count}` : ``} ${plural(job.task_count, 'file')}`,
icon: Copy,
textItems: [[{ text: job.status }]]
},
file_deleter: {
name: `${
isQueued ? 'Delete' : isRunning ? 'Deleting' : 'Deleted'
} ${completedTaskCount} ${plural(completedTaskCount, 'file')}`,
name: `${isQueued ? 'Delete' : isRunning ? 'Deleting' : 'Deleted'
} ${completedTaskCount} ${plural(completedTaskCount, 'file')}`,
icon: Trash,
textItems: [[{ text: job.status }]]
},
file_cutter: {
name: `${
isQueued ? 'Cut' : isRunning ? 'Cutting' : 'Cut'
} ${completedTaskCount} ${plural(completedTaskCount, 'file')}`,
name: `${isQueued ? 'Cut' : isRunning ? 'Cutting' : 'Cut'
} ${completedTaskCount} ${plural(completedTaskCount, 'file')}`,
icon: Scissors,
textItems: [[{ text: job.status }]]
},
object_validator: {
name: `${isQueued ? 'Validate' : isRunning ? 'Validating' : 'Validated'} ${
!isQueued ? completedTaskCount : ''
} ${plural(completedTaskCount, 'object')}`,
name: `${isQueued ? 'Validate' : isRunning ? 'Validating' : 'Validated'} ${!isQueued ? completedTaskCount : ''
} ${plural(completedTaskCount, 'object')}`,
icon: Fingerprint,
textItems: [[{ text: job.status }]]
}