mirror of
https://github.com/spacedriveapp/spacedrive
synced 2024-07-02 08:53:32 +00:00
[ENG-823] Fix some stuff (#1024)
* SQLite journal bug * Remove "job.report" indexes thing * resume button - job not found? * panic hook with logging * it did * large pain * fix `check_nested_location` --------- Co-authored-by: Utku Bakir <74243531+utkubakir@users.noreply.github.com>
This commit is contained in:
parent
0dae7f7a63
commit
f86b6c8b52
9
Cargo.lock
generated
9
Cargo.lock
generated
|
@ -5848,7 +5848,7 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "prisma-client-rust"
|
||||
version = "0.6.8"
|
||||
source = "git+https://github.com/Brendonovich/prisma-client-rust?rev=1c1a7fb7b436a01ee7763e7f75cfa8f25a5c10e2#1c1a7fb7b436a01ee7763e7f75cfa8f25a5c10e2"
|
||||
source = "git+https://github.com/Brendonovich/prisma-client-rust?rev=2e9ca1da3c94a14ed65eb5ef534a164bb4802a3f#2e9ca1da3c94a14ed65eb5ef534a164bb4802a3f"
|
||||
dependencies = [
|
||||
"base64 0.13.1",
|
||||
"bigdecimal",
|
||||
|
@ -5881,7 +5881,7 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "prisma-client-rust-cli"
|
||||
version = "0.6.8"
|
||||
source = "git+https://github.com/Brendonovich/prisma-client-rust?rev=1c1a7fb7b436a01ee7763e7f75cfa8f25a5c10e2#1c1a7fb7b436a01ee7763e7f75cfa8f25a5c10e2"
|
||||
source = "git+https://github.com/Brendonovich/prisma-client-rust?rev=2e9ca1da3c94a14ed65eb5ef534a164bb4802a3f#2e9ca1da3c94a14ed65eb5ef534a164bb4802a3f"
|
||||
dependencies = [
|
||||
"directories",
|
||||
"flate2",
|
||||
|
@ -5901,7 +5901,7 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "prisma-client-rust-macros"
|
||||
version = "0.6.8"
|
||||
source = "git+https://github.com/Brendonovich/prisma-client-rust?rev=1c1a7fb7b436a01ee7763e7f75cfa8f25a5c10e2#1c1a7fb7b436a01ee7763e7f75cfa8f25a5c10e2"
|
||||
source = "git+https://github.com/Brendonovich/prisma-client-rust?rev=2e9ca1da3c94a14ed65eb5ef534a164bb4802a3f#2e9ca1da3c94a14ed65eb5ef534a164bb4802a3f"
|
||||
dependencies = [
|
||||
"convert_case 0.6.0",
|
||||
"proc-macro2",
|
||||
|
@ -5913,7 +5913,7 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "prisma-client-rust-sdk"
|
||||
version = "0.6.8"
|
||||
source = "git+https://github.com/Brendonovich/prisma-client-rust?rev=1c1a7fb7b436a01ee7763e7f75cfa8f25a5c10e2#1c1a7fb7b436a01ee7763e7f75cfa8f25a5c10e2"
|
||||
source = "git+https://github.com/Brendonovich/prisma-client-rust?rev=2e9ca1da3c94a14ed65eb5ef534a164bb4802a3f#2e9ca1da3c94a14ed65eb5ef534a164bb4802a3f"
|
||||
dependencies = [
|
||||
"convert_case 0.5.0",
|
||||
"dmmf",
|
||||
|
@ -7816,6 +7816,7 @@ name = "specta"
|
|||
version = "1.0.4"
|
||||
source = "git+https://github.com/oscartbeaumont/specta?rev=2fc97ec8178ba27da1c80c0faaf43cb0db95955f#2fc97ec8178ba27da1c80c0faaf43cb0db95955f"
|
||||
dependencies = [
|
||||
"bigdecimal",
|
||||
"chrono",
|
||||
"document-features",
|
||||
"indexmap",
|
||||
|
|
|
@ -18,19 +18,19 @@ edition = "2021"
|
|||
repository = "https://github.com/spacedriveapp/spacedrive"
|
||||
|
||||
[workspace.dependencies]
|
||||
prisma-client-rust = { git = "https://github.com/Brendonovich/prisma-client-rust", rev = "1c1a7fb7b436a01ee7763e7f75cfa8f25a5c10e2", features = [
|
||||
prisma-client-rust = { git = "https://github.com/Brendonovich/prisma-client-rust", rev = "2e9ca1da3c94a14ed65eb5ef534a164bb4802a3f", features = [
|
||||
"rspc",
|
||||
"sqlite-create-many",
|
||||
"migrations",
|
||||
"sqlite",
|
||||
], default-features = false }
|
||||
prisma-client-rust-cli = { git = "https://github.com/Brendonovich/prisma-client-rust", rev = "1c1a7fb7b436a01ee7763e7f75cfa8f25a5c10e2", features = [
|
||||
prisma-client-rust-cli = { git = "https://github.com/Brendonovich/prisma-client-rust", rev = "2e9ca1da3c94a14ed65eb5ef534a164bb4802a3f", features = [
|
||||
"rspc",
|
||||
"sqlite-create-many",
|
||||
"migrations",
|
||||
"sqlite",
|
||||
], default-features = false }
|
||||
prisma-client-rust-sdk = { git = "https://github.com/Brendonovich/prisma-client-rust", rev = "1c1a7fb7b436a01ee7763e7f75cfa8f25a5c10e2", features = [
|
||||
prisma-client-rust-sdk = { git = "https://github.com/Brendonovich/prisma-client-rust", rev = "2e9ca1da3c94a14ed65eb5ef534a164bb4802a3f", features = [
|
||||
"sqlite",
|
||||
], default-features = false }
|
||||
|
||||
|
|
|
@ -77,11 +77,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
|
|||
created_at: DateTime<Utc>,
|
||||
jobs: VecDeque<JobReport>,
|
||||
}
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, Type)]
|
||||
pub struct JobGroups {
|
||||
groups: Vec<JobGroup>,
|
||||
index: HashMap<Uuid, i32>, // maps job ids to their group index
|
||||
}
|
||||
|
||||
R.with2(library())
|
||||
.query(|(ctx, library), _: ()| async move {
|
||||
let mut groups: HashMap<String, JobGroup> = HashMap::new();
|
||||
|
@ -160,18 +156,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
|
|||
let mut groups_vec = groups.into_values().collect::<Vec<_>>();
|
||||
groups_vec.sort_by(|a, b| b.created_at.cmp(&a.created_at));
|
||||
|
||||
// Update the index after sorting the groups
|
||||
let mut index: HashMap<Uuid, i32> = HashMap::new();
|
||||
for (i, group) in groups_vec.iter().enumerate() {
|
||||
for job in &group.jobs {
|
||||
index.insert(job.id, i as i32);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(JobGroups {
|
||||
groups: groups_vec,
|
||||
index,
|
||||
})
|
||||
Ok(groups_vec)
|
||||
})
|
||||
})
|
||||
.procedure("isActive", {
|
||||
|
|
|
@ -118,7 +118,7 @@ impl JobManager {
|
|||
if running_workers.len() < MAX_WORKERS {
|
||||
info!("Running job: {:?}", job.name());
|
||||
|
||||
let worker_id = job_report.parent_id.unwrap_or(job_report.id);
|
||||
let worker_id = job_report.id;
|
||||
|
||||
Worker::new(worker_id, job, job_report, library.clone(), self.clone())
|
||||
.await
|
||||
|
|
|
@ -614,7 +614,7 @@ impl<SJob: StatefulJob> DynJob for Job<SJob> {
|
|||
let inner_step = Arc::clone(&step_arc);
|
||||
let inner_stateful_job = Arc::clone(&stateful_job);
|
||||
|
||||
let step_time = Instant::now();
|
||||
let _step_time = Instant::now();
|
||||
|
||||
let mut job_step_handle = tokio::spawn(async move {
|
||||
inner_stateful_job
|
||||
|
@ -773,10 +773,10 @@ impl<SJob: StatefulJob> DynJob for Job<SJob> {
|
|||
|
||||
// Here we actually run the job, step by step
|
||||
step_result = &mut job_step_handle => {
|
||||
debug!(
|
||||
"Step finished in {:?} Job <id='{job_id}', name='{job_name}'>",
|
||||
step_time.elapsed(),
|
||||
);
|
||||
// debug!(
|
||||
// "Step finished in {:?} Job <id='{job_id}', name='{job_name}'>",
|
||||
// _step_time.elapsed(),
|
||||
// );
|
||||
|
||||
run_metadata = Arc::try_unwrap(run_metadata_arc)
|
||||
.expect("step already ran, no more refs");
|
||||
|
@ -809,6 +809,9 @@ impl<SJob: StatefulJob> DynJob for Job<SJob> {
|
|||
|
||||
if !new_errors.is_empty() {
|
||||
warn!("Job<id='{job_id}', name='{job_name}'> had a step with errors");
|
||||
for err in &new_errors {
|
||||
warn!("Job<id='{job_id}', name='{job_name}'> error: {:?}", err);
|
||||
}
|
||||
errors.extend(new_errors);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -187,6 +187,12 @@ impl Node {
|
|||
})
|
||||
.ok();
|
||||
|
||||
let prev_hook = std::panic::take_hook();
|
||||
std::panic::set_hook(Box::new(move |panic_info| {
|
||||
error!("{}", panic_info);
|
||||
prev_hook(panic_info);
|
||||
}));
|
||||
|
||||
guard
|
||||
}
|
||||
|
||||
|
|
|
@ -129,15 +129,15 @@ impl LibraryManager {
|
|||
.map_err(|e| FileIOError::from((&libraries_dir, e)))?
|
||||
{
|
||||
let config_path = entry.path();
|
||||
let metadata = entry
|
||||
.metadata()
|
||||
.await
|
||||
.map_err(|e| FileIOError::from((&config_path, e)))?;
|
||||
if metadata.is_file()
|
||||
&& config_path
|
||||
.extension()
|
||||
.map(|ext| ext == "sdlibrary")
|
||||
.unwrap_or(false)
|
||||
if config_path
|
||||
.extension()
|
||||
.map(|ext| ext == "sdlibrary")
|
||||
.unwrap_or(false)
|
||||
&& entry
|
||||
.metadata()
|
||||
.await
|
||||
.map_err(|e| FileIOError::from((&config_path, e)))?
|
||||
.is_file()
|
||||
{
|
||||
let Some(Ok(library_id)) = config_path
|
||||
.file_stem()
|
||||
|
|
|
@ -21,14 +21,14 @@ use super::{
|
|||
|
||||
static FORBIDDEN_FILE_NAMES: OnceLock<RegexSet> = OnceLock::new();
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Hash, Eq, PartialEq)]
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Hash, Eq, PartialEq)]
|
||||
#[non_exhaustive]
|
||||
pub struct IsolatedFilePathData<'a> {
|
||||
pub(in crate::location) location_id: location::id::Type,
|
||||
pub(in crate::location) materialized_path: Cow<'a, str>,
|
||||
pub materialized_path: Cow<'a, str>,
|
||||
pub(in crate::location) is_dir: bool,
|
||||
pub(in crate::location) name: Cow<'a, str>,
|
||||
pub(in crate::location) extension: Cow<'a, str>,
|
||||
pub name: Cow<'a, str>,
|
||||
pub extension: Cow<'a, str>,
|
||||
relative_path: Cow<'a, str>,
|
||||
}
|
||||
|
||||
|
|
|
@ -23,7 +23,6 @@ pub use isolated_file_path_data::{
|
|||
};
|
||||
|
||||
// File Path selectables!
|
||||
file_path::select!(file_path_just_pub_id { pub_id });
|
||||
file_path::select!(file_path_just_pub_id_materialized_path {
|
||||
pub_id
|
||||
materialized_path
|
||||
|
@ -37,6 +36,12 @@ file_path::select!(file_path_for_file_identifier {
|
|||
name
|
||||
extension
|
||||
});
|
||||
file_path::select!(file_path_for_indexer {
|
||||
pub_id
|
||||
materialized_path
|
||||
name
|
||||
extension
|
||||
});
|
||||
file_path::select!(file_path_for_object_validator {
|
||||
pub_id
|
||||
materialized_path
|
||||
|
|
|
@ -185,18 +185,16 @@ impl StatefulJob for IndexerJob {
|
|||
to_walk,
|
||||
to_remove,
|
||||
errors,
|
||||
} = {
|
||||
walk(
|
||||
&to_walk_path,
|
||||
&indexer_rules,
|
||||
update_notifier_fn(ctx),
|
||||
file_paths_db_fetcher_fn!(&db),
|
||||
to_remove_db_fetcher_fn!(location_id, location_path, &db),
|
||||
iso_file_path_factory(location_id, location_path),
|
||||
50_000,
|
||||
)
|
||||
.await?
|
||||
};
|
||||
} = walk(
|
||||
&to_walk_path,
|
||||
&indexer_rules,
|
||||
update_notifier_fn(ctx),
|
||||
file_paths_db_fetcher_fn!(&db),
|
||||
to_remove_db_fetcher_fn!(location_id, location_path, &db),
|
||||
iso_file_path_factory(location_id, location_path),
|
||||
50_000,
|
||||
)
|
||||
.await?;
|
||||
let scan_read_time = scan_start.elapsed();
|
||||
|
||||
let db_delete_start = Instant::now();
|
||||
|
@ -306,17 +304,15 @@ impl StatefulJob for IndexerJob {
|
|||
to_walk,
|
||||
to_remove,
|
||||
errors,
|
||||
} = {
|
||||
keep_walking(
|
||||
to_walk_entry,
|
||||
&data.indexer_rules,
|
||||
update_notifier_fn(ctx),
|
||||
file_paths_db_fetcher_fn!(&db),
|
||||
to_remove_db_fetcher_fn!(location_id, location_path, &db),
|
||||
iso_file_path_factory(location_id, location_path),
|
||||
)
|
||||
.await?
|
||||
};
|
||||
} = keep_walking(
|
||||
to_walk_entry,
|
||||
&data.indexer_rules,
|
||||
update_notifier_fn(ctx),
|
||||
file_paths_db_fetcher_fn!(&db),
|
||||
to_remove_db_fetcher_fn!(location_id, location_path, &db),
|
||||
iso_file_path_factory(location_id, location_path),
|
||||
)
|
||||
.await?;
|
||||
|
||||
new_metadata.scan_read_time = scan_start.elapsed();
|
||||
|
||||
|
|
|
@ -16,7 +16,7 @@ use thiserror::Error;
|
|||
use tracing::info;
|
||||
|
||||
use super::{
|
||||
file_path_helper::{file_path_just_pub_id, FilePathError, IsolatedFilePathData},
|
||||
file_path_helper::{file_path_for_indexer, FilePathError, IsolatedFilePathData},
|
||||
location_with_indexer_rules,
|
||||
};
|
||||
|
||||
|
@ -186,7 +186,7 @@ fn iso_file_path_factory(
|
|||
}
|
||||
|
||||
async fn remove_non_existing_file_paths(
|
||||
to_remove: impl IntoIterator<Item = file_path_just_pub_id::Data>,
|
||||
to_remove: impl IntoIterator<Item = file_path_for_indexer::Data>,
|
||||
db: &PrismaClient,
|
||||
) -> Result<u64, IndexerError> {
|
||||
db.file_path()
|
||||
|
@ -223,25 +223,39 @@ macro_rules! file_paths_db_fetcher_fn {
|
|||
#[macro_export]
|
||||
macro_rules! to_remove_db_fetcher_fn {
|
||||
($location_id:expr, $location_path:expr, $db:expr) => {{
|
||||
|iso_file_path, unique_location_id_materialized_path_name_extension_params| async {
|
||||
|iso_file_path| async {
|
||||
let iso_file_path: $crate::location::file_path_helper::IsolatedFilePathData<'static> =
|
||||
iso_file_path;
|
||||
$db.file_path()
|
||||
.find_many(vec![
|
||||
$crate::prisma::file_path::location_id::equals(Some($location_id)),
|
||||
$crate::prisma::file_path::materialized_path::equals(Some(
|
||||
iso_file_path
|
||||
.materialized_path_for_children()
|
||||
.expect("the received isolated file path must be from a directory"),
|
||||
)),
|
||||
::prisma_client_rust::operator::not(
|
||||
unique_location_id_materialized_path_name_extension_params,
|
||||
),
|
||||
])
|
||||
.select($crate::location::file_path_helper::file_path_just_pub_id::select())
|
||||
.exec()
|
||||
.await
|
||||
.map_err(Into::into)
|
||||
|
||||
let mut data = Vec::new(); // one stupid large vec
|
||||
loop {
|
||||
let r = $db
|
||||
.file_path()
|
||||
.find_many(vec![
|
||||
$crate::prisma::file_path::location_id::equals(Some($location_id)),
|
||||
$crate::prisma::file_path::materialized_path::equals(Some(
|
||||
iso_file_path
|
||||
.materialized_path_for_children()
|
||||
.expect("the received isolated file path must be from a directory"),
|
||||
)),
|
||||
])
|
||||
.take(100)
|
||||
.select($crate::location::file_path_helper::file_path_for_indexer::select())
|
||||
.exec()
|
||||
.await;
|
||||
|
||||
match r {
|
||||
Ok(mut v) => {
|
||||
data.append(&mut v);
|
||||
if v.len() != 100 {
|
||||
break Ok(data);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
break Err(err.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}};
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use crate::{
|
||||
location::file_path_helper::{
|
||||
file_path_just_pub_id, file_path_to_isolate, FilePathMetadata, IsolatedFilePathData,
|
||||
file_path_for_indexer, file_path_to_isolate, FilePathMetadata, IsolatedFilePathData,
|
||||
MetadataExt,
|
||||
},
|
||||
prisma::file_path,
|
||||
|
@ -14,13 +14,12 @@ use crate::location::file_path_helper::get_inode_and_device;
|
|||
use crate::location::file_path_helper::get_inode_and_device_from_path;
|
||||
|
||||
use std::{
|
||||
collections::{HashSet, VecDeque},
|
||||
collections::{HashMap, HashSet, VecDeque},
|
||||
future::Future,
|
||||
hash::{Hash, Hasher},
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
|
||||
use prisma_client_rust::operator;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::fs;
|
||||
use tracing::trace;
|
||||
|
@ -72,7 +71,7 @@ impl Hash for WalkingEntry {
|
|||
pub struct WalkResult<Walked, ToRemove>
|
||||
where
|
||||
Walked: Iterator<Item = WalkedEntry>,
|
||||
ToRemove: Iterator<Item = file_path_just_pub_id::Data>,
|
||||
ToRemove: Iterator<Item = file_path_for_indexer::Data>,
|
||||
{
|
||||
pub walked: Walked,
|
||||
pub to_walk: VecDeque<ToWalkEntry>,
|
||||
|
@ -88,22 +87,19 @@ pub(super) async fn walk<FilePathDBFetcherFut, ToRemoveDbFetcherFut>(
|
|||
indexer_rules: &[IndexerRule],
|
||||
mut update_notifier: impl FnMut(&Path, usize),
|
||||
file_paths_db_fetcher: impl Fn(Vec<file_path::WhereParam>) -> FilePathDBFetcherFut,
|
||||
to_remove_db_fetcher: impl Fn(
|
||||
IsolatedFilePathData<'static>,
|
||||
Vec<file_path::WhereParam>,
|
||||
) -> ToRemoveDbFetcherFut,
|
||||
to_remove_db_fetcher: impl Fn(IsolatedFilePathData<'static>) -> ToRemoveDbFetcherFut,
|
||||
iso_file_path_factory: impl Fn(&Path, bool) -> Result<IsolatedFilePathData<'static>, IndexerError>,
|
||||
limit: u64,
|
||||
) -> Result<
|
||||
WalkResult<
|
||||
impl Iterator<Item = WalkedEntry>,
|
||||
impl Iterator<Item = file_path_just_pub_id::Data>,
|
||||
impl Iterator<Item = file_path_for_indexer::Data>,
|
||||
>,
|
||||
IndexerError,
|
||||
>
|
||||
where
|
||||
FilePathDBFetcherFut: Future<Output = Result<Vec<file_path_to_isolate::Data>, IndexerError>>,
|
||||
ToRemoveDbFetcherFut: Future<Output = Result<Vec<file_path_just_pub_id::Data>, IndexerError>>,
|
||||
ToRemoveDbFetcherFut: Future<Output = Result<Vec<file_path_for_indexer::Data>, IndexerError>>,
|
||||
{
|
||||
let root = root.as_ref();
|
||||
|
||||
|
@ -153,21 +149,18 @@ pub(super) async fn keep_walking<FilePathDBFetcherFut, ToRemoveDbFetcherFut>(
|
|||
indexer_rules: &[IndexerRule],
|
||||
mut update_notifier: impl FnMut(&Path, usize),
|
||||
file_paths_db_fetcher: impl Fn(Vec<file_path::WhereParam>) -> FilePathDBFetcherFut,
|
||||
to_remove_db_fetcher: impl Fn(
|
||||
IsolatedFilePathData<'static>,
|
||||
Vec<file_path::WhereParam>,
|
||||
) -> ToRemoveDbFetcherFut,
|
||||
to_remove_db_fetcher: impl Fn(IsolatedFilePathData<'static>) -> ToRemoveDbFetcherFut,
|
||||
iso_file_path_factory: impl Fn(&Path, bool) -> Result<IsolatedFilePathData<'static>, IndexerError>,
|
||||
) -> Result<
|
||||
WalkResult<
|
||||
impl Iterator<Item = WalkedEntry>,
|
||||
impl Iterator<Item = file_path_just_pub_id::Data>,
|
||||
impl Iterator<Item = file_path_for_indexer::Data>,
|
||||
>,
|
||||
IndexerError,
|
||||
>
|
||||
where
|
||||
FilePathDBFetcherFut: Future<Output = Result<Vec<file_path_to_isolate::Data>, IndexerError>>,
|
||||
ToRemoveDbFetcherFut: Future<Output = Result<Vec<file_path_just_pub_id::Data>, IndexerError>>,
|
||||
ToRemoveDbFetcherFut: Future<Output = Result<Vec<file_path_for_indexer::Data>, IndexerError>>,
|
||||
{
|
||||
let mut to_keep_walking = VecDeque::with_capacity(TO_WALK_QUEUE_INITIAL_CAPACITY);
|
||||
let mut indexed_paths = HashSet::with_capacity(WALK_SINGLE_DIR_PATHS_BUFFER_INITIAL_CAPACITY);
|
||||
|
@ -203,23 +196,20 @@ pub(super) async fn walk_single_dir<FilePathDBFetcherFut, ToRemoveDbFetcherFut>(
|
|||
indexer_rules: &[IndexerRule],
|
||||
mut update_notifier: impl FnMut(&Path, usize) + '_,
|
||||
file_paths_db_fetcher: impl Fn(Vec<file_path::WhereParam>) -> FilePathDBFetcherFut,
|
||||
to_remove_db_fetcher: impl Fn(
|
||||
IsolatedFilePathData<'static>,
|
||||
Vec<file_path::WhereParam>,
|
||||
) -> ToRemoveDbFetcherFut,
|
||||
to_remove_db_fetcher: impl Fn(IsolatedFilePathData<'static>) -> ToRemoveDbFetcherFut,
|
||||
iso_file_path_factory: impl Fn(&Path, bool) -> Result<IsolatedFilePathData<'static>, IndexerError>,
|
||||
add_root: bool,
|
||||
) -> Result<
|
||||
(
|
||||
impl Iterator<Item = WalkedEntry>,
|
||||
Vec<file_path_just_pub_id::Data>,
|
||||
Vec<file_path_for_indexer::Data>,
|
||||
Vec<IndexerError>,
|
||||
),
|
||||
IndexerError,
|
||||
>
|
||||
where
|
||||
FilePathDBFetcherFut: Future<Output = Result<Vec<file_path_to_isolate::Data>, IndexerError>>,
|
||||
ToRemoveDbFetcherFut: Future<Output = Result<Vec<file_path_just_pub_id::Data>, IndexerError>>,
|
||||
ToRemoveDbFetcherFut: Future<Output = Result<Vec<file_path_for_indexer::Data>, IndexerError>>,
|
||||
{
|
||||
let root = root.as_ref();
|
||||
|
||||
|
@ -291,6 +281,8 @@ where
|
|||
F: Future<Output = Result<Vec<file_path_to_isolate::Data>, IndexerError>>,
|
||||
{
|
||||
if !indexed_paths.is_empty() {
|
||||
// TODO: Converting paths like this into PCR params will break if we pass in too many paths.
|
||||
// TODO: I am not hitting errors here on my system so gonna leave this for now.
|
||||
file_paths_db_fetcher(
|
||||
indexed_paths
|
||||
.iter()
|
||||
|
@ -335,10 +327,7 @@ async fn inner_walk_single_dir<ToRemoveDbFetcherFut>(
|
|||
}: &ToWalkEntry,
|
||||
indexer_rules: &[IndexerRule],
|
||||
update_notifier: &mut impl FnMut(&Path, usize),
|
||||
to_remove_db_fetcher: &impl Fn(
|
||||
IsolatedFilePathData<'static>,
|
||||
Vec<file_path::WhereParam>,
|
||||
) -> ToRemoveDbFetcherFut,
|
||||
to_remove_db_fetcher: &impl Fn(IsolatedFilePathData<'static>) -> ToRemoveDbFetcherFut,
|
||||
iso_file_path_factory: &impl Fn(&Path, bool) -> Result<IsolatedFilePathData<'static>, IndexerError>,
|
||||
WorkingTable {
|
||||
indexed_paths,
|
||||
|
@ -346,9 +335,9 @@ async fn inner_walk_single_dir<ToRemoveDbFetcherFut>(
|
|||
mut maybe_to_walk,
|
||||
errors,
|
||||
}: WorkingTable<'_>,
|
||||
) -> Vec<file_path_just_pub_id::Data>
|
||||
) -> Vec<file_path_for_indexer::Data>
|
||||
where
|
||||
ToRemoveDbFetcherFut: Future<Output = Result<Vec<file_path_just_pub_id::Data>, IndexerError>>,
|
||||
ToRemoveDbFetcherFut: Future<Output = Result<Vec<file_path_for_indexer::Data>, IndexerError>>,
|
||||
{
|
||||
let Ok(iso_file_path_to_walk) = iso_file_path_factory(path, true).map_err(|e| errors.push(e))
|
||||
else {
|
||||
|
@ -367,8 +356,9 @@ where
|
|||
paths_buffer.clear();
|
||||
|
||||
let mut found_paths_counts = 0;
|
||||
let mut to_remove = HashMap::new();
|
||||
|
||||
// Marking with a loop label here in case of rejection or erros, to continue with next entry
|
||||
// Marking with a loop label here in case of rejection or errors, to continue with next entry
|
||||
'entries: loop {
|
||||
let entry = match read_dir.next_entry().await {
|
||||
Ok(Some(entry)) => entry,
|
||||
|
@ -477,9 +467,9 @@ where
|
|||
// If it wasn't accepted then we mark as rejected
|
||||
if accept_by_children_dir.is_none() {
|
||||
trace!(
|
||||
"Path {} rejected because it didn't passed in any AcceptIfChildrenDirectoriesArePresent rule",
|
||||
current_path.display()
|
||||
);
|
||||
"Path {} rejected because it didn't passed in any AcceptIfChildrenDirectoriesArePresent rule",
|
||||
current_path.display()
|
||||
);
|
||||
accept_by_children_dir = Some(false);
|
||||
}
|
||||
}
|
||||
|
@ -505,22 +495,50 @@ where
|
|||
continue 'entries;
|
||||
}
|
||||
|
||||
if accept_by_children_dir.is_none() || accept_by_children_dir.expect("<-- checked") {
|
||||
if accept_by_children_dir.unwrap_or(true) {
|
||||
// Fetch all files then as we index we remove them and anything left over must have been deleted from FS and can be removed from DB
|
||||
match to_remove_db_fetcher(iso_file_path_to_walk.clone()).await {
|
||||
Ok(v) => {
|
||||
for e in v {
|
||||
to_remove.insert(
|
||||
(
|
||||
e.materialized_path.clone().unwrap(),
|
||||
e.name.clone().unwrap(),
|
||||
e.extension.clone().unwrap(),
|
||||
),
|
||||
e,
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
errors.push(e);
|
||||
}
|
||||
};
|
||||
|
||||
let Ok(iso_file_path) = iso_file_path_factory(¤t_path, is_dir)
|
||||
.map_err(|e| errors.push(e))
|
||||
else {
|
||||
continue 'entries;
|
||||
};
|
||||
paths_buffer.push(WalkingEntry {
|
||||
iso_file_path,
|
||||
maybe_metadata: Some(FilePathMetadata {
|
||||
inode,
|
||||
device,
|
||||
size_in_bytes: metadata.len(),
|
||||
created_at: metadata.created_or_now().into(),
|
||||
modified_at: metadata.modified_or_now().into(),
|
||||
}),
|
||||
});
|
||||
|
||||
{
|
||||
// Remove file path as we don't want it to be removed from the DB
|
||||
to_remove.remove(&(
|
||||
iso_file_path.materialized_path.to_string(),
|
||||
iso_file_path.name.to_string(),
|
||||
iso_file_path.extension.to_string(),
|
||||
));
|
||||
paths_buffer.push(WalkingEntry {
|
||||
iso_file_path,
|
||||
maybe_metadata: Some(FilePathMetadata {
|
||||
inode,
|
||||
device,
|
||||
size_in_bytes: metadata.len(),
|
||||
created_at: metadata.created_or_now().into(),
|
||||
modified_at: metadata.modified_or_now().into(),
|
||||
}),
|
||||
});
|
||||
}
|
||||
|
||||
// If the ancestors directories wasn't indexed before, now we do
|
||||
for ancestor in current_path
|
||||
|
@ -571,7 +589,21 @@ where
|
|||
modified_at: metadata.modified_or_now().into(),
|
||||
});
|
||||
|
||||
paths_buffer.push(ancestor_iso_walking_entry);
|
||||
{
|
||||
// Remove file path as we don't want it to be removed from the DB
|
||||
to_remove.remove(&(
|
||||
ancestor_iso_walking_entry
|
||||
.iso_file_path
|
||||
.materialized_path
|
||||
.to_string(),
|
||||
ancestor_iso_walking_entry.iso_file_path.name.to_string(),
|
||||
ancestor_iso_walking_entry
|
||||
.iso_file_path
|
||||
.extension
|
||||
.to_string(),
|
||||
));
|
||||
paths_buffer.push(ancestor_iso_walking_entry);
|
||||
}
|
||||
} else {
|
||||
// If indexed_paths contains the current ancestors, then it will contain
|
||||
// also all if its ancestors too, so we can stop here
|
||||
|
@ -581,30 +613,11 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
// We continue the function even if we fail to fetch `file_path`s to remove,
|
||||
// the DB will have old `file_path`s but at least this is better than
|
||||
// don't adding the newly indexed paths
|
||||
let to_remove = to_remove_db_fetcher(
|
||||
iso_file_path_to_walk,
|
||||
vec![operator::or(
|
||||
paths_buffer
|
||||
.iter()
|
||||
.map(|entry| &entry.iso_file_path)
|
||||
.map(Into::into)
|
||||
.collect(),
|
||||
)],
|
||||
)
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
errors.push(e);
|
||||
vec![]
|
||||
});
|
||||
|
||||
// Just merging the `found_paths` with `indexed_paths` here in the end to avoid possibly
|
||||
// multiple rehashes during function execution
|
||||
indexed_paths.extend(paths_buffer.drain(..));
|
||||
|
||||
to_remove
|
||||
to_remove.into_iter().map(|(_, v)| v).collect()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
@ -738,7 +751,7 @@ mod tests {
|
|||
&[],
|
||||
|_, _| {},
|
||||
|_| async { Ok(vec![]) },
|
||||
|_, _| async { Ok(vec![]) },
|
||||
|_| async { Ok(vec![]) },
|
||||
|path, is_dir| {
|
||||
IsolatedFilePathData::new(0, root_path, path, is_dir).map_err(Into::into)
|
||||
},
|
||||
|
@ -802,7 +815,7 @@ mod tests {
|
|||
only_photos_rule,
|
||||
|_, _| {},
|
||||
|_| async { Ok(vec![]) },
|
||||
|_, _| async { Ok(vec![]) },
|
||||
|_| async { Ok(vec![]) },
|
||||
|path, is_dir| {
|
||||
IsolatedFilePathData::new(0, root_path, path, is_dir).map_err(Into::into)
|
||||
},
|
||||
|
@ -875,7 +888,7 @@ mod tests {
|
|||
git_repos,
|
||||
|_, _| {},
|
||||
|_| async { Ok(vec![]) },
|
||||
|_, _| async { Ok(vec![]) },
|
||||
|_| async { Ok(vec![]) },
|
||||
|path, is_dir| {
|
||||
IsolatedFilePathData::new(0, root_path, path, is_dir).map_err(Into::into)
|
||||
},
|
||||
|
@ -966,7 +979,7 @@ mod tests {
|
|||
git_repos_no_deps_no_build_dirs,
|
||||
|_, _| {},
|
||||
|_| async { Ok(vec![]) },
|
||||
|_, _| async { Ok(vec![]) },
|
||||
|_| async { Ok(vec![]) },
|
||||
|path, is_dir| {
|
||||
IsolatedFilePathData::new(0, root_path, path, is_dir).map_err(Into::into)
|
||||
},
|
||||
|
|
|
@ -756,7 +756,7 @@ async fn check_nested_location(
|
|||
) -> Result<bool, QueryError> {
|
||||
let location_path = location_path.as_ref();
|
||||
|
||||
let (parents_count, children_count) = db
|
||||
let (parents_count, potential_children) = db
|
||||
._batch((
|
||||
db.location().count(vec![location::path::in_vec(
|
||||
location_path
|
||||
|
@ -769,7 +769,7 @@ async fn check_nested_location(
|
|||
})
|
||||
.collect(),
|
||||
)]),
|
||||
db.location().count(vec![location::path::starts_with(
|
||||
db.location().find_many(vec![location::path::starts_with(
|
||||
location_path
|
||||
.to_str()
|
||||
.map(str::to_string)
|
||||
|
@ -778,5 +778,23 @@ async fn check_nested_location(
|
|||
))
|
||||
.await?;
|
||||
|
||||
Ok(parents_count > 0 || children_count > 0)
|
||||
let comps = location_path.components().collect::<Vec<_>>();
|
||||
let is_a_child_location = potential_children.into_iter().any(|v| {
|
||||
let comps2 = PathBuf::from(v.path.unwrap());
|
||||
let comps2 = comps2.components().collect::<Vec<_>>();
|
||||
|
||||
if comps.len() > comps2.len() {
|
||||
return false;
|
||||
}
|
||||
|
||||
for (a, b) in comps.iter().zip(comps2.iter()) {
|
||||
if a != b {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
true
|
||||
});
|
||||
|
||||
Ok(parents_count > 0 || is_a_child_location)
|
||||
}
|
||||
|
|
|
@ -83,7 +83,8 @@ impl FileMetadata {
|
|||
.await
|
||||
.map_err(|e| FileIOError::from((&path, e)))?;
|
||||
|
||||
info!("Analyzed file: {path:?} {cas_id:?} {kind:?}");
|
||||
#[cfg(debug_assertions)]
|
||||
tracing::debug!("Analyzed file: {path:?} {cas_id:?} {kind:?}");
|
||||
|
||||
Ok(FileMetadata {
|
||||
cas_id,
|
||||
|
@ -311,7 +312,8 @@ fn file_path_object_connect_ops<'db>(
|
|||
sync: &SyncManager,
|
||||
db: &'db PrismaClient,
|
||||
) -> (CRDTOperation, file_path::UpdateQuery<'db>) {
|
||||
info!("Connecting <FilePath id={file_path_id}> to <Object pub_id={object_id}'>");
|
||||
#[cfg(debug_assertions)]
|
||||
tracing::debug!("Connecting <FilePath id={file_path_id}> to <Object pub_id={object_id}'>");
|
||||
|
||||
let vec_id = object_id.as_bytes().to_vec();
|
||||
|
||||
|
|
|
@ -76,7 +76,7 @@ function JobGroup({ data: { jobs, ...data }, clearJob }: JobGroupProps) {
|
|||
|
||||
{isJobsRunning && (
|
||||
<Fragment>
|
||||
<Tooltip label="Pause (coming soon)">
|
||||
<Tooltip label="Pause">
|
||||
<Button
|
||||
className="cursor-pointer"
|
||||
onClick={() => {
|
||||
|
|
|
@ -1,13 +1,6 @@
|
|||
import { useQueryClient } from '@tanstack/react-query';
|
||||
import { Trash, X } from 'phosphor-react';
|
||||
import { useCallback, useEffect, useState } from 'react';
|
||||
import {
|
||||
JobGroups,
|
||||
JobReport,
|
||||
useLibraryMutation,
|
||||
useLibraryQuery,
|
||||
useLibrarySubscription
|
||||
} from '@sd/client';
|
||||
import { useLibraryMutation, useLibraryQuery } from '@sd/client';
|
||||
import { Button, PopoverClose, Tooltip } from '@sd/ui';
|
||||
import { showAlertDialog } from '~/components/AlertDialog';
|
||||
import IsRunningJob from './IsRunningJob';
|
||||
|
@ -64,7 +57,7 @@ export function JobsManager() {
|
|||
</PopoverClose>
|
||||
<div className="custom-scroll job-manager-scroll h-full overflow-x-hidden">
|
||||
<div className="h-full border-r border-app-line/50">
|
||||
{jobs?.groups?.map((group) => (
|
||||
{jobs?.map((group) => (
|
||||
<JobGroup
|
||||
key={group.id}
|
||||
data={group}
|
||||
|
@ -73,7 +66,7 @@ export function JobsManager() {
|
|||
}}
|
||||
/>
|
||||
))}
|
||||
{jobs?.groups?.length === 0 && (
|
||||
{jobs?.length === 0 && (
|
||||
<div className="flex h-32 items-center justify-center text-sidebar-inkDull">
|
||||
No jobs.
|
||||
</div>
|
||||
|
|
|
@ -8,7 +8,7 @@ export type Procedures = {
|
|||
{ key: "files.get", input: LibraryArgs<GetArgs>, result: { id: number; pub_id: number[]; kind: number | null; key_id: number | null; hidden: boolean | null; favorite: boolean | null; important: boolean | null; note: string | null; date_created: string | null; date_accessed: string | null; file_paths: FilePath[]; media_data: MediaData | null } | null } |
|
||||
{ key: "invalidation.test-invalidate", input: never, result: number } |
|
||||
{ key: "jobs.isActive", input: LibraryArgs<null>, result: boolean } |
|
||||
{ key: "jobs.reports", input: LibraryArgs<null>, result: JobGroups } |
|
||||
{ key: "jobs.reports", input: LibraryArgs<null>, result: JobGroup[] } |
|
||||
{ key: "library.list", input: never, result: LibraryConfigWrapped[] } |
|
||||
{ key: "library.statistics", input: LibraryArgs<null>, result: Statistics } |
|
||||
{ key: "locations.get", input: LibraryArgs<number>, result: Location | null } |
|
||||
|
@ -141,8 +141,6 @@ export type InvalidateOperationEvent = { key: string; arg: any; result: any | nu
|
|||
|
||||
export type JobGroup = { id: string; action: string | null; status: JobStatus; created_at: string; jobs: JobReport[] }
|
||||
|
||||
export type JobGroups = { groups: JobGroup[]; index: { [key: string]: number } }
|
||||
|
||||
export type JobProgressEvent = { id: string; task_count: number; completed_task_count: number; message: string; estimated_completion: string }
|
||||
|
||||
export type JobReport = { id: string; name: string; action: string | null; data: number[] | null; metadata: any | null; is_background: boolean; errors_text: string[]; created_at: string | null; started_at: string | null; completed_at: string | null; parent_id: string | null; status: JobStatus; task_count: number; completed_task_count: number; message: string; estimated_completion: string }
|
||||
|
|
Loading…
Reference in a new issue