[ENG-1629] Write new file identifier with the task system (#2334)

* Introduce deep vs shallow for indexer tasks with different priorities

* Make job wait to dispatch if it's paused

* Extract file metadata task on file identifier job

* Some initial drafts on object processor task

* Object processor task for file identifier

* File Identifier job and its shallow variant
Ericson "Fogo" Soares 2024-04-25 01:06:11 -03:00 committed by GitHub
parent 463babe1d4
commit 73f521a3b8
25 changed files with 2171 additions and 165 deletions
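Before the per-file diffs, here is a minimal sketch of the deep vs. shallow split described in the bullets above: tasks carry an is_shallow flag and report it as their priority, so shallow (user-triggered, single-directory) work jumps ahead of deep background scans. Names are simplified for illustration; this is not the actual sd_task_system trait.

struct ExampleTask {
    is_shallow: bool,
}

impl ExampleTask {
    fn new_deep() -> Self {
        Self { is_shallow: false }
    }

    fn new_shallow() -> Self {
        Self { is_shallow: true }
    }

    // Mirrors Task::with_priority in the diffs below: shallow tasks get priority.
    fn with_priority(&self) -> bool {
        self.is_shallow
    }
}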

Cargo.lock (generated)

@ -8634,6 +8634,7 @@ version = "0.1.0"
dependencies = [
"async-channel",
"async-trait",
"blake3",
"chrono",
"futures",
"futures-concurrency",
@ -8648,6 +8649,7 @@ dependencies = [
"sd-core-indexer-rules",
"sd-core-prisma-helpers",
"sd-core-sync",
"sd-file-ext",
"sd-prisma",
"sd-sync",
"sd-task-system",
@ -8655,6 +8657,7 @@ dependencies = [
"serde",
"serde_json",
"specta",
"static_assertions",
"strum",
"tempfile",
"thiserror",


@ -76,6 +76,7 @@ rmp-serde = "1.1.2"
rmpv = { version = "^1.0.1", features = ["with-serde"] }
serde = "1.0"
serde_json = "1.0"
static_assertions = "1.1.0"
strum = "0.25"
strum_macros = "0.25"
tempfile = "3.8.1"


@ -82,6 +82,7 @@ rspc = { workspace = true, features = [
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
specta = { workspace = true }
static_assertions = { workspace = true }
strum = { workspace = true, features = ["derive"] }
strum_macros = { workspace = true }
tempfile = { workspace = true }
@ -128,7 +129,6 @@ serde-hashkey = "0.4.5"
serde_repr = "0.1"
serde_with = "3.4.0"
slotmap = "1.0.6"
static_assertions = "1.1.0"
sysinfo = "0.29.10"
tar = "0.4.40"
tower-service = "0.3.2"


@ -100,6 +100,18 @@ impl<'a> IsolatedFilePathData<'a> {
self.extension.as_ref()
}
#[must_use]
pub fn to_owned(self) -> IsolatedFilePathData<'static> {
IsolatedFilePathData {
location_id: self.location_id,
materialized_path: Cow::Owned(self.materialized_path.to_string()),
is_dir: self.is_dir,
name: Cow::Owned(self.name.to_string()),
extension: Cow::Owned(self.extension.to_string()),
relative_path: Cow::Owned(self.relative_path.to_string()),
}
}
#[must_use]
pub const fn is_dir(&self) -> bool {
self.is_dir


@ -16,6 +16,7 @@ sd-core-prisma-helpers = { path = "../prisma-helpers" }
sd-core-sync = { path = "../sync" }
# Sub-crates
sd-file-ext = { path = "../../../crates/file-ext" }
sd-prisma = { path = "../../../crates/prisma" }
sd-sync = { path = "../../../crates/sync" }
sd-task-system = { path = "../../../crates/task-system" }
@ -24,6 +25,7 @@ sd-utils = { path = "../../../crates/utils" }
async-channel = { workspace = true }
async-trait = { workspace = true }
blake3 = { workspace = true }
chrono = { workspace = true, features = ["serde"] }
futures = { workspace = true }
futures-concurrency = { workspace = true }
@ -37,6 +39,7 @@ rspc = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
specta = { workspace = true }
static_assertions = { workspace = true }
strum = { workspace = true, features = ["derive", "phf"] }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["fs", "sync", "parking_lot"] }


@ -0,0 +1,68 @@
use std::path::Path;
use blake3::Hasher;
use static_assertions::const_assert;
use tokio::{
fs::{self, File},
io::{self, AsyncReadExt, AsyncSeekExt, SeekFrom},
};
const SAMPLE_COUNT: u64 = 4;
const SAMPLE_SIZE: u64 = 1024 * 10;
const HEADER_OR_FOOTER_SIZE: u64 = 1024 * 8;
// minimum file size of 100KiB, to avoid sample hashing for small files as they can be smaller than the total sample size
const MINIMUM_FILE_SIZE: u64 = 1024 * 100;
// Asserting that nobody messed up our consts
const_assert!((HEADER_OR_FOOTER_SIZE * 2 + SAMPLE_COUNT * SAMPLE_SIZE) < MINIMUM_FILE_SIZE);
// Asserting that the sample size is larger than header/footer size, as the same buffer is used for both
const_assert!(SAMPLE_SIZE > HEADER_OR_FOOTER_SIZE);
// SAFETY: These casts are safe; the values are hardcoded and the const assertions above ensure they're correct
#[allow(clippy::cast_possible_truncation)]
#[allow(clippy::cast_possible_wrap)]
pub async fn generate_cas_id(
path: impl AsRef<Path> + Send,
size: u64,
) -> Result<String, io::Error> {
let mut hasher = Hasher::new();
hasher.update(&size.to_le_bytes());
if size <= MINIMUM_FILE_SIZE {
// For small files, we hash the whole file
hasher.update(&fs::read(path).await?);
} else {
let mut file = File::open(path).await?;
let mut buf = vec![0; SAMPLE_SIZE as usize].into_boxed_slice();
// Hashing the header
let mut current_pos = file
.read_exact(&mut buf[..HEADER_OR_FOOTER_SIZE as usize])
.await? as u64;
hasher.update(&buf[..HEADER_OR_FOOTER_SIZE as usize]);
// Sample hashing the inner content of the file
let seek_jump = (size - HEADER_OR_FOOTER_SIZE * 2) / SAMPLE_COUNT;
loop {
file.read_exact(&mut buf).await?;
hasher.update(&buf);
if current_pos >= (HEADER_OR_FOOTER_SIZE + seek_jump * (SAMPLE_COUNT - 1)) {
break;
}
current_pos = file.seek(SeekFrom::Start(current_pos + seek_jump)).await?;
}
// Hashing the footer
file.seek(SeekFrom::End(-(HEADER_OR_FOOTER_SIZE as i64)))
.await?;
file.read_exact(&mut buf[..HEADER_OR_FOOTER_SIZE as usize])
.await?;
hasher.update(&buf[..HEADER_OR_FOOTER_SIZE as usize]);
}
Ok(hasher.finalize().to_hex()[..16].to_string())
}
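A minimal usage sketch for generate_cas_id above, assuming the function is in scope; the path is a hypothetical example and error handling is reduced to `?`.

// Hypothetical usage of generate_cas_id; path and file are assumptions for illustration.
#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let path = "/tmp/example.bin";
    let size = tokio::fs::metadata(path).await?.len();
    let cas_id = generate_cas_id(path, size).await?;
    println!("cas_id for {path}: {cas_id}");
    Ok(())
}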


@ -0,0 +1,566 @@
use crate::{
job_system::{
job::{Job, JobReturn, JobTaskDispatcher, ReturnStatus},
report::ReportOutputMetadata,
utils::cancel_pending_tasks,
SerializableJob, SerializedTasks,
},
utils::sub_path::maybe_get_iso_file_path_from_sub_path,
Error, JobContext, JobName, LocationScanState, NonCriticalJobError, ProgressUpdate,
};
use sd_core_file_path_helper::IsolatedFilePathData;
use sd_core_prisma_helpers::file_path_for_file_identifier;
use sd_prisma::prisma::{file_path, location, SortOrder};
use sd_task_system::{
AnyTaskOutput, IntoTask, SerializableTask, Task, TaskDispatcher, TaskHandle, TaskId,
TaskOutput, TaskStatus,
};
use sd_utils::db::maybe_missing;
use std::{
collections::HashMap,
hash::{Hash, Hasher},
mem,
path::PathBuf,
sync::Arc,
time::Duration,
};
use futures::{stream::FuturesUnordered, StreamExt};
use futures_concurrency::future::TryJoin;
use prisma_client_rust::or;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tokio::time::Instant;
use tracing::warn;
use super::{
tasks::{
ExtractFileMetadataTask, ExtractFileMetadataTaskOutput, ObjectProcessorTask,
ObjectProcessorTaskMetrics,
},
FileIdentifierError, CHUNK_SIZE,
};
#[derive(Debug)]
pub struct FileIdentifierJob {
location: Arc<location::Data>,
location_path: Arc<PathBuf>,
sub_path: Option<PathBuf>,
metadata: Metadata,
errors: Vec<NonCriticalJobError>,
pending_tasks_on_resume: Vec<TaskHandle<Error>>,
tasks_for_shutdown: Vec<Box<dyn Task<Error>>>,
}
impl Hash for FileIdentifierJob {
fn hash<H: Hasher>(&self, state: &mut H) {
self.location.id.hash(state);
if let Some(ref sub_path) = self.sub_path {
sub_path.hash(state);
}
}
}
impl Job for FileIdentifierJob {
const NAME: JobName = JobName::FileIdentifier;
async fn resume_tasks(
&mut self,
dispatcher: &JobTaskDispatcher,
ctx: &impl JobContext,
SerializedTasks(serialized_tasks): SerializedTasks,
) -> Result<(), Error> {
self.pending_tasks_on_resume = dispatcher
.dispatch_many_boxed(
rmp_serde::from_slice::<Vec<(TaskKind, Vec<u8>)>>(&serialized_tasks)
.map_err(FileIdentifierError::from)?
.into_iter()
.map(|(task_kind, task_bytes)| async move {
match task_kind {
TaskKind::ExtractFileMetadata => {
<ExtractFileMetadataTask as SerializableTask<Error>>::deserialize(
&task_bytes,
(),
)
.await
.map(IntoTask::into_task)
}
TaskKind::ObjectProcessor => ObjectProcessorTask::deserialize(
&task_bytes,
(Arc::clone(ctx.db()), Arc::clone(ctx.sync())),
)
.await
.map(IntoTask::into_task),
}
})
.collect::<Vec<_>>()
.try_join()
.await
.map_err(FileIdentifierError::from)?,
)
.await;
Ok(())
}
async fn run(
mut self,
dispatcher: JobTaskDispatcher,
ctx: impl JobContext,
) -> Result<ReturnStatus, Error> {
let mut pending_running_tasks = FuturesUnordered::new();
self.init_or_resume(&mut pending_running_tasks, &ctx, &dispatcher)
.await?;
while let Some(task) = pending_running_tasks.next().await {
match task {
Ok(TaskStatus::Done((task_id, TaskOutput::Out(out)))) => {
if let Some(new_object_processor_task) = self
.process_task_output(task_id, out, &ctx, &dispatcher)
.await
{
pending_running_tasks.push(new_object_processor_task);
};
}
Ok(TaskStatus::Done((task_id, TaskOutput::Empty))) => {
warn!("Task <id='{task_id}'> returned an empty output");
}
Ok(TaskStatus::Shutdown(task)) => {
self.tasks_for_shutdown.push(task);
}
Ok(TaskStatus::Error(e)) => {
cancel_pending_tasks(&pending_running_tasks).await;
return Err(e);
}
Ok(TaskStatus::Canceled | TaskStatus::ForcedAbortion) => {
cancel_pending_tasks(&pending_running_tasks).await;
return Ok(ReturnStatus::Canceled);
}
Err(e) => {
cancel_pending_tasks(&pending_running_tasks).await;
return Err(e.into());
}
}
}
if !self.tasks_for_shutdown.is_empty() {
return Ok(ReturnStatus::Shutdown(self.serialize().await));
}
// From this point onward, we are done with the job and it can't be interrupted anymore
let Self {
location,
metadata,
errors,
..
} = self;
ctx.db()
.location()
.update(
location::id::equals(location.id),
vec![location::scan_state::set(
LocationScanState::FilesIdentified as i32,
)],
)
.exec()
.await
.map_err(FileIdentifierError::from)?;
Ok(ReturnStatus::Completed(
JobReturn::builder()
.with_metadata(metadata)
.with_non_critical_errors(errors)
.build(),
))
}
}
impl FileIdentifierJob {
pub fn new(
location: location::Data,
sub_path: Option<PathBuf>,
) -> Result<Self, FileIdentifierError> {
Ok(Self {
location_path: maybe_missing(&location.path, "location.path")
.map(PathBuf::from)
.map(Arc::new)?,
location: Arc::new(location),
sub_path,
metadata: Metadata::default(),
errors: Vec::new(),
pending_tasks_on_resume: Vec::new(),
tasks_for_shutdown: Vec::new(),
})
}
async fn init_or_resume(
&mut self,
pending_running_tasks: &mut FuturesUnordered<TaskHandle<Error>>,
job_ctx: &impl JobContext,
dispatcher: &JobTaskDispatcher,
) -> Result<(), FileIdentifierError> {
// if we don't have any pending task, then this is a fresh job
if self.pending_tasks_on_resume.is_empty() {
let db = job_ctx.db();
let maybe_sub_iso_file_path = maybe_get_iso_file_path_from_sub_path(
self.location.id,
&self.sub_path,
&*self.location_path,
db,
)
.await?;
let mut orphans_count = 0;
let mut last_orphan_file_path_id = None;
let start = Instant::now();
loop {
#[allow(clippy::cast_possible_wrap)]
// SAFETY: we know that CHUNK_SIZE is a valid i64
let orphan_paths = db
.file_path()
.find_many(orphan_path_filters(
self.location.id,
last_orphan_file_path_id,
&maybe_sub_iso_file_path,
))
.order_by(file_path::id::order(SortOrder::Asc))
.take(CHUNK_SIZE as i64)
.select(file_path_for_file_identifier::select())
.exec()
.await?;
if orphan_paths.is_empty() {
break;
}
orphans_count += orphan_paths.len() as u64;
last_orphan_file_path_id =
Some(orphan_paths.last().expect("orphan_paths is not empty").id);
job_ctx.progress(vec![
ProgressUpdate::TaskCount(orphans_count),
ProgressUpdate::Message(format!("{orphans_count} files to be identified")),
]);
pending_running_tasks.push(
dispatcher
.dispatch(ExtractFileMetadataTask::new_deep(
Arc::clone(&self.location),
Arc::clone(&self.location_path),
orphan_paths,
))
.await,
);
}
self.metadata.seeking_orphans_time = start.elapsed();
self.metadata.total_found_orphans = orphans_count;
} else {
pending_running_tasks.extend(mem::take(&mut self.pending_tasks_on_resume));
}
Ok(())
}
/// Process output of tasks, according to the downcasted output type
///
/// # Panics
/// Will panic if a new task type is added to the job but this function isn't updated to handle it
///
async fn process_task_output(
&mut self,
task_id: TaskId,
any_task_output: Box<dyn AnyTaskOutput>,
job_ctx: &impl JobContext,
dispatcher: &JobTaskDispatcher,
) -> Option<TaskHandle<Error>> {
if any_task_output.is::<ExtractFileMetadataTaskOutput>() {
return self
.process_extract_file_metadata_output(
*any_task_output
.downcast::<ExtractFileMetadataTaskOutput>()
.expect("just checked"),
job_ctx,
dispatcher,
)
.await;
} else if any_task_output.is::<ObjectProcessorTaskMetrics>() {
self.process_object_processor_output(
*any_task_output
.downcast::<ObjectProcessorTaskMetrics>()
.expect("just checked"),
job_ctx,
);
} else {
unreachable!("Unexpected task output type: <id='{task_id}'>");
}
None
}
async fn process_extract_file_metadata_output(
&mut self,
ExtractFileMetadataTaskOutput {
identified_files,
extract_metadata_time,
errors,
}: ExtractFileMetadataTaskOutput,
job_ctx: &impl JobContext,
dispatcher: &JobTaskDispatcher,
) -> Option<TaskHandle<Error>> {
self.metadata.extract_metadata_time += extract_metadata_time;
self.errors.extend(errors);
if identified_files.is_empty() {
self.metadata.completed_tasks += 1;
job_ctx.progress(vec![ProgressUpdate::CompletedTaskCount(
self.metadata.completed_tasks,
)]);
None
} else {
job_ctx.progress_msg(format!("Identified {} files", identified_files.len()));
Some(
dispatcher
.dispatch(ObjectProcessorTask::new_deep(
identified_files,
Arc::clone(job_ctx.db()),
Arc::clone(job_ctx.sync()),
))
.await,
)
}
}
fn process_object_processor_output(
&mut self,
ObjectProcessorTaskMetrics {
assign_cas_ids_time,
fetch_existing_objects_time,
assign_to_existing_object_time,
create_object_time,
created_objects_count,
linked_objects_count,
}: ObjectProcessorTaskMetrics,
job_ctx: &impl JobContext,
) {
self.metadata.assign_cas_ids_time += assign_cas_ids_time;
self.metadata.fetch_existing_objects_time += fetch_existing_objects_time;
self.metadata.assign_to_existing_object_time += assign_to_existing_object_time;
self.metadata.create_object_time += create_object_time;
self.metadata.created_objects_count += created_objects_count;
self.metadata.linked_objects_count += linked_objects_count;
self.metadata.completed_tasks += 1;
job_ctx.progress(vec![
ProgressUpdate::CompletedTaskCount(self.metadata.completed_tasks),
ProgressUpdate::Message(format!(
"Processed {} of {} objects",
self.metadata.created_objects_count + self.metadata.linked_objects_count,
self.metadata.total_found_orphans
)),
]);
}
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
enum TaskKind {
ExtractFileMetadata,
ObjectProcessor,
}
#[derive(Serialize, Deserialize)]
struct SaveState {
location: Arc<location::Data>,
location_path: Arc<PathBuf>,
sub_path: Option<PathBuf>,
metadata: Metadata,
errors: Vec<NonCriticalJobError>,
tasks_for_shutdown_bytes: Option<SerializedTasks>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct Metadata {
extract_metadata_time: Duration,
assign_cas_ids_time: Duration,
fetch_existing_objects_time: Duration,
assign_to_existing_object_time: Duration,
create_object_time: Duration,
seeking_orphans_time: Duration,
total_found_orphans: u64,
created_objects_count: u64,
linked_objects_count: u64,
completed_tasks: u64,
}
impl From<Metadata> for ReportOutputMetadata {
fn from(value: Metadata) -> Self {
Self::Metrics(HashMap::from([
(
"extract_metadata_time".into(),
json!(value.extract_metadata_time),
),
(
"assign_cas_ids_time".into(),
json!(value.assign_cas_ids_time),
),
(
"fetch_existing_objects_time".into(),
json!(value.fetch_existing_objects_time),
),
(
"assign_to_existing_object_time".into(),
json!(value.assign_to_existing_object_time),
),
("create_object_time".into(), json!(value.create_object_time)),
(
"seeking_orphans_time".into(),
json!(value.seeking_orphans_time),
),
(
"total_found_orphans".into(),
json!(value.total_found_orphans),
),
(
"created_objects_count".into(),
json!(value.created_objects_count),
),
(
"linked_objects_count".into(),
json!(value.linked_objects_count),
),
("total_tasks".into(), json!(value.completed_tasks)),
]))
}
}
impl SerializableJob for FileIdentifierJob {
async fn serialize(self) -> Result<Option<Vec<u8>>, rmp_serde::encode::Error> {
let Self {
location,
location_path,
sub_path,
metadata,
errors,
tasks_for_shutdown,
..
} = self;
rmp_serde::to_vec_named(&SaveState {
location,
location_path,
sub_path,
metadata,
tasks_for_shutdown_bytes: Some(SerializedTasks(rmp_serde::to_vec_named(
&tasks_for_shutdown
.into_iter()
.map(|task| async move {
if task.is::<ExtractFileMetadataTask>() {
SerializableTask::serialize(
*task
.downcast::<ExtractFileMetadataTask>()
.expect("just checked"),
)
.await
.map(|bytes| (TaskKind::ExtractFileMetadata, bytes))
} else if task.is::<ObjectProcessorTask>() {
task.downcast::<ObjectProcessorTask>()
.expect("just checked")
.serialize()
.await
.map(|bytes| (TaskKind::ObjectProcessor, bytes))
} else {
unreachable!("Unexpected task type")
}
})
.collect::<Vec<_>>()
.try_join()
.await?,
)?)),
errors,
})
.map(Some)
}
async fn deserialize(
serialized_job: &[u8],
_: &impl JobContext,
) -> Result<Option<(Self, Option<SerializedTasks>)>, rmp_serde::decode::Error> {
let SaveState {
location,
location_path,
sub_path,
metadata,
errors,
tasks_for_shutdown_bytes,
} = rmp_serde::from_slice::<SaveState>(serialized_job)?;
Ok(Some((
Self {
location,
location_path,
sub_path,
metadata,
errors,
pending_tasks_on_resume: Vec::new(),
tasks_for_shutdown: Vec::new(),
},
tasks_for_shutdown_bytes,
)))
}
}
fn orphan_path_filters(
location_id: location::id::Type,
file_path_id: Option<file_path::id::Type>,
maybe_sub_iso_file_path: &Option<IsolatedFilePathData<'_>>,
) -> Vec<file_path::WhereParam> {
sd_utils::chain_optional_iter(
[
or!(
file_path::object_id::equals(None),
file_path::cas_id::equals(None)
),
file_path::is_dir::equals(Some(false)),
file_path::location_id::equals(Some(location_id)),
file_path::size_in_bytes_bytes::not(Some(0u64.to_be_bytes().to_vec())),
],
[
// workaround: the query cursor doesn't behave properly here, so we paginate manually by id
file_path_id.map(file_path::id::gte),
maybe_sub_iso_file_path.as_ref().map(|sub_iso_file_path| {
file_path::materialized_path::starts_with(
sub_iso_file_path
.materialized_path_for_children()
.expect("sub path iso_file_path must be a directory"),
)
}),
],
)
}
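The process_task_output routing above leans on runtime downcasting of boxed task outputs. Below is a self-contained sketch of the same pattern using std::any::Any with illustrative output types; it is not the sd_task_system AnyTaskOutput trait.

use std::any::Any;

struct MetadataOutput { identified: usize }
struct ProcessorOutput { linked: usize }

// Route a boxed output to the right handler by checking its concrete type, as above.
fn route(output: Box<dyn Any>) -> String {
    if output.is::<MetadataOutput>() {
        let out = output.downcast::<MetadataOutput>().expect("just checked");
        format!("metadata task identified {} files", out.identified)
    } else if output.is::<ProcessorOutput>() {
        let out = output.downcast::<ProcessorOutput>().expect("just checked");
        format!("object processor linked {} objects", out.linked)
    } else {
        unreachable!("unexpected task output type")
    }
}

fn main() {
    println!("{}", route(Box::new(MetadataOutput { identified: 42 })));
}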


@ -0,0 +1,120 @@
use crate::utils::sub_path::SubPathError;
use sd_core_file_path_helper::{FilePathError, IsolatedFilePathData};
use sd_file_ext::{extensions::Extension, kind::ObjectKind};
use sd_utils::{db::MissingFieldError, error::FileIOError};
use std::{fs::Metadata, path::Path};
use prisma_client_rust::QueryError;
use rspc::ErrorCode;
use serde::{Deserialize, Serialize};
use specta::Type;
use tokio::fs;
use tracing::trace;
mod cas_id;
mod job;
mod shallow;
mod tasks;
use cas_id::generate_cas_id;
pub use job::FileIdentifierJob;
pub use shallow::shallow;
// we break these tasks into chunks of 100 to improve performance
const CHUNK_SIZE: usize = 100;
#[derive(thiserror::Error, Debug)]
pub enum FileIdentifierError {
#[error("missing field on database: {0}")]
MissingField(#[from] MissingFieldError),
#[error("failed to deserialized stored tasks for job resume: {0}")]
DeserializeTasks(#[from] rmp_serde::decode::Error),
#[error("database error: {0}")]
Database(#[from] QueryError),
#[error(transparent)]
FilePathError(#[from] FilePathError),
#[error(transparent)]
SubPath(#[from] SubPathError),
}
impl From<FileIdentifierError> for rspc::Error {
fn from(err: FileIdentifierError) -> Self {
match err {
FileIdentifierError::SubPath(sub_path_err) => sub_path_err.into(),
_ => Self::with_cause(ErrorCode::InternalServerError, err.to_string(), err),
}
}
}
#[derive(thiserror::Error, Debug, Serialize, Deserialize, Type)]
pub enum NonCriticalFileIdentifierError {
#[error("failed to extract file metadata: {0}")]
FailedToExtractFileMetadata(String),
#[cfg(target_os = "windows")]
#[error("failed to extract metadata from on-demand file: {0}")]
FailedToExtractMetadataFromOnDemandFile(String),
#[error("failed to extract isolated file path data: {0}")]
FailedToExtractIsolatedFilePathData(String),
}
#[derive(Debug, Clone)]
pub struct FileMetadata {
pub cas_id: Option<String>,
pub kind: ObjectKind,
pub fs_metadata: Metadata,
}
impl FileMetadata {
/// Fetch metadata from the file system and generate a cas id for the file
/// if it's not empty.
///
/// # Panics
/// Will panic if the file is a directory.
pub async fn new(
location_path: impl AsRef<Path> + Send,
iso_file_path: &IsolatedFilePathData<'_>,
) -> Result<Self, FileIOError> {
let path = location_path.as_ref().join(iso_file_path);
let fs_metadata = fs::metadata(&path)
.await
.map_err(|e| FileIOError::from((&path, e)))?;
assert!(
!fs_metadata.is_dir(),
"We can't generate cas_id for directories"
);
// derive Object kind
let kind = Extension::resolve_conflicting(&path, false)
.await
.map_or(ObjectKind::Unknown, Into::into);
let cas_id = if fs_metadata.len() != 0 {
generate_cas_id(&path, fs_metadata.len())
.await
.map(Some)
.map_err(|e| FileIOError::from((&path, e)))?
} else {
// We can't generate a cas_id for empty files
None
};
trace!(
"Analyzed file: <path='{}', cas_id={cas_id:?}, object_kind={kind}>",
path.display()
);
Ok(Self {
cas_id,
kind,
fs_metadata,
})
}
}
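The file identifier treats per-file failures as non-critical: they are collected into an errors Vec and reported at the end instead of aborting the job. A dependency-free sketch of that shape follows; names and paths are illustrative only.

use std::fs;

// Collect per-file failures instead of returning early, the way the tasks in this module
// push into `errors: Vec<NonCriticalJobError>`.
fn identify_all(paths: &[&str]) -> (Vec<String>, Vec<String>) {
    let mut identified = Vec::new();
    let mut errors = Vec::new();

    for path in paths {
        match fs::metadata(path) {
            Ok(metadata) if metadata.len() > 0 => {
                identified.push(format!("{path}: {} bytes", metadata.len()));
            }
            // Empty files get no cas_id, but that's not an error either.
            Ok(_) => identified.push(format!("{path}: empty, skipping cas_id")),
            Err(e) => errors.push(format!("<path='{path}', error={e}>")), // non-critical, keep going
        }
    }

    (identified, errors)
}

fn main() {
    let (ok, errors) = identify_all(&["Cargo.toml", "/definitely/missing"]);
    println!("identified: {ok:?}\nnon-critical errors: {errors:?}");
}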


@ -0,0 +1,207 @@
use crate::{utils::sub_path::maybe_get_iso_file_path_from_sub_path, Error, NonCriticalJobError};
use sd_core_file_path_helper::IsolatedFilePathData;
use sd_core_prisma_helpers::file_path_for_file_identifier;
use sd_core_sync::Manager as SyncManager;
use sd_prisma::prisma::{file_path, location, PrismaClient, SortOrder};
use sd_task_system::{
BaseTaskDispatcher, CancelTaskOnDrop, TaskDispatcher, TaskOutput, TaskStatus,
};
use sd_utils::db::maybe_missing;
use std::{
path::{Path, PathBuf},
sync::Arc,
};
use futures_concurrency::future::FutureGroup;
use lending_stream::{LendingStream, StreamExt};
use prisma_client_rust::or;
use tracing::{debug, warn};
use super::{
tasks::{ExtractFileMetadataTask, ExtractFileMetadataTaskOutput, ObjectProcessorTask},
FileIdentifierError, CHUNK_SIZE,
};
pub async fn shallow(
location: location::Data,
sub_path: impl AsRef<Path> + Send,
dispatcher: BaseTaskDispatcher<Error>,
db: Arc<PrismaClient>,
sync: Arc<SyncManager>,
invalidate_query: impl Fn(&'static str) + Send + Sync,
) -> Result<Vec<NonCriticalJobError>, Error> {
let sub_path = sub_path.as_ref();
let location_path = maybe_missing(&location.path, "location.path")
.map(PathBuf::from)
.map(Arc::new)
.map_err(FileIdentifierError::from)?;
let location = Arc::new(location);
let sub_iso_file_path =
maybe_get_iso_file_path_from_sub_path(location.id, &Some(sub_path), &*location_path, &db)
.await
.map_err(FileIdentifierError::from)?
.map_or_else(
|| {
IsolatedFilePathData::new(location.id, &*location_path, &*location_path, true)
.map_err(FileIdentifierError::from)
},
Ok,
)?;
let mut orphans_count = 0;
let mut last_orphan_file_path_id = None;
let mut pending_running_tasks = FutureGroup::new();
loop {
#[allow(clippy::cast_possible_wrap)]
// SAFETY: we know that CHUNK_SIZE is a valid i64
let orphan_paths = db
.file_path()
.find_many(orphan_path_filters(
location.id,
last_orphan_file_path_id,
&sub_iso_file_path,
))
.order_by(file_path::id::order(SortOrder::Asc))
.take(CHUNK_SIZE as i64)
.select(file_path_for_file_identifier::select())
.exec()
.await
.map_err(FileIdentifierError::from)?;
let Some(last_orphan) = orphan_paths.last() else {
// No orphans here!
break;
};
orphans_count += orphan_paths.len() as u64;
last_orphan_file_path_id = Some(last_orphan.id);
pending_running_tasks.insert(CancelTaskOnDrop(
dispatcher
.dispatch(ExtractFileMetadataTask::new_shallow(
Arc::clone(&location),
Arc::clone(&location_path),
orphan_paths,
))
.await,
));
}
if orphans_count == 0 {
debug!(
"No orphans found on <location_id={}, sub_path='{}'>",
location.id,
sub_path.display()
);
return Ok(vec![]);
}
let errors = process_tasks(pending_running_tasks, dispatcher, db, sync).await?;
invalidate_query("search.paths");
invalidate_query("search.objects");
Ok(errors)
}
async fn process_tasks(
pending_running_tasks: FutureGroup<CancelTaskOnDrop<Error>>,
dispatcher: BaseTaskDispatcher<Error>,
db: Arc<PrismaClient>,
sync: Arc<SyncManager>,
) -> Result<Vec<NonCriticalJobError>, Error> {
let mut pending_running_tasks = pending_running_tasks.lend_mut();
let mut errors = vec![];
while let Some((pending_running_tasks, task_result)) = pending_running_tasks.next().await {
match task_result {
Ok(TaskStatus::Done((_, TaskOutput::Out(any_task_output)))) => {
// We only care about ExtractFileMetadataTaskOutput here because it may require dispatching
// further tasks; the ObjectProcessorTask only returns metrics, which aren't very relevant
// for the shallow file identifier
if any_task_output.is::<ExtractFileMetadataTaskOutput>() {
let ExtractFileMetadataTaskOutput {
identified_files,
errors: more_errors,
..
} = *any_task_output
.downcast::<ExtractFileMetadataTaskOutput>()
.expect("just checked");
errors.extend(more_errors);
if !identified_files.is_empty() {
pending_running_tasks.insert(CancelTaskOnDrop(
dispatcher
.dispatch(ObjectProcessorTask::new_shallow(
identified_files,
Arc::clone(&db),
Arc::clone(&sync),
))
.await,
));
}
}
}
Ok(TaskStatus::Done((task_id, TaskOutput::Empty))) => {
warn!("Task <id='{task_id}'> returned an empty output");
}
Ok(TaskStatus::Shutdown(_)) => {
debug!(
"Spacedrive is shutting down while a shallow file identifier was in progress"
);
return Ok(vec![]);
}
Ok(TaskStatus::Error(e)) => {
return Err(e);
}
Ok(TaskStatus::Canceled | TaskStatus::ForcedAbortion) => {
warn!("Task was cancelled or aborted on shallow file identifier");
return Ok(vec![]);
}
Err(e) => {
return Err(e.into());
}
}
}
Ok(errors)
}
fn orphan_path_filters(
location_id: location::id::Type,
file_path_id: Option<file_path::id::Type>,
sub_iso_file_path: &IsolatedFilePathData<'_>,
) -> Vec<file_path::WhereParam> {
sd_utils::chain_optional_iter(
[
or!(
file_path::object_id::equals(None),
file_path::cas_id::equals(None)
),
file_path::is_dir::equals(Some(false)),
file_path::location_id::equals(Some(location_id)),
file_path::materialized_path::equals(Some(
sub_iso_file_path
.materialized_path_for_children()
.expect("sub path for shallow identifier must be a directory"),
)),
file_path::size_in_bytes_bytes::not(Some(0u64.to_be_bytes().to_vec())),
],
[file_path_id.map(file_path::id::gte)],
)
}
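The orphan loop above pages through results with a last_orphan_file_path_id cursor plus an id filter (the noted cursor workaround) rather than a database cursor. Here is a tiny in-memory sketch of that keyset-pagination shape over assumed data, not Prisma; the diff uses gte while this sketch advances strictly past the cursor.

fn next_page(ids: &[i32], cursor: Option<i32>, chunk_size: usize) -> Vec<i32> {
    ids.iter()
        .copied()
        .filter(|id| cursor.map_or(true, |last| *id > last))
        .take(chunk_size)
        .collect()
}

fn main() {
    let ids = [1, 3, 5, 8, 13, 21];
    let mut cursor = None;
    loop {
        let page = next_page(&ids, cursor, 2);
        let Some(&last) = page.last() else { break };
        println!("page: {page:?}");
        cursor = Some(last); // mirrors `last_orphan_file_path_id = Some(last_orphan.id)` above
    }
}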


@ -0,0 +1,280 @@
use crate::{
file_identifier::{FileMetadata, NonCriticalFileIdentifierError},
Error, NonCriticalJobError,
};
use sd_core_file_path_helper::IsolatedFilePathData;
use sd_core_prisma_helpers::file_path_for_file_identifier;
use sd_prisma::prisma::location;
use sd_task_system::{
ExecStatus, Interrupter, InterruptionKind, IntoAnyTaskOutput, SerializableTask, Task, TaskId,
};
use sd_utils::error::FileIOError;
use std::{
collections::HashMap, future::IntoFuture, mem, path::PathBuf, pin::pin, sync::Arc,
time::Duration,
};
use futures::stream::{self, FuturesUnordered, StreamExt};
use futures_concurrency::stream::Merge;
use serde::{Deserialize, Serialize};
use tokio::time::Instant;
use tracing::error;
use uuid::Uuid;
use super::IdentifiedFile;
#[derive(Debug, Serialize, Deserialize)]
pub struct ExtractFileMetadataTask {
id: TaskId,
location: Arc<location::Data>,
location_path: Arc<PathBuf>,
file_paths_by_id: HashMap<Uuid, file_path_for_file_identifier::Data>,
identified_files: HashMap<Uuid, IdentifiedFile>,
extract_metadata_time: Duration,
errors: Vec<NonCriticalJobError>,
is_shallow: bool,
}
#[derive(Debug)]
pub struct ExtractFileMetadataTaskOutput {
pub identified_files: HashMap<Uuid, IdentifiedFile>,
pub extract_metadata_time: Duration,
pub errors: Vec<NonCriticalJobError>,
}
impl ExtractFileMetadataTask {
fn new(
location: Arc<location::Data>,
location_path: Arc<PathBuf>,
file_paths: Vec<file_path_for_file_identifier::Data>,
is_shallow: bool,
) -> Self {
Self {
id: TaskId::new_v4(),
location,
location_path,
identified_files: HashMap::with_capacity(file_paths.len()),
file_paths_by_id: file_paths
.into_iter()
.map(|file_path| {
// SAFETY: pub_id is always stored as valid UUID bytes, so this conversion should never fail
(
Uuid::from_slice(&file_path.pub_id).expect("file_path.pub_id is invalid!"),
file_path,
)
})
.collect(),
extract_metadata_time: Duration::ZERO,
errors: Vec::new(),
is_shallow,
}
}
#[must_use]
pub fn new_deep(
location: Arc<location::Data>,
location_path: Arc<PathBuf>,
file_paths: Vec<file_path_for_file_identifier::Data>,
) -> Self {
Self::new(location, location_path, file_paths, false)
}
#[must_use]
pub fn new_shallow(
location: Arc<location::Data>,
location_path: Arc<PathBuf>,
file_paths: Vec<file_path_for_file_identifier::Data>,
) -> Self {
Self::new(location, location_path, file_paths, true)
}
}
#[async_trait::async_trait]
impl Task<Error> for ExtractFileMetadataTask {
fn id(&self) -> TaskId {
self.id
}
fn with_priority(&self) -> bool {
self.is_shallow
}
async fn run(&mut self, interrupter: &Interrupter) -> Result<ExecStatus, Error> {
enum StreamMessage {
Processed(Uuid, Result<FileMetadata, FileIOError>),
Interrupt(InterruptionKind),
}
let Self {
location,
location_path,
file_paths_by_id,
identified_files,
extract_metadata_time,
errors,
..
} = self;
let start_time = Instant::now();
if !file_paths_by_id.is_empty() {
let extraction_futures = file_paths_by_id
.iter()
.filter_map(|(file_path_id, file_path)| {
try_iso_file_path_extraction(
location.id,
*file_path_id,
file_path,
Arc::clone(location_path),
errors,
)
})
.map(|(file_path_id, iso_file_path, location_path)| async move {
StreamMessage::Processed(
file_path_id,
FileMetadata::new(&*location_path, &iso_file_path).await,
)
})
.collect::<FuturesUnordered<_>>();
let mut msg_stream = pin!((
extraction_futures,
stream::once(interrupter.into_future()).map(StreamMessage::Interrupt)
)
.merge());
while let Some(msg) = msg_stream.next().await {
match msg {
StreamMessage::Processed(file_path_pub_id, res) => {
let file_path = file_paths_by_id
.remove(&file_path_pub_id)
.expect("file_path must be here");
match res {
Ok(FileMetadata { cas_id, kind, .. }) => {
identified_files.insert(
file_path_pub_id,
IdentifiedFile {
file_path,
cas_id,
kind,
},
);
}
Err(e) => {
handle_non_critical_errors(
location.id,
file_path_pub_id,
&e,
errors,
);
}
}
if file_paths_by_id.is_empty() {
// All files have been processed, so we can end this merged stream instead of
// keeping it alive waiting for an interrupt signal
break;
}
}
StreamMessage::Interrupt(kind) => {
*extract_metadata_time += start_time.elapsed();
return Ok(match kind {
InterruptionKind::Pause => ExecStatus::Paused,
InterruptionKind::Cancel => ExecStatus::Canceled,
});
}
}
}
}
Ok(ExecStatus::Done(
ExtractFileMetadataTaskOutput {
identified_files: mem::take(identified_files),
extract_metadata_time: *extract_metadata_time + start_time.elapsed(),
errors: mem::take(errors),
}
.into_output(),
))
}
}
fn handle_non_critical_errors(
location_id: location::id::Type,
file_path_pub_id: Uuid,
e: &FileIOError,
errors: &mut Vec<NonCriticalJobError>,
) {
error!("Failed to extract file metadata <location_id={location_id}, file_path_pub_id='{file_path_pub_id}'>: {e:#?}");
let formatted_error = format!("<file_path_pub_id='{file_path_pub_id}', error={e}>");
#[cfg(target_os = "windows")]
{
// Handle case where file is on-demand (NTFS only)
if e.source.raw_os_error().map_or(false, |code| code == 362) {
errors.push(
NonCriticalFileIdentifierError::FailedToExtractMetadataFromOnDemandFile(
formatted_error,
)
.into(),
);
} else {
errors.push(
NonCriticalFileIdentifierError::FailedToExtractFileMetadata(formatted_error).into(),
);
}
}
#[cfg(not(target_os = "windows"))]
{
errors.push(
NonCriticalFileIdentifierError::FailedToExtractFileMetadata(formatted_error).into(),
);
}
}
fn try_iso_file_path_extraction(
location_id: location::id::Type,
file_path_pub_id: Uuid,
file_path: &file_path_for_file_identifier::Data,
location_path: Arc<PathBuf>,
errors: &mut Vec<NonCriticalJobError>,
) -> Option<(Uuid, IsolatedFilePathData<'static>, Arc<PathBuf>)> {
IsolatedFilePathData::try_from((location_id, file_path))
.map(IsolatedFilePathData::to_owned)
.map(|iso_file_path| (file_path_pub_id, iso_file_path, location_path))
.map_err(|e| {
error!("Failed to extract isolated file path data: {e:#?}");
errors.push(
NonCriticalFileIdentifierError::FailedToExtractIsolatedFilePathData(format!(
"<file_path_pub_id='{file_path_pub_id}', error={e}>"
))
.into(),
);
})
.ok()
}
impl SerializableTask<Error> for ExtractFileMetadataTask {
type SerializeError = rmp_serde::encode::Error;
type DeserializeError = rmp_serde::decode::Error;
type DeserializeCtx = ();
async fn serialize(self) -> Result<Vec<u8>, Self::SerializeError> {
rmp_serde::to_vec_named(&self)
}
async fn deserialize(
data: &[u8],
(): Self::DeserializeCtx,
) -> Result<Self, Self::DeserializeError> {
rmp_serde::from_slice(data)
}
}
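The task's run loop above merges the work stream with the interrupter so a pause or cancel can arrive between files. A reduced sketch of that shape follows, using tokio::select! instead of futures_concurrency's Merge; the types and names are illustrative, not the sd_task_system Interrupter API.

use tokio::sync::Notify;

// Process items one by one, but stop as soon as an interrupt is signaled.
async fn run_until_interrupted(items: Vec<u64>, interrupter: &Notify) -> Vec<u64> {
    let mut processed = Vec::new();
    for item in items {
        tokio::select! {
            // Pause/cancel requested: return what we have, like ExecStatus::Paused above.
            _ = interrupter.notified() => break,
            out = async { item * 2 } => processed.push(out),
        }
    }
    processed
}

#[tokio::main]
async fn main() {
    let interrupter = Notify::new();
    let out = run_until_interrupted(vec![1, 2, 3], &interrupter).await;
    println!("processed: {out:?}");
}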


@ -0,0 +1,18 @@
use sd_core_prisma_helpers::file_path_for_file_identifier;
use sd_file_ext::kind::ObjectKind;
use serde::{Deserialize, Serialize};
mod extract_file_metadata;
mod object_processor;
pub use extract_file_metadata::{ExtractFileMetadataTask, ExtractFileMetadataTaskOutput};
pub use object_processor::{ObjectProcessorTask, ObjectProcessorTaskMetrics};
#[derive(Debug, Serialize, Deserialize)]
pub(super) struct IdentifiedFile {
pub(super) file_path: file_path_for_file_identifier::Data,
pub(super) cas_id: Option<String>,
pub(super) kind: ObjectKind,
}


@ -0,0 +1,473 @@
use crate::{file_identifier::FileIdentifierError, Error};
use sd_core_prisma_helpers::{
file_path_for_file_identifier, file_path_pub_id, object_for_file_identifier,
};
use sd_core_sync::Manager as SyncManager;
use sd_prisma::{
prisma::{file_path, object, PrismaClient},
prisma_sync,
};
use sd_sync::{CRDTOperation, OperationFactory};
use sd_task_system::{
check_interruption, ExecStatus, Interrupter, IntoAnyTaskOutput, SerializableTask, Task, TaskId,
};
use sd_utils::{msgpack, uuid_to_bytes};
use std::{
collections::{HashMap, HashSet},
mem,
sync::Arc,
time::Duration,
};
use prisma_client_rust::Select;
use serde::{Deserialize, Serialize};
use tokio::time::Instant;
use tracing::{debug, trace};
use uuid::Uuid;
use super::IdentifiedFile;
#[derive(Debug)]
pub struct ObjectProcessorTask {
id: TaskId,
db: Arc<PrismaClient>,
sync: Arc<SyncManager>,
identified_files: HashMap<Uuid, IdentifiedFile>,
metrics: ObjectProcessorTaskMetrics,
stage: Stage,
is_shallow: bool,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct SaveState {
id: TaskId,
identified_files: HashMap<Uuid, IdentifiedFile>,
metrics: ObjectProcessorTaskMetrics,
stage: Stage,
is_shallow: bool,
}
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct ObjectProcessorTaskMetrics {
pub assign_cas_ids_time: Duration,
pub fetch_existing_objects_time: Duration,
pub assign_to_existing_object_time: Duration,
pub create_object_time: Duration,
pub created_objects_count: u64,
pub linked_objects_count: u64,
}
#[derive(Debug, Serialize, Deserialize)]
enum Stage {
Starting,
FetchExistingObjects,
AssignFilePathsToExistingObjects {
existing_objects_by_cas_id: HashMap<String, object_for_file_identifier::Data>,
},
CreateObjects,
}
impl ObjectProcessorTask {
fn new(
identified_files: HashMap<Uuid, IdentifiedFile>,
db: Arc<PrismaClient>,
sync: Arc<SyncManager>,
is_shallow: bool,
) -> Self {
Self {
id: TaskId::new_v4(),
db,
sync,
identified_files,
stage: Stage::Starting,
metrics: ObjectProcessorTaskMetrics::default(),
is_shallow,
}
}
pub fn new_deep(
identified_files: HashMap<Uuid, IdentifiedFile>,
db: Arc<PrismaClient>,
sync: Arc<SyncManager>,
) -> Self {
Self::new(identified_files, db, sync, false)
}
pub fn new_shallow(
identified_files: HashMap<Uuid, IdentifiedFile>,
db: Arc<PrismaClient>,
sync: Arc<SyncManager>,
) -> Self {
Self::new(identified_files, db, sync, true)
}
}
#[async_trait::async_trait]
impl Task<Error> for ObjectProcessorTask {
fn id(&self) -> TaskId {
self.id
}
fn with_priority(&self) -> bool {
self.is_shallow
}
async fn run(&mut self, interrupter: &Interrupter) -> Result<ExecStatus, Error> {
let Self {
db,
sync,
identified_files,
stage,
metrics:
ObjectProcessorTaskMetrics {
assign_cas_ids_time,
fetch_existing_objects_time,
assign_to_existing_object_time,
create_object_time,
created_objects_count,
linked_objects_count,
},
..
} = self;
loop {
match stage {
Stage::Starting => {
let start = Instant::now();
assign_cas_id_to_file_paths(identified_files, db, sync).await?;
*assign_cas_ids_time = start.elapsed();
*stage = Stage::FetchExistingObjects;
}
Stage::FetchExistingObjects => {
let start = Instant::now();
let existing_objects_by_cas_id =
fetch_existing_objects_by_cas_id(identified_files, db).await?;
*fetch_existing_objects_time = start.elapsed();
*stage = Stage::AssignFilePathsToExistingObjects {
existing_objects_by_cas_id,
};
}
Stage::AssignFilePathsToExistingObjects {
existing_objects_by_cas_id,
} => {
let start = Instant::now();
let assigned_file_path_pub_ids = assign_existing_objects_to_file_paths(
identified_files,
existing_objects_by_cas_id,
db,
sync,
)
.await?;
*assign_to_existing_object_time = start.elapsed();
*linked_objects_count = assigned_file_path_pub_ids.len() as u64;
debug!(
"Found {} existing Objects, linked file paths to them",
existing_objects_by_cas_id.len()
);
for file_path_pub_id::Data { pub_id } in assigned_file_path_pub_ids {
let pub_id = Uuid::from_slice(&pub_id).expect("uuid bytes are invalid");
trace!("Assigned file path <file_path_pub_id={pub_id}> to existing object");
identified_files
.remove(&pub_id)
.expect("file_path must be here");
}
*stage = Stage::CreateObjects;
if identified_files.is_empty() {
// No objects to be created, we're good to finish already
break;
}
}
Stage::CreateObjects => {
let start = Instant::now();
*created_objects_count = create_objects(identified_files, db, sync).await?;
*create_object_time = start.elapsed();
break;
}
}
check_interruption!(interrupter);
}
Ok(ExecStatus::Done(mem::take(&mut self.metrics).into_output()))
}
}
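A reduced sketch of the Stage-driven loop above: each pass does one stage's work, advances the stage, and checks for interruption; because the current stage is part of the serialized state, a paused task resumes at the same step. The DB work and check_interruption! are omitted; this is illustrative only.

#[derive(Debug, Clone, Copy)]
enum Stage {
    Starting,
    FetchExistingObjects,
    AssignFilePathsToExistingObjects,
    CreateObjects,
}

fn next(stage: Stage) -> Option<Stage> {
    match stage {
        Stage::Starting => Some(Stage::FetchExistingObjects),
        Stage::FetchExistingObjects => Some(Stage::AssignFilePathsToExistingObjects),
        Stage::AssignFilePathsToExistingObjects => Some(Stage::CreateObjects),
        Stage::CreateObjects => None, // all work done
    }
}

fn main() {
    let mut stage = Stage::Starting;
    println!("starting at {stage:?}");
    while let Some(next_stage) = next(stage) {
        // The real task does this stage's work here, then `check_interruption!(interrupter)`;
        // because `stage` is serialized in SaveState, a paused task resumes where it left off.
        stage = next_stage;
        println!("advanced to {stage:?}");
    }
}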
async fn assign_cas_id_to_file_paths(
identified_files: &HashMap<Uuid, IdentifiedFile>,
db: &PrismaClient,
sync: &SyncManager,
) -> Result<(), FileIdentifierError> {
// Assign cas_id to each file path
sync.write_ops(
db,
identified_files
.iter()
.map(|(pub_id, IdentifiedFile { cas_id, .. })| {
(
sync.shared_update(
prisma_sync::file_path::SyncId {
pub_id: uuid_to_bytes(*pub_id),
},
file_path::cas_id::NAME,
msgpack!(cas_id),
),
db.file_path()
.update(
file_path::pub_id::equals(uuid_to_bytes(*pub_id)),
vec![file_path::cas_id::set(cas_id.clone())],
)
// We don't need any data back here; selecting just the id avoids receiving the entire
// object, as we can't pass an empty select macro call
.select(file_path::select!({ id })),
)
})
.unzip::<_, _, _, Vec<_>>(),
)
.await?;
Ok(())
}
async fn fetch_existing_objects_by_cas_id(
identified_files: &HashMap<Uuid, IdentifiedFile>,
db: &PrismaClient,
) -> Result<HashMap<String, object_for_file_identifier::Data>, FileIdentifierError> {
// Retrieves objects that are already connected to file paths with the same cas_id
db.object()
.find_many(vec![object::file_paths::some(vec![
file_path::cas_id::in_vec(
identified_files
.values()
.filter_map(|IdentifiedFile { cas_id, .. }| cas_id.as_ref())
.cloned()
.collect::<HashSet<_>>()
.into_iter()
.collect(),
),
])])
.select(object_for_file_identifier::select())
.exec()
.await
.map_err(Into::into)
.map(|objects| {
objects
.into_iter()
.filter_map(|object| {
object
.file_paths
.first()
.and_then(|file_path| file_path.cas_id.clone())
.map(|cas_id| (cas_id, object))
})
.collect()
})
}
async fn assign_existing_objects_to_file_paths(
identified_files: &HashMap<Uuid, IdentifiedFile>,
objects_by_cas_id: &HashMap<String, object_for_file_identifier::Data>,
db: &PrismaClient,
sync: &SyncManager,
) -> Result<Vec<file_path_pub_id::Data>, FileIdentifierError> {
// Attempt to associate each file path with an object that has been
// connected to file paths with the same cas_id
sync.write_ops(
db,
identified_files
.iter()
.filter_map(|(pub_id, IdentifiedFile { cas_id, .. })| {
objects_by_cas_id
// Filtering out files without cas_id due to being empty
.get(cas_id.as_ref()?)
.map(|object| (*pub_id, object))
})
.map(|(pub_id, object)| {
connect_file_path_to_object(
pub_id,
// SAFETY: This pub_id is generated by the uuid lib, but we have to store bytes in sqlite
Uuid::from_slice(&object.pub_id).expect("uuid bytes are invalid"),
sync,
db,
)
})
.unzip::<_, _, Vec<_>, Vec<_>>(),
)
.await
.map_err(Into::into)
}
fn connect_file_path_to_object<'db>(
file_path_pub_id: Uuid,
object_pub_id: Uuid,
sync: &SyncManager,
db: &'db PrismaClient,
) -> (CRDTOperation, Select<'db, file_path_pub_id::Data>) {
trace!("Connecting <file_path_pub_id={file_path_pub_id}> to <object_pub_id={object_pub_id}'>");
let vec_id = object_pub_id.as_bytes().to_vec();
(
sync.shared_update(
prisma_sync::file_path::SyncId {
pub_id: uuid_to_bytes(file_path_pub_id),
},
file_path::object::NAME,
msgpack!(prisma_sync::object::SyncId {
pub_id: vec_id.clone()
}),
),
db.file_path()
.update(
file_path::pub_id::equals(uuid_to_bytes(file_path_pub_id)),
vec![file_path::object::connect(object::pub_id::equals(vec_id))],
)
.select(file_path_pub_id::select()),
)
}
async fn create_objects(
identified_files: &HashMap<Uuid, IdentifiedFile>,
db: &PrismaClient,
sync: &SyncManager,
) -> Result<u64, FileIdentifierError> {
trace!("Creating {} new Objects", identified_files.len(),);
let (object_create_args, file_path_update_args) = identified_files
.iter()
.map(
|(
file_path_pub_id,
IdentifiedFile {
file_path: file_path_for_file_identifier::Data { date_created, .. },
kind,
..
},
)| {
let object_pub_id = Uuid::new_v4();
let kind = *kind as i32;
let (sync_params, db_params) = [
(
(object::date_created::NAME, msgpack!(date_created)),
object::date_created::set(*date_created),
),
(
(object::kind::NAME, msgpack!(kind)),
object::kind::set(Some(kind)),
),
]
.into_iter()
.unzip::<_, _, Vec<_>, Vec<_>>();
(
(
sync.shared_create(
prisma_sync::object::SyncId {
pub_id: uuid_to_bytes(object_pub_id),
},
sync_params,
),
object::create_unchecked(uuid_to_bytes(object_pub_id), db_params),
),
connect_file_path_to_object(*file_path_pub_id, object_pub_id, sync, db),
)
},
)
.unzip::<_, _, Vec<_>, Vec<_>>();
// create new object records with assembled values
let total_created_files = sync
.write_ops(db, {
let (sync, db_params) = object_create_args
.into_iter()
.unzip::<_, _, Vec<_>, Vec<_>>();
(
sync.into_iter().flatten().collect(),
db.object().create_many(db_params),
)
})
.await?;
trace!("Created {total_created_files} new Objects");
if total_created_files > 0 {
trace!("Updating file paths with created objects");
sync.write_ops(
db,
file_path_update_args
.into_iter()
.unzip::<_, _, Vec<_>, Vec<_>>(),
)
.await?;
trace!("Updated file paths with created objects");
}
#[allow(clippy::cast_sign_loss)] // SAFETY: We're sure the value is positive
Ok(total_created_files as u64)
}
impl SerializableTask<Error> for ObjectProcessorTask {
type SerializeError = rmp_serde::encode::Error;
type DeserializeError = rmp_serde::decode::Error;
type DeserializeCtx = (Arc<PrismaClient>, Arc<SyncManager>);
async fn serialize(self) -> Result<Vec<u8>, Self::SerializeError> {
let Self {
id,
identified_files,
metrics,
stage,
is_shallow,
..
} = self;
rmp_serde::to_vec_named(&SaveState {
id,
identified_files,
metrics,
stage,
is_shallow,
})
}
async fn deserialize(
data: &[u8],
(db, sync): Self::DeserializeCtx,
) -> Result<Self, Self::DeserializeError> {
rmp_serde::from_slice(data).map(
|SaveState {
id,
identified_files,
metrics,
stage,
is_shallow,
}| Self {
id,
db,
sync,
identified_files,
metrics,
stage,
is_shallow,
},
)
}
}
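The SerializableTask impl above persists only the plain-data fields (SaveState) and re-injects the non-serializable handles (db, sync) from DeserializeCtx on resume. Below is a dependency-free sketch of that split with illustrative types and no real serialization.

use std::sync::Arc;

struct Db; // stand-in for Arc<PrismaClient>

struct ProcessorTask {
    id: u64,
    identified_files: Vec<String>,
    db: Arc<Db>, // can't (and shouldn't) be serialized
}

// What actually gets written to disk, mirroring the SaveState struct above.
struct TaskSaveState {
    id: u64,
    identified_files: Vec<String>,
}

impl ProcessorTask {
    fn into_save_state(self) -> TaskSaveState {
        TaskSaveState { id: self.id, identified_files: self.identified_files }
    }

    // The deserialize context supplies live handles again, like (Arc<PrismaClient>, Arc<SyncManager>).
    fn from_save_state(state: TaskSaveState, db: Arc<Db>) -> Self {
        Self { id: state.id, identified_files: state.identified_files, db }
    }
}

fn main() {
    let task = ProcessorTask { id: 1, identified_files: vec!["a.txt".into()], db: Arc::new(Db) };
    let saved = task.into_save_state();
    let resumed = ProcessorTask::from_save_state(saved, Arc::new(Db));
    println!("resumed task {} with {} files", resumed.id, resumed.identified_files.len());
}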


@ -8,13 +8,15 @@ use crate::{
utils::cancel_pending_tasks,
SerializableJob, SerializedTasks,
},
Error, NonCriticalJobError,
utils::sub_path::get_full_path_from_sub_path,
Error, LocationScanState, NonCriticalJobError,
};
use sd_core_file_path_helper::IsolatedFilePathData;
use sd_core_indexer_rules::{IndexerRule, IndexerRuler};
use sd_core_prisma_helpers::location_with_indexer_rules;
use sd_prisma::prisma::location;
use sd_task_system::{
AnyTaskOutput, IntoTask, SerializableTask, Task, TaskDispatcher, TaskHandle, TaskId,
TaskOutput, TaskStatus,
@ -39,7 +41,7 @@ use tokio::time::Instant;
use tracing::warn;
use super::{
determine_initial_walk_path, remove_non_existing_file_paths, reverse_update_directories_sizes,
remove_non_existing_file_paths, reverse_update_directories_sizes,
tasks::{
saver::{SaveTask, SaveTaskOutput},
updater::{UpdateTask, UpdateTaskOutput},
@ -70,6 +72,64 @@ pub struct IndexerJob {
impl Job for IndexerJob {
const NAME: JobName = JobName::Indexer;
async fn resume_tasks(
&mut self,
dispatcher: &JobTaskDispatcher,
ctx: &impl JobContext,
SerializedTasks(serialized_tasks): SerializedTasks,
) -> Result<(), Error> {
let location_id = self.location.id;
self.pending_tasks_on_resume = dispatcher
.dispatch_many_boxed(
rmp_serde::from_slice::<Vec<(TaskKind, Vec<u8>)>>(&serialized_tasks)
.map_err(IndexerError::from)?
.into_iter()
.map(|(task_kind, task_bytes)| {
let indexer_ruler = self.indexer_ruler.clone();
let iso_file_path_factory = self.iso_file_path_factory.clone();
async move {
match task_kind {
TaskKind::Walk => WalkDirTask::deserialize(
&task_bytes,
(
indexer_ruler.clone(),
WalkerDBProxy {
location_id,
db: Arc::clone(ctx.db()),
},
iso_file_path_factory.clone(),
dispatcher.clone(),
),
)
.await
.map(IntoTask::into_task),
TaskKind::Save => SaveTask::deserialize(
&task_bytes,
(Arc::clone(ctx.db()), Arc::clone(ctx.sync())),
)
.await
.map(IntoTask::into_task),
TaskKind::Update => UpdateTask::deserialize(
&task_bytes,
(Arc::clone(ctx.db()), Arc::clone(ctx.sync())),
)
.await
.map(IntoTask::into_task),
}
}
})
.collect::<Vec<_>>()
.try_join()
.await
.map_err(IndexerError::from)?,
)
.await;
Ok(())
}
async fn run(
mut self,
dispatcher: JobTaskDispatcher,
@ -102,7 +162,7 @@ impl Job for IndexerJob {
self.metadata.total_paths += chunked_saves.len() as u64;
self.metadata.total_save_steps += 1;
SaveTask::new(
SaveTask::new_deep(
self.location.id,
self.location.pub_id.clone(),
chunked_saves,
@ -162,6 +222,10 @@ impl Job for IndexerJob {
metadata.db_write_time += start_size_update_time.elapsed();
}
if metadata.removed_count > 0 {
// TODO: Dispatch a task to remove orphan objects
}
if metadata.indexed_count > 0 || metadata.removed_count > 0 {
ctx.invalidate_query("search.paths");
}
@ -171,6 +235,16 @@ impl Job for IndexerJob {
"all tasks must be completed here"
);
ctx.db()
.location()
.update(
location::id::equals(location.id),
vec![location::scan_state::set(LocationScanState::Indexed as i32)],
)
.exec()
.await
.map_err(IndexerError::from)?;
Ok(ReturnStatus::Completed(
JobReturn::builder()
.with_metadata(metadata)
@ -178,64 +252,6 @@ impl Job for IndexerJob {
.build(),
))
}
async fn resume_tasks(
&mut self,
dispatcher: &JobTaskDispatcher,
ctx: &impl JobContext,
SerializedTasks(serialized_tasks): SerializedTasks,
) -> Result<(), Error> {
let location_id = self.location.id;
self.pending_tasks_on_resume = dispatcher
.dispatch_many_boxed(
rmp_serde::from_slice::<Vec<(TaskKind, Vec<u8>)>>(&serialized_tasks)
.map_err(IndexerError::from)?
.into_iter()
.map(|(task_kind, task_bytes)| {
let indexer_ruler = self.indexer_ruler.clone();
let iso_file_path_factory = self.iso_file_path_factory.clone();
async move {
match task_kind {
TaskKind::Walk => WalkDirTask::deserialize(
&task_bytes,
(
indexer_ruler.clone(),
WalkerDBProxy {
location_id,
db: Arc::clone(ctx.db()),
},
iso_file_path_factory.clone(),
dispatcher.clone(),
),
)
.await
.map(IntoTask::into_task),
TaskKind::Save => SaveTask::deserialize(
&task_bytes,
(Arc::clone(ctx.db()), Arc::clone(ctx.sync())),
)
.await
.map(IntoTask::into_task),
TaskKind::Update => UpdateTask::deserialize(
&task_bytes,
(Arc::clone(ctx.db()), Arc::clone(ctx.sync())),
)
.await
.map(IntoTask::into_task),
}
}
})
.collect::<Vec<_>>()
.try_join()
.await
.map_err(IndexerError::from)?,
)
.await;
Ok(())
}
}
impl IndexerJob {
@ -282,6 +298,12 @@ impl IndexerJob {
job_ctx: &impl JobContext,
dispatcher: &JobTaskDispatcher,
) -> Result<Vec<TaskHandle<Error>>, IndexerError> {
self.metadata.completed_tasks += 1;
job_ctx.progress(vec![ProgressUpdate::CompletedTaskCount(
self.metadata.completed_tasks,
)]);
if any_task_output.is::<WalkTaskOutput>() {
return self
.process_walk_output(
@ -310,12 +332,6 @@ impl IndexerJob {
unreachable!("Unexpected task output type: <id='{task_id}'>");
}
self.metadata.completed_tasks += 1;
job_ctx.progress(vec![ProgressUpdate::CompletedTaskCount(
self.metadata.completed_tasks,
)]);
Ok(Vec::new())
}
@ -394,7 +410,7 @@ impl IndexerJob {
self.metadata.total_paths += chunked_saves.len() as u64;
self.metadata.total_save_steps += 1;
SaveTask::new(
SaveTask::new_deep(
self.location.id,
self.location.pub_id.clone(),
chunked_saves,
@ -413,7 +429,7 @@ impl IndexerJob {
self.metadata.total_updated_paths += chunked_updates.len() as u64;
self.metadata.total_update_steps += 1;
UpdateTask::new(
UpdateTask::new_deep(
chunked_updates,
Arc::clone(job_ctx.db()),
Arc::clone(job_ctx.sync()),
@ -528,7 +544,7 @@ impl IndexerJob {
// if we don't have any pending task, then this is a fresh job
if self.pending_tasks_on_resume.is_empty() {
let walker_root_path = Arc::new(
determine_initial_walk_path(
get_full_path_from_sub_path(
self.location.id,
&self.sub_path,
&*self.iso_file_path_factory.location_path,
@ -539,7 +555,7 @@ impl IndexerJob {
pending_running_tasks.push(
dispatcher
.dispatch(WalkDirTask::new(
.dispatch(WalkDirTask::new_deep(
walker_root_path.as_ref(),
Arc::clone(&walker_root_path),
self.indexer_ruler.clone(),
@ -548,7 +564,7 @@ impl IndexerJob {
location_id: self.location.id,
db: Arc::clone(job_ctx.db()),
},
Some(dispatcher.clone()),
dispatcher.clone(),
)?)
.await,
);


@ -1,9 +1,6 @@
use crate::NonCriticalJobError;
use crate::{utils::sub_path::SubPathError, NonCriticalJobError};
use sd_core_file_path_helper::{
ensure_file_path_exists, ensure_sub_path_is_directory, ensure_sub_path_is_in_location,
FilePathError, IsolatedFilePathData,
};
use sd_core_file_path_helper::{FilePathError, IsolatedFilePathData};
use sd_core_indexer_rules::IndexerRuleError;
use sd_core_prisma_helpers::{
file_path_pub_and_cas_ids, file_path_to_isolate_with_pub_id, file_path_walker,
@ -53,8 +50,8 @@ pub enum IndexerError {
// Not Found errors
#[error("indexer rule not found: <id='{0}'>")]
IndexerRuleNotFound(i32),
#[error("received sub path not in database: <path='{}'>", .0.display())]
SubPathNotFound(Box<Path>),
#[error(transparent)]
SubPath(#[from] SubPathError),
// Internal Errors
#[error("database Error: {0}")]
@ -78,10 +75,12 @@ pub enum IndexerError {
impl From<IndexerError> for rspc::Error {
fn from(err: IndexerError) -> Self {
match err {
IndexerError::IndexerRuleNotFound(_) | IndexerError::SubPathNotFound(_) => {
IndexerError::IndexerRuleNotFound(_) => {
Self::with_cause(ErrorCode::NotFound, err.to_string(), err)
}
IndexerError::SubPath(sub_path_err) => sub_path_err.into(),
IndexerError::Rules(rule_err) => rule_err.into(),
_ => Self::with_cause(ErrorCode::InternalServerError, err.to_string(), err),
@ -111,36 +110,6 @@ pub enum NonCriticalIndexerError {
MissingFilePathData(String),
}
async fn determine_initial_walk_path(
location_id: location::id::Type,
sub_path: &Option<impl AsRef<Path> + Send + Sync>,
location_path: impl AsRef<Path> + Send,
db: &PrismaClient,
) -> Result<PathBuf, IndexerError> {
let location_path = location_path.as_ref();
match sub_path {
Some(sub_path) if sub_path.as_ref() != Path::new("") => {
let sub_path = sub_path.as_ref();
let full_path = ensure_sub_path_is_in_location(location_path, sub_path).await?;
ensure_sub_path_is_directory(location_path, sub_path).await?;
ensure_file_path_exists(
sub_path,
&IsolatedFilePathData::new(location_id, location_path, &full_path, true)
.map_err(IndexerError::from)?,
db,
IndexerError::SubPathNotFound,
)
.await?;
Ok(full_path)
}
_ => Ok(location_path.to_path_buf()),
}
}
fn chunk_db_queries<'db, 'iso>(
iso_file_paths: impl IntoIterator<Item = &'iso IsolatedFilePathData<'iso>>,
db: &'db PrismaClient,


@ -1,4 +1,4 @@
use crate::{Error, NonCriticalJobError};
use crate::{utils::sub_path::get_full_path_from_sub_path, Error, NonCriticalJobError};
use sd_core_indexer_rules::{IndexerRule, IndexerRuler};
use sd_core_prisma_helpers::location_with_indexer_rules;
@ -19,7 +19,7 @@ use itertools::Itertools;
use tracing::{debug, warn};
use super::{
determine_initial_walk_path, remove_non_existing_file_paths, reverse_update_directories_sizes,
remove_non_existing_file_paths, reverse_update_directories_sizes,
tasks::{
saver::{SaveTask, SaveTaskOutput},
updater::{UpdateTask, UpdateTaskOutput},
@ -45,7 +45,9 @@ pub async fn shallow(
.map_err(IndexerError::from)?;
let to_walk_path = Arc::new(
determine_initial_walk_path(location.id, &Some(sub_path), &*location_path, &db).await?,
get_full_path_from_sub_path(location.id, &Some(sub_path), &*location_path, &db)
.await
.map_err(IndexerError::from)?,
);
let Some(WalkTaskOutput {
@ -124,7 +126,7 @@ async fn walk(
dispatcher: &BaseTaskDispatcher<Error>,
) -> Result<Option<WalkTaskOutput>, Error> {
match dispatcher
.dispatch(WalkDirTask::new(
.dispatch(WalkDirTask::new_shallow(
ToWalkEntry::from(&*to_walk_path),
to_walk_path,
location
@ -142,7 +144,6 @@ async fn walk(
location_id: location.id,
db,
},
None::<BaseTaskDispatcher<Error>>,
)?)
.await
.await?
@ -186,7 +187,7 @@ async fn save_and_update(
.chunks(BATCH_SIZE)
.into_iter()
.map(|chunk| {
SaveTask::new(
SaveTask::new_shallow(
location.id,
location.pub_id.clone(),
chunk.collect::<Vec<_>>(),
@ -201,7 +202,7 @@ async fn save_and_update(
.chunks(BATCH_SIZE)
.into_iter()
.map(|chunk| {
UpdateTask::new(
UpdateTask::new_shallow(
chunk.collect::<Vec<_>>(),
Arc::clone(&db),
Arc::clone(&sync),


@ -28,11 +28,12 @@ pub struct SaveTask {
walked_entries: Vec<WalkedEntry>,
db: Arc<PrismaClient>,
sync: Arc<SyncManager>,
is_shallow: bool,
}
impl SaveTask {
#[must_use]
pub fn new(
pub fn new_deep(
location_id: location::id::Type,
location_pub_id: location::pub_id::Type,
walked_entries: Vec<WalkedEntry>,
@ -46,6 +47,26 @@ impl SaveTask {
walked_entries,
db,
sync,
is_shallow: false,
}
}
#[must_use]
pub fn new_shallow(
location_id: location::id::Type,
location_pub_id: location::pub_id::Type,
walked_entries: Vec<WalkedEntry>,
db: Arc<PrismaClient>,
sync: Arc<SyncManager>,
) -> Self {
Self {
id: TaskId::new_v4(),
location_id,
location_pub_id,
walked_entries,
db,
sync,
is_shallow: true,
}
}
}
@ -56,6 +77,7 @@ struct SaveTaskSaveState {
location_id: location::id::Type,
location_pub_id: location::pub_id::Type,
walked_entries: Vec<WalkedEntry>,
is_shallow: bool,
}
impl SerializableTask<Error> for SaveTask {
@ -71,6 +93,7 @@ impl SerializableTask<Error> for SaveTask {
location_id,
location_pub_id,
walked_entries,
is_shallow,
..
} = self;
rmp_serde::to_vec_named(&SaveTaskSaveState {
@ -78,6 +101,7 @@ impl SerializableTask<Error> for SaveTask {
location_id,
location_pub_id,
walked_entries,
is_shallow,
})
}
@ -91,6 +115,7 @@ impl SerializableTask<Error> for SaveTask {
location_id,
location_pub_id,
walked_entries,
is_shallow,
}| Self {
id,
location_id,
@ -98,6 +123,7 @@ impl SerializableTask<Error> for SaveTask {
walked_entries,
db,
sync,
is_shallow,
},
)
}
@ -115,6 +141,11 @@ impl Task<Error> for SaveTask {
self.id
}
fn with_priority(&self) -> bool {
// If we're running in shallow mode, then we want priority
self.is_shallow
}
async fn run(&mut self, _: &Interrupter) -> Result<ExecStatus, Error> {
use file_path::{
create_unchecked, date_created, date_indexed, date_modified, extension, hidden, inode,


@ -28,11 +28,12 @@ pub struct UpdateTask {
object_ids_that_should_be_unlinked: HashSet<object::id::Type>,
db: Arc<PrismaClient>,
sync: Arc<SyncManager>,
is_shallow: bool,
}
impl UpdateTask {
#[must_use]
pub fn new(
pub fn new_deep(
walked_entries: Vec<WalkedEntry>,
db: Arc<PrismaClient>,
sync: Arc<SyncManager>,
@ -43,6 +44,23 @@ impl UpdateTask {
db,
sync,
object_ids_that_should_be_unlinked: HashSet::new(),
is_shallow: false,
}
}
#[must_use]
pub fn new_shallow(
walked_entries: Vec<WalkedEntry>,
db: Arc<PrismaClient>,
sync: Arc<SyncManager>,
) -> Self {
Self {
id: TaskId::new_v4(),
walked_entries,
db,
sync,
object_ids_that_should_be_unlinked: HashSet::new(),
is_shallow: true,
}
}
}
@ -52,6 +70,7 @@ struct UpdateTaskSaveState {
id: TaskId,
walked_entries: Vec<WalkedEntry>,
object_ids_that_should_be_unlinked: HashSet<object::id::Type>,
is_shallow: bool,
}
impl SerializableTask<Error> for UpdateTask {
@ -62,10 +81,19 @@ impl SerializableTask<Error> for UpdateTask {
type DeserializeCtx = (Arc<PrismaClient>, Arc<SyncManager>);
async fn serialize(self) -> Result<Vec<u8>, Self::SerializeError> {
let Self {
id,
walked_entries,
object_ids_that_should_be_unlinked,
is_shallow,
..
} = self;
rmp_serde::to_vec_named(&UpdateTaskSaveState {
id: self.id,
walked_entries: self.walked_entries,
object_ids_that_should_be_unlinked: self.object_ids_that_should_be_unlinked,
id,
walked_entries,
object_ids_that_should_be_unlinked,
is_shallow,
})
}
@ -78,12 +106,14 @@ impl SerializableTask<Error> for UpdateTask {
id,
walked_entries,
object_ids_that_should_be_unlinked,
is_shallow,
}| Self {
id,
walked_entries,
object_ids_that_should_be_unlinked,
db,
sync,
is_shallow,
},
)
}
@ -101,6 +131,11 @@ impl Task<Error> for UpdateTask {
self.id
}
fn with_priority(&self) -> bool {
// If we're running in shallow mode, then we want priority
self.is_shallow
}
async fn run(&mut self, interrupter: &Interrupter) -> Result<ExecStatus, Error> {
use file_path::{
cas_id, date_created, date_modified, hidden, inode, is_dir, object, object_id,
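The SerializableTask impl above persists only the plain-data fields (including the new is_shallow flag) with rmp_serde and re-injects the db and sync handles from DeserializeCtx on resume. A minimal, self-contained sketch of that round-trip with a stripped-down save-state struct (field types simplified; the real id is a TaskId/UUID):

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct UpdateTaskSaveState {
	id: u32,          // simplified; the real field is a TaskId
	is_shallow: bool, // the priority flag must survive a pause/resume cycle
}

fn main() {
	let state = UpdateTaskSaveState { id: 42, is_shallow: true };

	let bytes = rmp_serde::to_vec_named(&state).expect("serialize");
	let restored: UpdateTaskSaveState = rmp_serde::from_slice(&bytes).expect("deserialize");

	// A resumed shallow task keeps its priority.
	assert_eq!(restored, state);
}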

View file

@ -9,8 +9,8 @@ use sd_core_prisma_helpers::{file_path_pub_and_cas_ids, file_path_walker};
use sd_prisma::prisma::file_path;
use sd_task_system::{
check_interruption, ExecStatus, Interrupter, IntoAnyTaskOutput, SerializableTask, Task,
TaskDispatcher, TaskHandle, TaskId,
check_interruption, BaseTaskDispatcher, ExecStatus, Interrupter, IntoAnyTaskOutput,
SerializableTask, Task, TaskDispatcher, TaskHandle, TaskId,
};
use sd_utils::{db::inode_from_db, error::FileIOError};
@ -239,6 +239,7 @@ struct WalkDirSaveState {
stage: WalkerStageSaveState,
errors: Vec<NonCriticalJobError>,
scan_time: Duration,
is_shallow: bool,
}
#[derive(Debug, Serialize, Deserialize)]
@ -351,7 +352,7 @@ impl From<WalkerStageSaveState> for WalkerStage {
}
#[derive(Debug)]
pub struct WalkDirTask<DBProxy, IsoPathFactory, Dispatcher>
pub struct WalkDirTask<DBProxy, IsoPathFactory, Dispatcher = BaseTaskDispatcher<Error>>
where
DBProxy: WalkerDBProxy,
IsoPathFactory: IsoFilePathFactory,
@ -368,6 +369,7 @@ where
maybe_dispatcher: Option<Dispatcher>,
errors: Vec<NonCriticalJobError>,
scan_time: Duration,
is_shallow: bool,
}
impl<DBProxy, IsoPathFactory, Dispatcher> WalkDirTask<DBProxy, IsoPathFactory, Dispatcher>
@ -376,13 +378,13 @@ where
IsoPathFactory: IsoFilePathFactory,
Dispatcher: TaskDispatcher<Error>,
{
pub fn new(
pub fn new_deep(
entry: impl Into<ToWalkEntry> + Send,
root: Arc<PathBuf>,
indexer_ruler: IndexerRuler,
iso_file_path_factory: IsoPathFactory,
db_proxy: DBProxy,
maybe_dispatcher: Option<Dispatcher>,
dispatcher: Dispatcher,
) -> Result<Self, IndexerError> {
let entry = entry.into();
Ok(Self {
@ -394,7 +396,38 @@ where
db_proxy,
stage: WalkerStage::Start,
entry,
maybe_dispatcher,
maybe_dispatcher: Some(dispatcher),
is_shallow: false,
errors: Vec::new(),
scan_time: Duration::ZERO,
})
}
}
impl<DBProxy, IsoPathFactory> WalkDirTask<DBProxy, IsoPathFactory, BaseTaskDispatcher<Error>>
where
DBProxy: WalkerDBProxy,
IsoPathFactory: IsoFilePathFactory,
{
pub fn new_shallow(
entry: impl Into<ToWalkEntry> + Send,
root: Arc<PathBuf>,
indexer_ruler: IndexerRuler,
iso_file_path_factory: IsoPathFactory,
db_proxy: DBProxy,
) -> Result<Self, IndexerError> {
let entry = entry.into();
Ok(Self {
id: TaskId::new_v4(),
root,
indexer_ruler,
entry_iso_file_path: iso_file_path_factory.build(&entry.path, true)?,
iso_file_path_factory,
db_proxy,
stage: WalkerStage::Start,
entry,
maybe_dispatcher: None,
is_shallow: true,
errors: Vec::new(),
scan_time: Duration::ZERO,
})
@ -413,14 +446,26 @@ where
type DeserializeCtx = (IndexerRuler, DBProxy, IsoPathFactory, Dispatcher);
async fn serialize(self) -> Result<Vec<u8>, Self::SerializeError> {
let Self {
id,
entry,
root,
entry_iso_file_path,
stage,
errors,
scan_time,
is_shallow,
..
} = self;
rmp_serde::to_vec_named(&WalkDirSaveState {
id: self.id,
entry: self.entry,
root: self.root,
entry_iso_file_path: self.entry_iso_file_path,
stage: self.stage.into(),
errors: self.errors,
scan_time: self.scan_time,
id,
entry,
root,
entry_iso_file_path,
stage: stage.into(),
errors,
scan_time,
is_shallow,
})
}
@ -437,6 +482,7 @@ where
stage,
errors,
scan_time,
is_shallow,
}| Self {
id,
entry,
@ -446,9 +492,10 @@ where
iso_file_path_factory,
db_proxy,
stage: stage.into(),
maybe_dispatcher: Some(dispatcher),
maybe_dispatcher: (!is_shallow).then_some(dispatcher), // only deep walks keep a dispatcher to spawn child walk tasks
errors,
scan_time,
is_shallow,
},
)
}
@ -466,6 +513,11 @@ where
self.id
}
fn with_priority(&self) -> bool {
// If we're running in shallow mode, then we want priority
self.is_shallow
}
#[allow(clippy::too_many_lines)]
async fn run(&mut self, interrupter: &Interrupter) -> Result<ExecStatus, Error> {
let Self {
@ -747,13 +799,13 @@ async fn keep_walking(
to_keep_walking
.drain(..)
.map(|entry| {
WalkDirTask::new(
WalkDirTask::new_deep(
entry,
Arc::clone(root),
indexer_ruler.clone(),
iso_file_path_factory.clone(),
db_proxy.clone(),
Some(dispatcher.clone()),
dispatcher.clone(),
)
.map_err(|e| NonCriticalIndexerError::DispatchKeepWalking(e.to_string()))
})
@ -1226,7 +1278,7 @@ mod tests {
let handle = system
.dispatch(
WalkDirTask::new(
WalkDirTask::new_deep(
root_path.to_path_buf(),
Arc::new(root_path.to_path_buf()),
indexer_ruler,
@ -1234,7 +1286,7 @@ mod tests {
root_path: Arc::new(root_path.to_path_buf()),
},
DummyDBProxy,
Some(system.get_dispatcher()),
system.get_dispatcher(),
)
.unwrap(),
)
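Two details above are worth spelling out: new_shallow is only implemented for WalkDirTask<_, _, BaseTaskDispatcher<Error>>, and the struct now declares Dispatcher = BaseTaskDispatcher<Error> as a default, so the type can be written without the third parameter at shallow call sites. A toy, self-contained mirror of that pattern (Walker and DefaultDispatcher are stand-ins, not types from this PR):

struct DefaultDispatcher;

struct Walker<D = DefaultDispatcher> {
	maybe_dispatcher: Option<D>,
}

impl<D> Walker<D> {
	// Deep walks keep a dispatcher so they can spawn child walk tasks.
	fn new_deep(dispatcher: D) -> Self {
		Self { maybe_dispatcher: Some(dispatcher) }
	}
}

impl Walker<DefaultDispatcher> {
	// Shallow walks never recurse, so no dispatcher is stored.
	fn new_shallow() -> Self {
		Self { maybe_dispatcher: None }
	}
}

fn main() {
	let deep: Walker<DefaultDispatcher> = Walker::new_deep(DefaultDispatcher);
	// The default type parameter lets the annotation omit the dispatcher entirely.
	let shallow: Walker = Walker::new_shallow();

	assert!(deep.maybe_dispatcher.is_some());
	assert!(shallow.maybe_dispatcher.is_none());
}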

View file

@ -25,7 +25,10 @@ use futures_concurrency::{
use serde::{Deserialize, Serialize};
use specta::Type;
use strum::{Display, EnumString};
use tokio::spawn;
use tokio::{
spawn,
sync::{watch, Mutex},
};
use tracing::{debug, error, info, warn};
use uuid::Uuid;
@ -42,6 +45,7 @@ use super::{
#[strum(use_phf, serialize_all = "snake_case")]
pub enum JobName {
Indexer,
FileIdentifier,
// TODO: Add more job names as needed
}
@ -631,7 +635,10 @@ async fn to_spawn_job<Ctx: JobContext>(
let mut remote_controllers = vec![];
let (dispatcher, remote_controllers_rx) = JobTaskDispatcher::new(base_dispatcher);
let (running_state_tx, running_state_rx) = watch::channel(JobRunningState::Running);
let (dispatcher, remote_controllers_rx) =
JobTaskDispatcher::new(base_dispatcher, running_state_rx);
if let Some(existing_tasks) = existing_tasks {
if let Err(e) = job
@ -664,6 +671,7 @@ async fn to_spawn_job<Ctx: JobContext>(
match command {
Command::Pause => {
running_state_tx.send_modify(|state| *state = JobRunningState::Paused);
remote_controllers
.iter()
.map(TaskRemoteController::pause)
@ -680,6 +688,8 @@ async fn to_spawn_job<Ctx: JobContext>(
});
}
Command::Resume => {
running_state_tx.send_modify(|state| *state = JobRunningState::Running);
remote_controllers
.iter()
.map(TaskRemoteController::resume)
@ -726,14 +736,29 @@ async fn to_spawn_job<Ctx: JobContext>(
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum JobRunningState {
Running,
Paused,
}
impl Default for JobRunningState {
fn default() -> Self {
Self::Running
}
}
#[derive(Debug, Clone)]
pub struct JobTaskDispatcher {
dispatcher: BaseTaskDispatcher<Error>,
remote_controllers_tx: chan::Sender<TaskRemoteController>,
running_state: Arc<Mutex<watch::Receiver<JobRunningState>>>,
}
impl TaskDispatcher<Error> for JobTaskDispatcher {
async fn dispatch_boxed(&self, boxed_task: Box<dyn Task<Error>>) -> TaskHandle<Error> {
self.wait_for_dispatch_approval().await;
let handle = self.dispatcher.dispatch_boxed(boxed_task).await;
self.remote_controllers_tx
@ -748,14 +773,9 @@ impl TaskDispatcher<Error> for JobTaskDispatcher {
&self,
boxed_tasks: impl IntoIterator<Item = Box<dyn Task<Error>>> + Send,
) -> Vec<TaskHandle<Error>> {
let handles = self.dispatcher.dispatch_many_boxed(boxed_tasks).await;
self.wait_for_dispatch_approval().await;
for handle in &handles {
self.remote_controllers_tx
.send(handle.remote_controller())
.await
.expect("remote controllers tx closed");
}
let handles = self.dispatcher.dispatch_many_boxed(boxed_tasks).await;
handles
.iter()
@ -770,15 +790,28 @@ impl TaskDispatcher<Error> for JobTaskDispatcher {
}
impl JobTaskDispatcher {
fn new(dispatcher: BaseTaskDispatcher<Error>) -> (Self, chan::Receiver<TaskRemoteController>) {
fn new(
dispatcher: BaseTaskDispatcher<Error>,
running_state_rx: watch::Receiver<JobRunningState>,
) -> (Self, chan::Receiver<TaskRemoteController>) {
let (remote_controllers_tx, remote_controllers_rx) = chan::unbounded();
(
Self {
dispatcher,
remote_controllers_tx,
running_state: Arc::new(Mutex::new(running_state_rx)),
},
remote_controllers_rx,
)
}
async fn wait_for_dispatch_approval(&self) {
self.running_state
.lock()
.await
.wait_for(|state| *state == JobRunningState::Running)
.await
.expect("job running state watch channel unexpectedly closed");
}
}
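The pause handling above is the key addition: Command::Pause and Command::Resume now flip a watch channel, and the dispatcher refuses to hand new tasks to the task system until the job is Running again. A self-contained toy of that gate (RunState and Gate stand in for JobRunningState and JobTaskDispatcher; requires tokio with the rt, macros, sync and time features):

use std::{sync::Arc, time::Duration};
use tokio::sync::{watch, Mutex};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RunState {
	Running,
	Paused,
}

#[derive(Clone)]
struct Gate {
	running_state: Arc<Mutex<watch::Receiver<RunState>>>,
}

impl Gate {
	// Mirrors wait_for_dispatch_approval: park until the job is Running.
	async fn wait_for_dispatch_approval(&self) {
		self.running_state
			.lock()
			.await
			.wait_for(|state| *state == RunState::Running)
			.await
			.expect("running state channel closed");
	}
}

#[tokio::main]
async fn main() {
	let (tx, rx) = watch::channel(RunState::Paused);
	let gate = Gate { running_state: Arc::new(Mutex::new(rx)) };

	let dispatcher = tokio::spawn({
		let gate = gate.clone();
		async move {
			gate.wait_for_dispatch_approval().await;
			println!("dispatch approved, tasks may be sent to the task system");
		}
	});

	// Simulate a Resume command arriving a bit later.
	tokio::time::sleep(Duration::from_millis(50)).await;
	tx.send_modify(|state| *state = RunState::Running);

	dispatcher.await.expect("dispatcher task panicked");
}

The Mutex around the receiver matters: watch::Receiver::wait_for takes &mut self, and the dispatcher is cloned into tasks, so the receiver is shared behind Arc<Mutex<_>> exactly as in the change above.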

View file

@ -1,4 +1,4 @@
use crate::indexer::IndexerJob;
use crate::{file_identifier::FileIdentifierJob, indexer::IndexerJob};
use sd_prisma::prisma::{job, location};
use sd_utils::uuid_to_bytes;
@ -212,6 +212,7 @@ async fn load_job<Ctx: JobContext>(
Ctx,
[
IndexerJob,
FileIdentifierJob,
// TODO: Add more jobs here
// e.g.: MediaProcessorJob, etc.,
]

View file

@ -33,9 +33,12 @@ use serde::{Deserialize, Serialize};
use specta::Type;
use thiserror::Error;
pub mod file_identifier;
pub mod indexer;
pub mod job_system;
pub mod utils;
use file_identifier::{FileIdentifierError, NonCriticalFileIdentifierError};
use indexer::{IndexerError, NonCriticalIndexerError};
pub use job_system::{
@ -47,6 +50,8 @@ pub use job_system::{
pub enum Error {
#[error(transparent)]
Indexer(#[from] IndexerError),
#[error(transparent)]
FileIdentifier(#[from] FileIdentifierError),
#[error(transparent)]
TaskSystem(#[from] TaskSystemError),
@ -56,6 +61,7 @@ impl From<Error> for rspc::Error {
fn from(e: Error) -> Self {
match e {
Error::Indexer(e) => e.into(),
Error::FileIdentifier(e) => e.into(),
Error::TaskSystem(e) => {
Self::with_cause(rspc::ErrorCode::InternalServerError, e.to_string(), e)
}
@ -68,4 +74,15 @@ pub enum NonCriticalJobError {
// TODO: Add variants as needed
#[error(transparent)]
Indexer(#[from] NonCriticalIndexerError),
#[error(transparent)]
FileIdentifier(#[from] NonCriticalFileIdentifierError),
}
#[repr(i32)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Type, Eq, PartialEq)]
pub enum LocationScanState {
Pending = 0,
Indexed = 1,
FilesIdentified = 2,
Completed = 3,
}

View file

@ -0,0 +1 @@
pub mod sub_path;

View file

@ -0,0 +1,93 @@
use rspc::ErrorCode;
use sd_core_file_path_helper::{
ensure_file_path_exists, ensure_sub_path_is_directory, ensure_sub_path_is_in_location,
FilePathError, IsolatedFilePathData,
};
use sd_prisma::prisma::{location, PrismaClient};
use std::path::{Path, PathBuf};
use prisma_client_rust::QueryError;
#[derive(thiserror::Error, Debug)]
pub enum SubPathError {
#[error("received sub path not in database: <path='{}'>", .0.display())]
SubPathNotFound(Box<Path>),
#[error("database error: {0}")]
Database(#[from] QueryError),
#[error(transparent)]
IsoFilePath(#[from] FilePathError),
}
impl From<SubPathError> for rspc::Error {
fn from(err: SubPathError) -> Self {
match err {
SubPathError::SubPathNotFound(_) => {
Self::with_cause(ErrorCode::NotFound, err.to_string(), err)
}
_ => Self::with_cause(ErrorCode::InternalServerError, err.to_string(), err),
}
}
}
pub async fn get_full_path_from_sub_path(
location_id: location::id::Type,
sub_path: &Option<impl AsRef<Path> + Send + Sync>,
location_path: impl AsRef<Path> + Send,
db: &PrismaClient,
) -> Result<PathBuf, SubPathError> {
let location_path = location_path.as_ref();
match sub_path {
Some(sub_path) if sub_path.as_ref() != Path::new("") => {
let sub_path = sub_path.as_ref();
let full_path = ensure_sub_path_is_in_location(location_path, sub_path).await?;
ensure_sub_path_is_directory(location_path, sub_path).await?;
ensure_file_path_exists(
sub_path,
&IsolatedFilePathData::new(location_id, location_path, &full_path, true)?,
db,
SubPathError::SubPathNotFound,
)
.await?;
Ok(full_path)
}
_ => Ok(location_path.to_path_buf()),
}
}
pub async fn maybe_get_iso_file_path_from_sub_path(
location_id: location::id::Type,
sub_path: &Option<impl AsRef<Path> + Send + Sync>,
location_path: impl AsRef<Path> + Send,
db: &PrismaClient,
) -> Result<Option<IsolatedFilePathData<'static>>, SubPathError> {
let location_path = location_path.as_ref();
match sub_path {
Some(sub_path) if sub_path.as_ref() != Path::new("") => {
let full_path = ensure_sub_path_is_in_location(location_path, sub_path).await?;
ensure_sub_path_is_directory(location_path, sub_path).await?;
let sub_iso_file_path =
IsolatedFilePathData::new(location_id, location_path, &full_path, true)?;
ensure_file_path_exists(
sub_path,
&sub_iso_file_path,
db,
SubPathError::SubPathNotFound,
)
.await
.map(|()| Some(sub_iso_file_path))
}
_ => Ok(None),
}
}
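Both helpers above implement the same convention: a missing or empty sub path means "operate on the whole location", while anything else must exist, be a directory, and already be indexed. A hypothetical, database-free stand-in for the happy path of get_full_path_from_sub_path (the real helper validates via ensure_sub_path_is_in_location and friends rather than a plain join):

use std::path::{Path, PathBuf};

fn resolve_scan_root(location_path: &Path, sub_path: Option<&Path>) -> PathBuf {
	match sub_path {
		// Non-empty sub path: scope the operation to that directory.
		Some(p) if p != Path::new("") => location_path.join(p),
		// No sub path (or an empty one): fall back to the location root.
		_ => location_path.to_path_buf(),
	}
}

fn main() {
	assert_eq!(
		resolve_scan_root(Path::new("/library"), Some(Path::new("photos/2024"))),
		PathBuf::from("/library/photos/2024")
	);
	assert_eq!(
		resolve_scan_root(Path::new("/library"), None),
		PathBuf::from("/library")
	);
}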

View file

@ -30,6 +30,7 @@
use sd_prisma::prisma::{self, file_path, job, label, location, object};
// File Path selectables!
file_path::select!(file_path_pub_id { pub_id });
file_path::select!(file_path_pub_and_cas_ids { id pub_id cas_id });
file_path::select!(file_path_just_pub_id_materialized_path {
pub_id

View file

@ -13,8 +13,7 @@ use async_channel as chan;
use async_trait::async_trait;
use chan::{Recv, RecvError};
use downcast_rs::{impl_downcast, Downcast};
use futures::executor::block_on;
use tokio::sync::oneshot;
use tokio::{runtime::Handle, sync::oneshot};
use tracing::{trace, warn};
use uuid::Uuid;
@ -58,6 +57,12 @@ pub enum TaskOutput {
Empty,
}
impl From<()> for TaskOutput {
fn from((): ()) -> Self {
Self::Empty
}
}
/// An enum representing all possible outcomes for a task.
#[derive(Debug)]
pub enum TaskStatus<E: RunError> {
@ -125,14 +130,8 @@ impl<T: Task<E> + 'static, E: RunError> IntoTask<E> for T {
/// due to a limitation in the Rust language.
#[async_trait]
pub trait Task<E: RunError>: fmt::Debug + Downcast + Send + Sync + 'static {
/// This method represents the work that should be done by the worker; it will be called by the
/// worker when there is a slot available in its internal queue.
/// We receive a `&mut self` so any internal data can be mutated on each `run` invocation.
///
/// The [`interrupter`](Interrupter) is a helper object that can be used to check if the user requested a pause or a cancel,
/// so the implementor can choose the appropriate moment to pause or cancel the task, avoiding corrupted data or
/// inconsistent states.
async fn run(&mut self, interrupter: &Interrupter) -> Result<ExecStatus, E>;
/// A unique identifier for the task; it is used to identify the task within the system and to the user.
fn id(&self) -> TaskId;
/// This method defines whether a task should run with priority or not. The task system has a mechanism
/// to suspend non-priority tasks on any worker and run priority tasks ASAP. This is useful for tasks that
@ -142,8 +141,14 @@ pub trait Task<E: RunError>: fmt::Debug + Downcast + Send + Sync + 'static {
false
}
/// A unique identifier for the task; it is used to identify the task within the system and to the user.
fn id(&self) -> TaskId;
/// This method represents the work that should be done by the worker; it will be called by the
/// worker when there is a slot available in its internal queue.
/// We receive a `&mut self` so any internal data can be mutated on each `run` invocation.
///
/// The [`interrupter`](Interrupter) is a helper object that can be used to check if the user requested a pause or a cancel,
/// so the implementor can choose the appropriate moment to pause or cancel the task, avoiding corrupted data or
/// inconsistent states.
async fn run(&mut self, interrupter: &Interrupter) -> Result<ExecStatus, E>;
}
impl_downcast!(Task<E> where E: RunError);
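For reference, a sketch of what a minimal priority task looks like against the trait as reordered above. It assumes the crate's TaskId, Interrupter, ExecStatus, TaskOutput and check_interruption exports, plus a hypothetical MyError type satisfying the RunError bound; it is not compilable outside the crate.

// Hypothetical example task.
#[derive(Debug)]
struct PingTask {
	id: TaskId,
}

#[async_trait]
impl Task<MyError> for PingTask {
	fn id(&self) -> TaskId {
		self.id
	}

	fn with_priority(&self) -> bool {
		// Ask the system to suspend non-priority work and run this ASAP.
		true
	}

	async fn run(&mut self, interrupter: &Interrupter) -> Result<ExecStatus, MyError> {
		// Give the system a safe point to pause or cancel this task.
		check_interruption!(interrupter);

		// ...do a short piece of work here...

		// `From<()> for TaskOutput` (added above) makes the empty output ergonomic;
		// the completed variant is assumed to be `ExecStatus::Done`.
		Ok(ExecStatus::Done(().into()))
	}
}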
@ -508,7 +513,7 @@ impl<E: RunError> Future for CancelTaskOnDrop<E> {
impl<E: RunError> Drop for CancelTaskOnDrop<E> {
fn drop(&mut self) {
// FIXME: We should use async drop when it becomes stable
block_on(self.0.cancel());
Handle::current().block_on(self.0.cancel());
}
}
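One caveat about the last change: tokio's Handle::block_on is documented to panic if it is called from within an asynchronous execution context, and a CancelTaskOnDrop value will often be dropped on a runtime thread. A common workaround, shown here only as a sketch (this is not what the commit does), is to hand the cleanup future back to the runtime from Drop instead of blocking on it:

use std::{future::Future, pin::Pin};

// Hypothetical guard: `cancel` is any boxed cleanup future.
struct CancelOnDrop {
	cancel: Option<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
}

impl Drop for CancelOnDrop {
	fn drop(&mut self) {
		if let Some(cancel) = self.cancel.take() {
			// Fire-and-forget: let the runtime drive the cancellation.
			// (tokio::spawn requires a runtime context; a production version
			// could fall back via tokio::runtime::Handle::try_current.)
			tokio::spawn(cancel);
		}
	}
}

#[tokio::main]
async fn main() {
	let guard = CancelOnDrop {
		cancel: Some(Box::pin(async { println!("cancelled") })),
	};
	drop(guard); // schedules the cleanup instead of blocking the worker thread
	tokio::task::yield_now().await; // give the spawned cleanup a chance to run
}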