[ENG-1724] Introduce scan state for location (#2302)

This commit is contained in:
Ericson "Fogo" Soares 2024-04-10 13:19:59 -03:00 committed by GitHub
parent 606c007514
commit 67554c89b3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 205 additions and 68 deletions
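
In brief: this commit adds a persisted per-location scan state so that rescans can resume from the last completed stage of the old job pipeline (indexer → file identifier → media processor) instead of always starting from the indexer. The state is stored as an integer column with four values: Pending (0), Indexed (1), FilesIdentified (2), Completed (3). A minimal, self-contained sketch of the integer round trip used when loading a location row (simplified from the diff; the real code returns LocationError::InvalidScanStateValue rather than a String):

#[repr(i32)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ScanState {
    Pending = 0,
    Indexed = 1,
    FilesIdentified = 2,
    Completed = 3,
}

impl TryFrom<i32> for ScanState {
    type Error = String; // stand-in for LocationError::InvalidScanStateValue
    fn try_from(value: i32) -> Result<Self, Self::Error> {
        Ok(match value {
            0 => Self::Pending,
            1 => Self::Indexed,
            2 => Self::FilesIdentified,
            3 => Self::Completed,
            _ => return Err(format!("invalid location scan state value: {value}")),
        })
    }
}

fn main() {
    // Storing: the enum is written to the DB column as its discriminant.
    assert_eq!(ScanState::Indexed as i32, 1);
    // Loading: unknown values error out instead of being silently coerced.
    assert!(ScanState::try_from(9).is_err());
}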

View file

@@ -260,7 +260,7 @@ impl MetadataForIndexerRules for Metadata {
}
impl RulePerKind {
#[deprecated]
#[deprecated = "Use `[apply_with_metadata]` instead"]
async fn apply(
&self,
source: impl AsRef<Path> + Send,
@@ -328,7 +328,7 @@ pub struct IndexerRule {
}
impl IndexerRule {
#[deprecated]
#[deprecated = "Use `[apply_with_metadata]` instead"]
pub async fn apply(
&self,
source: impl AsRef<Path> + Send,
@@ -362,7 +362,7 @@ impl IndexerRule {
inner(&self.rules, source.as_ref(), metadata).await
}
#[deprecated]
#[deprecated = "Use `[IndexerRuler::apply_all]` instead"]
pub async fn apply_all(
rules: &[Self],
source: impl AsRef<Path> + Send,
@@ -472,7 +472,7 @@ fn reject_by_glob(source: impl AsRef<Path>, reject_glob_set: &GlobSet) -> bool {
!accept_by_glob(source.as_ref(), reject_glob_set)
}
#[deprecated]
#[deprecated = "Use `[accept_dir_for_its_children_with_metadata]` instead"]
async fn accept_dir_for_its_children(
source: impl AsRef<Path> + Send,
children: &HashSet<String>,
@@ -558,7 +558,7 @@ async fn accept_dir_for_its_children_with_metadata(
Ok(false)
}
#[deprecated]
#[deprecated = "Use `[reject_dir_for_its_children_with_metadata]` instead"]
async fn reject_dir_for_its_children(
source: impl AsRef<Path> + Send,
children: &HashSet<String>,
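
All five hunks in this file make the same change: the bare #[deprecated] attributes gain a note naming the replacement. A tiny, self-contained illustration of the difference — the names here are invented, not from the Spacedrive crate:

struct Rule;

impl Rule {
    #[deprecated = "Use `apply_with_metadata` instead"]
    fn apply(&self) {}

    fn apply_with_metadata(&self) {}
}

fn main() {
    let rule = Rule;
    // Without the note, rustc warns: "use of deprecated method `Rule::apply`".
    // With it, the replacement hint is appended to that warning.
    #[allow(deprecated)] // silence the warning in this sketch
    rule.apply();
    rule.apply_with_metadata();
}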

View file

@@ -185,6 +185,7 @@ impl From<location_with_indexer_rules::Data> for location::Data {
sync_preview_media: data.sync_preview_media,
hidden: data.hidden,
date_created: data.date_created,
scan_state: data.scan_state,
file_paths: None,
indexer_rules: None,
instance: None,
@@ -208,6 +209,7 @@ impl From<&location_with_indexer_rules::Data> for location::Data {
sync_preview_media: data.sync_preview_media,
hidden: data.hidden,
date_created: data.date_created,
scan_state: data.scan_state,
file_paths: None,
indexer_rules: None,
instance: None,

View file

@@ -0,0 +1,29 @@
-- AlterTable
ALTER TABLE "job" ADD COLUMN "critical_error" TEXT;
ALTER TABLE "job" ADD COLUMN "non_critical_errors" BLOB;
-- RedefineTables
PRAGMA foreign_keys=OFF;
CREATE TABLE "new_location" (
"id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
"pub_id" BLOB NOT NULL,
"name" TEXT,
"path" TEXT,
"total_capacity" INTEGER,
"available_capacity" INTEGER,
"size_in_bytes" BLOB,
"is_archived" BOOLEAN,
"generate_preview_media" BOOLEAN,
"sync_preview_media" BOOLEAN,
"hidden" BOOLEAN,
"date_created" DATETIME,
"scan_state" INTEGER NOT NULL DEFAULT 0,
"instance_id" INTEGER,
CONSTRAINT "location_instance_id_fkey" FOREIGN KEY ("instance_id") REFERENCES "instance" ("id") ON DELETE SET NULL ON UPDATE CASCADE
);
INSERT INTO "new_location" ("available_capacity", "date_created", "generate_preview_media", "hidden", "id", "instance_id", "is_archived", "name", "path", "pub_id", "size_in_bytes", "sync_preview_media", "total_capacity") SELECT "available_capacity", "date_created", "generate_preview_media", "hidden", "id", "instance_id", "is_archived", "name", "path", "pub_id", "size_in_bytes", "sync_preview_media", "total_capacity" FROM "location";
DROP TABLE "location";
ALTER TABLE "new_location" RENAME TO "location";
CREATE UNIQUE INDEX "location_pub_id_key" ON "location"("pub_id");
PRAGMA foreign_key_check;
PRAGMA foreign_keys=ON;
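
Note that the INSERT ... SELECT above does not copy a scan_state value, so every pre-existing location falls back to the column default of 0 (Pending) and will re-run the full pipeline on its next scan. A hypothetical sanity check over the migrated SQLite file, using rusqlite purely for illustration (the project itself goes through Prisma; the path is made up):

use rusqlite::Connection;

fn main() -> rusqlite::Result<()> {
    let conn = Connection::open("library.db")?; // illustrative path
    let pending: i64 = conn.query_row(
        "SELECT COUNT(*) FROM location WHERE scan_state = 0",
        [],
        |row| row.get(0),
    )?;
    println!("{pending} location(s) will start from a full scan");
    Ok(())
}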

View file

@@ -119,6 +119,8 @@ model Location {
hidden Boolean?
date_created DateTime?
scan_state Int @default(0) // Enum: sd_core::location::ScanState
/// @local
// this is just a client side cache which is annoying but oh well (@brendan)
instance_id Int?

View file

@@ -1,7 +1,7 @@
use crate::{
invalidate_query,
library::{update_library_statistics, Library, LibraryConfig, LibraryName},
location::{scan_location, LocationCreateArgs},
location::{scan_location, LocationCreateArgs, ScanState},
util::MaybeUndefined,
Node,
};
@@ -254,7 +254,9 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
return Ok(());
};
scan_location(&node, &library, location)
let scan_state = ScanState::try_from(location.scan_state)?;
scan_location(&node, &library, location, scan_state)
.await
.map_err(rspc::Error::from)
}))

View file

@@ -3,7 +3,7 @@ use crate::{
location::{
delete_location, find_location, indexer::OldIndexerJobInit, light_scan_location,
non_indexed::NonIndexedPathItem, relink_location, scan_location, scan_location_sub_path,
LocationCreateArgs, LocationError, LocationUpdateArgs,
LocationCreateArgs, LocationError, LocationUpdateArgs, ScanState,
},
object::old_file_identifier::old_file_identifier_job::OldFileIdentifierJobInit,
old_job::StatefulJob,
@@ -311,7 +311,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
.mutation(|(node, library), args: LocationCreateArgs| async move {
if let Some(location) = args.create(&node, &library).await? {
let id = Some(location.id);
scan_location(&node, &library, location).await?;
scan_location(&node, &library, location, ScanState::Pending).await?;
invalidate_query!(library, "locations.list");
Ok(id)
} else {
@@ -349,7 +349,8 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
.mutation(|(node, library), args: LocationCreateArgs| async move {
if let Some(location) = args.add_library(&node, &library).await? {
let id = location.id;
scan_location(&node, &library, location).await?;
let location_scan_state = ScanState::try_from(location.scan_state)?;
scan_location(&node, &library, location, location_scan_state).await?;
invalidate_query!(library, "locations.list");
Ok(Some(id))
} else {
@@ -392,18 +393,18 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
// library.orphan_remover.invoke().await;
}
let location = find_location(&library, location_id)
.include(location_with_indexer_rules::include())
.exec()
.await?
.ok_or(LocationError::IdNotFound(location_id))?;
let location_scan_state = ScanState::try_from(location.scan_state)?;
// rescan location
scan_location(
&node,
&library,
find_location(&library, location_id)
.include(location_with_indexer_rules::include())
.exec()
.await?
.ok_or(LocationError::IdNotFound(location_id))?,
)
.await
.map_err(Into::into)
scan_location(&node, &library, location, location_scan_state)
.await
.map_err(Into::into)
},
)
})

View file

@@ -76,6 +76,8 @@ pub enum LocationError {
MissingPath(location::id::Type),
#[error("missing-field: {0}")]
MissingField(#[from] MissingFieldError),
#[error("invalid location scan state value: {0}")]
InvalidScanStateValue(i32),
}
impl From<LocationError> for rspc::Error {

View file

@@ -1,7 +1,7 @@
use crate::{
file_paths_db_fetcher_fn, invalidate_query,
library::Library,
location::{location_with_indexer_rules, update_location_size},
location::{location_with_indexer_rules, update_location_size, ScanState},
old_job::{
CurrentStep, JobError, JobInitOutput, JobReportUpdate, JobResult, JobRunMetadata,
JobStepOutput, StatefulJob, WorkerContext,
@@ -531,9 +531,37 @@ impl StatefulJob for OldIndexerJobInit {
update_location_size(init.location.id, &ctx.library)
.await
.map_err(IndexerError::from)?;
ctx.library
.db
.location()
.update(
location::id::equals(init.location.id),
vec![location::scan_state::set(ScanState::Indexed as i32)],
)
.exec()
.await
.map_err(IndexerError::from)?;
}
}
// FIXME(fogodev): This is currently a workaround to avoid saving paths and sizes in the
// metadata after a job is completed, as it's pretty heavy. A proper fix isn't needed
// right now, as I already changed it in the new indexer job, and this old one
// will be removed eventually.
let run_metadata = Self::RunMetadata {
db_write_time: run_metadata.db_write_time,
scan_read_time: run_metadata.scan_read_time,
total_paths: run_metadata.total_paths,
total_updated_paths: run_metadata.total_updated_paths,
total_save_steps: run_metadata.total_save_steps,
total_update_steps: run_metadata.total_update_steps,
indexed_count: run_metadata.indexed_count,
updated_count: run_metadata.updated_count,
removed_count: run_metadata.removed_count,
paths_and_sizes: HashMap::new(),
};
Ok(Some(json!({"init: ": init, "run_metadata": run_metadata})))
}
}
@@ -543,8 +571,8 @@ fn update_notifier_fn(ctx: &WorkerContext) -> impl FnMut(&Path, usize) + '_ {
OldIndexerJobData::on_scan_progress(
ctx,
vec![ScanProgress::Message(format!(
"Found: {total_entries} entries; Scanning: {:?}",
path.file_name().unwrap_or(path.as_os_str())
"{total_entries} entries found at {}",
path.display()
))],
);
}
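
The update_notifier_fn hunk above also switches the progress message from printing just the file name — with a fallback for paths like "/" that have none — to printing the whole path. A trivial standalone comparison of the two approaches (the path is illustrative):

use std::path::Path;

fn main() {
    let path = Path::new("/home/user/Pictures/2024");
    // Old style: last component only, falling back to the full OsStr.
    println!("{:?}", path.file_name().unwrap_or(path.as_os_str()));
    // New style: the entire path, via its lossy Display adapter.
    println!("{}", path.display());
}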

View file

@@ -81,7 +81,6 @@ pub async fn old_shallow(
walk_single_dir(
&to_walk_path,
&indexer_rules,
|_, _| {},
file_paths_db_fetcher_fn!(&db),
to_remove_db_fetcher_fn!(location_id, &db),
iso_file_path_factory(location_id, location_path),
@@ -104,7 +103,6 @@ pub async fn old_shallow(
errors.into_iter().for_each(|e| error!("{e}"));
// TODO pass these uuids to sync system
remove_non_existing_file_paths(to_remove, &db, sync).await?;
let mut new_directories_to_scan = HashSet::new();

View file

@@ -153,11 +153,12 @@ where
let mut to_remove = vec![];
while let Some(entry) = to_walk.pop_front() {
let last_indexed_count = indexed_paths.len();
let (entry_size, current_to_remove) = inner_walk_single_dir(
root,
&entry,
indexer_rules,
&mut update_notifier,
&to_remove_db_fetcher,
&iso_file_path_factory,
WorkingTable {
@@ -170,6 +171,8 @@
.await;
to_remove.push(current_to_remove);
update_notifier(&entry.path, indexed_paths.len() - last_indexed_count);
// Saving the size of the current entry
paths_and_sizes.insert(entry.path, entry_size);
@@ -227,7 +230,6 @@ where
to_walk_entry.path.clone(),
to_walk_entry,
indexer_rules,
&mut update_notifier,
&to_remove_db_fetcher,
&iso_file_path_factory,
WorkingTable {
@@ -239,6 +241,8 @@
)
.await;
update_notifier(&to_walk_entry.path, indexed_paths.len());
let (walked, to_update) = filter_existing_paths(indexed_paths, file_paths_db_fetcher).await?;
Ok(WalkResult {
@@ -263,7 +267,6 @@ where
pub(super) async fn walk_single_dir<FilePathDBFetcherFut, ToRemoveDbFetcherFut>(
root: impl AsRef<Path>,
indexer_rules: &[IndexerRule],
mut update_notifier: impl FnMut(&Path, usize) + '_,
file_paths_db_fetcher: impl Fn(Vec<file_path::WhereParam>) -> FilePathDBFetcherFut,
to_remove_db_fetcher: impl Fn(
IsolatedFilePathData<'static>,
@@ -312,7 +315,6 @@ where
maybe_parent: None,
},
indexer_rules,
&mut update_notifier,
&to_remove_db_fetcher,
&iso_file_path_factory,
WorkingTable {
@@ -435,7 +437,6 @@ async fn inner_walk_single_dir<ToRemoveDbFetcherFut>(
..
}: &ToWalkEntry,
indexer_rules: &[IndexerRule],
update_notifier: &mut impl FnMut(&Path, usize),
to_remove_db_fetcher: impl Fn(
IsolatedFilePathData<'static>,
Vec<file_path::WhereParam>,
@@ -469,8 +470,6 @@ where
// Just to make sure...
paths_buffer.clear();
let mut found_paths_counts = 0;
// Marking with a loop label here so that, on rejection or errors, we can continue with the next entry
'entries: loop {
let entry = match read_dir.next_entry().await {
@@ -491,16 +490,6 @@ where
let current_path = entry.path();
// Just sending updates if we found more paths since the last loop
let current_found_paths_count = paths_buffer.len();
if found_paths_counts != current_found_paths_count {
update_notifier(
&current_path,
indexed_paths.len() + current_found_paths_count,
);
found_paths_counts = current_found_paths_count;
}
trace!(
"Current filesystem path: {}, accept_by_children_dir: {:#?}",
current_path.display(),

View file

@@ -35,7 +35,7 @@ use chrono::Utc;
use futures::future::TryFutureExt;
use normpath::PathExt;
use prisma_client_rust::{operator::and, or, QueryError};
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use serde_json::json;
use specta::Type;
use tokio::{fs, io, time::Instant};
@@ -55,6 +55,29 @@ use metadata::SpacedriveLocationMetadataFile;
pub type LocationPubId = Uuid;
#[repr(i32)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Type, Eq, PartialEq)]
pub enum ScanState {
Pending = 0,
Indexed = 1,
FilesIdentified = 2,
Completed = 3,
}
impl TryFrom<i32> for ScanState {
type Error = LocationError;
fn try_from(value: i32) -> Result<Self, Self::Error> {
Ok(match value {
0 => Self::Pending,
1 => Self::Indexed,
2 => Self::FilesIdentified,
3 => Self::Completed,
_ => return Err(LocationError::InvalidScanStateValue(value)),
})
}
}
/// `LocationCreateArgs` is the argument received from the client using `rspc` to create a new location.
/// It has the actual path and a vector of indexer rule ids, used to create many-to-many relationships
/// between the location and indexer rules.
@@ -442,6 +465,7 @@ pub async fn scan_location(
node: &Arc<Node>,
library: &Arc<Library>,
location: location_with_indexer_rules::Data,
location_scan_state: ScanState,
) -> Result<(), JobManagerError> {
// TODO(N): This isn't gonna work with removable media and this will likely permanently break if the DB is restored from a backup.
if location.instance_id != Some(library.config().await.instance_id) {
@@ -450,25 +474,61 @@
let location_base_data = location::Data::from(&location);
JobBuilder::new(OldIndexerJobInit {
location,
sub_path: None,
})
.with_action("scan_location")
.with_metadata(json!({"location": location_base_data.clone()}))
.build()
.queue_next(OldFileIdentifierJobInit {
location: location_base_data.clone(),
sub_path: None,
})
.queue_next(OldMediaProcessorJobInit {
location: location_base_data,
sub_path: None,
regenerate_thumbnails: false,
regenerate_labels: false,
})
.spawn(node, library)
.await
debug!("Scanning location with state: {location_scan_state:?}");
match location_scan_state {
ScanState::Pending | ScanState::Completed => {
JobBuilder::new(OldIndexerJobInit {
location,
sub_path: None,
})
.with_action("scan_location")
.with_metadata(json!({"location": location_base_data.clone()}))
.build()
.queue_next(OldFileIdentifierJobInit {
location: location_base_data.clone(),
sub_path: None,
})
.queue_next(OldMediaProcessorJobInit {
location: location_base_data,
sub_path: None,
regenerate_thumbnails: false,
regenerate_labels: false,
})
.spawn(node, library)
.await
}
ScanState::Indexed => {
JobBuilder::new(OldFileIdentifierJobInit {
location: location_base_data.clone(),
sub_path: None,
})
.with_action("scan_location_already_indexed")
.with_metadata(json!({"location": location_base_data.clone()}))
.build()
.queue_next(OldMediaProcessorJobInit {
location: location_base_data,
sub_path: None,
regenerate_thumbnails: false,
regenerate_labels: false,
})
.spawn(node, library)
.await
}
ScanState::FilesIdentified => {
JobBuilder::new(OldMediaProcessorJobInit {
location: location_base_data.clone(),
sub_path: None,
regenerate_thumbnails: false,
regenerate_labels: false,
})
.with_action("scan_location_files_already_identified")
.with_metadata(json!({"location": location_base_data}))
.build()
.spawn(node, library)
.await
}
}
.map_err(Into::into)
}
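
This match is the heart of the commit: the persisted state picks the first job to enqueue, so a location interrupted after indexing resumes at file identification, and one interrupted after identification resumes at media processing. Note that Completed is treated like Pending — a finished location being rescanned starts over from the indexer. A simplified, self-contained sketch of that dispatch (the stage names are illustrative, not the real job types):

#[derive(Debug, Clone, Copy)]
enum ScanState {
    Pending,
    Indexed,
    FilesIdentified,
    Completed,
}

fn stages_to_run(state: ScanState) -> &'static [&'static str] {
    match state {
        // Full pipeline: fresh locations, and completed ones being rescanned.
        ScanState::Pending | ScanState::Completed => {
            &["indexer", "file_identifier", "media_processor"]
        }
        // Skip the stages whose completion was already persisted.
        ScanState::Indexed => &["file_identifier", "media_processor"],
        ScanState::FilesIdentified => &["media_processor"],
    }
}

fn main() {
    for stage in stages_to_run(ScanState::Indexed) {
        println!("queueing {stage} job");
    }
}

The remaining hunks below close the loop: each job's finalize step writes the next state back to the location row (indexer → Indexed, file identifier → FilesIdentified, media processor → Completed).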

View file

@@ -1,6 +1,7 @@
use crate::{
invalidate_query,
library::Library,
location::ScanState,
old_job::{
CurrentStep, JobError, JobInitOutput, JobReportUpdate, JobResult, JobStepOutput,
StatefulJob, WorkerContext,
@@ -420,6 +421,17 @@ impl StatefulJob for OldMediaProcessorJobInit {
invalidate_query!(ctx.library, "search.paths");
}
ctx.library
.db
.location()
.update(
location::id::equals(self.location.id),
vec![location::scan_state::set(ScanState::Completed as i32)],
)
.exec()
.await
.map_err(MediaProcessorError::from)?;
Ok(Some(json!({"init: ": self, "run_metadata": run_metadata})))
}
}

View file

@@ -1,5 +1,6 @@
use crate::{
library::Library,
location::ScanState,
old_job::{
CurrentStep, JobError, JobInitOutput, JobReportUpdate, JobResult, JobRunMetadata,
JobStepOutput, StatefulJob, WorkerContext,
@@ -239,13 +240,24 @@ impl StatefulJob for OldFileIdentifierJobInit {
async fn finalize(
&self,
_: &WorkerContext,
ctx: &WorkerContext,
_data: &Option<Self::Data>,
run_metadata: &Self::RunMetadata,
) -> JobResult {
let init = self;
info!("Finalizing identifier job: {:?}", &run_metadata);
ctx.library
.db
.location()
.update(
location::id::equals(init.location.id),
vec![location::scan_state::set(ScanState::FilesIdentified as i32)],
)
.exec()
.await
.map_err(FileIdentifierJobError::from)?;
Ok(Some(json!({"init: ": init, "run_metadata": run_metadata})))
}
}

View file

@@ -1,10 +1,10 @@
//! A system for loading a default set of data on startup. This is ONLY enabled in development builds.
use crate::{
library::Libraries,
library::{LibraryManagerError, LibraryName},
library::{Libraries, LibraryManagerError, LibraryName},
location::{
delete_location, scan_location, LocationCreateArgs, LocationError, LocationManagerError,
ScanState,
},
old_job::JobManagerError,
util::AbortOnDrop,
@@ -178,7 +178,7 @@ impl InitConfig {
.create(node, &library)
.await?
{
scan_location(node, &library, location).await?;
scan_location(node, &library, location, ScanState::Pending).await?;
} else {
warn!(
"Debug init error: location '{}' was not found after being created!",

View file

@@ -416,7 +416,7 @@ export type LightScanArgs = { location_id: number; sub_path: string }
export type Listener2 = { id: string; name: string; addrs: string[] }
export type Location = { id: number; pub_id: number[]; name: string | null; path: string | null; total_capacity: number | null; available_capacity: number | null; size_in_bytes: number[] | null; is_archived: boolean | null; generate_preview_media: boolean | null; sync_preview_media: boolean | null; hidden: boolean | null; date_created: string | null; instance_id: number | null }
export type Location = { id: number; pub_id: number[]; name: string | null; path: string | null; total_capacity: number | null; available_capacity: number | null; size_in_bytes: number[] | null; is_archived: boolean | null; generate_preview_media: boolean | null; sync_preview_media: boolean | null; hidden: boolean | null; date_created: string | null; scan_state: number; instance_id: number | null }
/**
* `LocationCreateArgs` is the argument received from the client using `rspc` to create a new location.