Job system improvements v2 (#707)

* Add `JobInitData` trait + change `StatefulJob::name` to a constant

* `StatefulJob::new`

* Move `invalidate_query` into `StatefulJob::finalize`

* Cleanup `spawn_job` signature

* Remove `queue_job` and run jobs from the `StatefulJob::finalize` method

* `StatefulJob::queue_jobs`

* `JobManager::ingest` return `Result`

* Remove `jobs.isRunning`

* Invalidation system direct push + batching invalidate

* Look ma, only a single clippy warning!!!!

* Error handling for JobManager

* Rust fmt

* Introducing Job hierarchy to enable job enqueuing

* Rust fmt again 🙄

* core.ts

---------

Co-authored-by: Ericson Soares <ericson.ds999@gmail.com>
Oscar Beaumont 2023-04-15 14:43:04 +08:00 committed by GitHub
parent 03eb27e91d
commit 2dd3d6f921
35 changed files with 1005 additions and 646 deletions
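Taken together, the diffs below replace the old `Job::new(init, SomeJob {})` plus `queue_job` pattern with a single `spawn_job` call that accepts either a job's init data directly or a `Job` with dependent jobs queued behind it, and that now returns a `Result`. A minimal usage sketch, assembled from the `scan_location` and rspc mutation changes in this commit (field values elided):

// Spawn a single job straight from its init data; errors surface as JobManagerError.
library.spawn_job(FileEncryptorJobInit { /* ... */ }).await?;

// Spawn a job hierarchy: the indexer runs first, and each queued job is
// ingested by the JobManager once its parent finishes.
library
    .spawn_job(
        Job::new(IndexerJobInit { location, sub_path: None })
            .queue_next(FileIdentifierJobInit {
                location: location_base_data.clone(),
                sub_path: None,
            })
            .queue_next(ThumbnailerJobInit {
                location: location_base_data,
                sub_path: None,
                background: true,
            }),
    )
    .await?;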

Cargo.lock (generated)

@ -6656,6 +6656,7 @@ dependencies = [
"sd-p2p",
"sd-sync",
"serde",
"serde-hashkey",
"serde_json",
"serde_with 2.2.0",
"specta",
@ -6958,6 +6959,15 @@ dependencies = [
"serde",
]
[[package]]
name = "serde-hashkey"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c13a90d3c31ebd0b83e38600c8117083ec4c4e1a7a0cab364e79e19706ade04e"
dependencies = [
"serde",
]
[[package]]
name = "serde-value"
version = "0.7.0"


@ -10,12 +10,8 @@ rust-version = "1.68.1"
[features]
default = []
mobile = [
] # This feature allows features to be disabled when the Core is running on mobile.
ffmpeg = [
"dep:ffmpeg-next",
"dep:sd-ffmpeg",
] # This feature controls whether the Spacedrive Core contains functionality which requires FFmpeg.
mobile = [] # This feature allows features to be disabled when the Core is running on mobile.
ffmpeg = ["dep:ffmpeg-next", "dep:sd-ffmpeg"] # This feature controls whether the Spacedrive Core contains functionality which requires FFmpeg.
location-watcher = ["dep:notify"]
sync-messages = []
@ -77,6 +73,7 @@ notify = { version = "5.0.0", default-features = false, features = [
"macos_fsevent",
], optional = true }
static_assertions = "1.1.0"
serde-hashkey = "0.4.5"
[target.'cfg(windows)'.dependencies.winapi-util]
version = "0.1.5"


@ -236,12 +236,14 @@ CREATE TABLE "job" (
"status" INTEGER NOT NULL DEFAULT 0,
"data" BLOB,
"metadata" BLOB,
"parent_id" BLOB,
"task_count" INTEGER NOT NULL DEFAULT 1,
"completed_task_count" INTEGER NOT NULL DEFAULT 0,
"date_created" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
"date_modified" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
"seconds_elapsed" INTEGER NOT NULL DEFAULT 0,
CONSTRAINT "job_node_id_fkey" FOREIGN KEY ("node_id") REFERENCES "node" ("id") ON DELETE CASCADE ON UPDATE CASCADE
CONSTRAINT "job_node_id_fkey" FOREIGN KEY ("node_id") REFERENCES "node" ("id") ON DELETE CASCADE ON UPDATE CASCADE,
CONSTRAINT "job_parent_id_fkey" FOREIGN KEY ("parent_id") REFERENCES "job" ("id") ON DELETE CASCADE ON UPDATE CASCADE
);
-- CreateTable


@ -134,7 +134,7 @@ model FilePath {
name String
extension String
size_in_bytes String @default("0")
size_in_bytes String @default("0")
inode Bytes // This is actually an unsigned 64 bit integer, but we don't have this type in SQLite
device Bytes // This is actually an unsigned 64 bit integer, but we don't have this type in SQLite
@ -167,9 +167,9 @@ model FilePath {
/// @shared(id: pub_id)
model Object {
id Int @id @default(autoincrement())
pub_id Bytes @unique
kind Int @default(0)
id Int @id @default(autoincrement())
pub_id Bytes @unique
kind Int @default(0)
key_id Int?
// handy ways to mark an object
@ -362,6 +362,8 @@ model Job {
data Bytes?
metadata Bytes?
parent_id Bytes?
task_count Int @default(1)
completed_task_count Int @default(0)
date_created DateTime @default(now())
@ -370,6 +372,9 @@ model Job {
nodes Node @relation(fields: [node_id], references: [id], onDelete: Cascade, onUpdate: Cascade)
parent Job? @relation("jobs_dependency", fields: [parent_id], references: [id], onDelete: Cascade, onUpdate: Cascade)
children Job[] @relation("jobs_dependency")
@@map("job")
}


@ -1,15 +1,10 @@
use crate::{
invalidate_query,
job::Job,
library::Library,
location::{find_location, LocationError},
location::{file_path_helper::MaterializedPath, find_location, LocationError},
object::fs::{
copy::{FileCopierJob, FileCopierJobInit},
cut::{FileCutterJob, FileCutterJobInit},
decrypt::{FileDecryptorJob, FileDecryptorJobInit},
delete::{FileDeleterJob, FileDeleterJobInit},
encrypt::{FileEncryptorJob, FileEncryptorJobInit},
erase::{FileEraserJob, FileEraserJobInit},
copy::FileCopierJobInit, cut::FileCutterJobInit, decrypt::FileDecryptorJobInit,
delete::FileDeleterJobInit, encrypt::FileEncryptorJobInit, erase::FileEraserJobInit,
},
prisma::{location, object},
};
@ -17,7 +12,7 @@ use crate::{
use rspc::{ErrorCode, Type};
use serde::Deserialize;
use std::path::Path;
use tokio::{fs, sync::oneshot};
use tokio::fs;
use super::{utils::LibraryRequest, RouterBuilder};
@ -102,83 +97,40 @@ pub(crate) fn mount() -> RouterBuilder {
.library_mutation("encryptFiles", |t| {
t(
|_, args: FileEncryptorJobInit, library: Library| async move {
library.spawn_job(Job::new(args, FileEncryptorJob {})).await;
invalidate_query!(library, "locations.getExplorerData");
Ok(())
library.spawn_job(args).await.map_err(Into::into)
},
)
})
.library_mutation("decryptFiles", |t| {
t(
|_, args: FileDecryptorJobInit, library: Library| async move {
library.spawn_job(Job::new(args, FileDecryptorJob {})).await;
invalidate_query!(library, "locations.getExplorerData");
Ok(())
library.spawn_job(args).await.map_err(Into::into)
},
)
})
.library_mutation("deleteFiles", |t| {
t(|_, args: FileDeleterJobInit, library: Library| async move {
library.spawn_job(Job::new(args, FileDeleterJob {})).await;
invalidate_query!(library, "locations.getExplorerData");
Ok(())
library.spawn_job(args).await.map_err(Into::into)
})
})
.library_mutation("eraseFiles", |t| {
t(|_, args: FileEraserJobInit, library: Library| async move {
library.spawn_job(Job::new(args, FileEraserJob {})).await;
invalidate_query!(library, "locations.getExplorerData");
Ok(())
library.spawn_job(args).await.map_err(Into::into)
})
})
.library_mutation("duplicateFiles", |t| {
t(|_, args: FileCopierJobInit, library: Library| async move {
let (done_tx, done_rx) = oneshot::channel();
library
.spawn_job(Job::new(
args,
FileCopierJob {
done_tx: Some(done_tx),
},
))
.await;
let _ = done_rx.await;
invalidate_query!(library, "locations.getExplorerData");
Ok(())
library.spawn_job(args).await.map_err(Into::into)
})
})
.library_mutation("copyFiles", |t| {
t(|_, args: FileCopierJobInit, library: Library| async move {
let (done_tx, done_rx) = oneshot::channel();
library
.spawn_job(Job::new(
args,
FileCopierJob {
done_tx: Some(done_tx),
},
))
.await;
let _ = done_rx.await;
invalidate_query!(library, "locations.getExplorerData");
Ok(())
library.spawn_job(args).await.map_err(Into::into)
})
})
.library_mutation("cutFiles", |t| {
t(|_, args: FileCutterJobInit, library: Library| async move {
library.spawn_job(Job::new(args, FileCutterJob {})).await;
invalidate_query!(library, "locations.getExplorerData");
Ok(())
library.spawn_job(args).await.map_err(Into::into)
})
})
.library_mutation("renameFile", |t| {
@ -189,26 +141,38 @@ pub(crate) fn mount() -> RouterBuilder {
pub new_file_name: String,
}
t(|_, args: RenameFileArgs, library: Library| async move {
let location = find_location(&library, args.location_id)
.select(location::select!({ path }))
.exec()
.await?
.ok_or(LocationError::IdNotFound(args.location_id))?;
t(
|_,
RenameFileArgs {
location_id,
file_name,
new_file_name,
}: RenameFileArgs,
library: Library| async move {
let location = find_location(&library, location_id)
.select(location::select!({ path }))
.exec()
.await?
.ok_or(LocationError::IdNotFound(location_id))?;
let location_path = Path::new(&location.path);
fs::rename(
location_path.join(&args.file_name),
location_path.join(&args.new_file_name),
)
.await
.map_err(|e| {
rspc::Error::new(ErrorCode::Conflict, format!("Failed to rename file: {e}"))
})?;
let location_path = Path::new(&location.path);
fs::rename(
location_path.join(&MaterializedPath::from((location_id, &file_name))),
location_path.join(&MaterializedPath::from((location_id, &new_file_name))),
)
.await
.map_err(|e| {
rspc::Error::with_cause(
ErrorCode::Conflict,
"Failed to rename file".to_string(),
e,
)
})?;
invalidate_query!(library, "tags.getExplorerData");
invalidate_query!(library, "tags.getExplorerData");
Ok(())
})
Ok(())
},
)
})
}


@ -1,10 +1,10 @@
use crate::{
job::{Job, JobManager},
job::JobManager,
location::{find_location, LocationError},
object::{
file_identifier::file_identifier_job::{FileIdentifierJob, FileIdentifierJobInit},
preview::thumbnailer_job::{ThumbnailerJob, ThumbnailerJobInit},
validation::validator_job::{ObjectValidatorJob, ObjectValidatorJobInit},
file_identifier::file_identifier_job::FileIdentifierJobInit,
preview::thumbnailer_job::ThumbnailerJobInit,
validation::validator_job::ObjectValidatorJobInit,
},
};
@ -19,16 +19,16 @@ pub(crate) fn mount() -> RouterBuilder {
.library_query("getRunning", |t| {
t(|ctx, _: (), _| async move { Ok(ctx.jobs.get_running().await) })
})
.library_query("isRunning", |t| {
t(|ctx, _: (), _| async move { Ok(!ctx.jobs.get_running().await.is_empty()) })
})
.library_query("getHistory", |t| {
t(|_, _: (), library| async move { Ok(JobManager::get_history(&library).await?) })
t(|_, _: (), library| async move {
JobManager::get_history(&library).await.map_err(Into::into)
})
})
.library_mutation("clearAll", |t| {
t(|_, _: (), library| async move {
JobManager::clear_all_jobs(&library).await?;
Ok(())
JobManager::clear_all_jobs(&library)
.await
.map_err(Into::into)
})
})
.library_mutation("generateThumbsForLocation", |t| {
@ -45,17 +45,13 @@ pub(crate) fn mount() -> RouterBuilder {
};
library
.spawn_job(Job::new(
ThumbnailerJobInit {
location,
sub_path: Some(args.path),
background: false,
},
ThumbnailerJob {},
))
.await;
Ok(())
.spawn_job(ThumbnailerJobInit {
location,
sub_path: Some(args.path),
background: false,
})
.await
.map_err(Into::into)
},
)
})
@ -72,17 +68,13 @@ pub(crate) fn mount() -> RouterBuilder {
}
library
.spawn_job(Job::new(
ObjectValidatorJobInit {
location_id: args.id,
path: args.path,
background: true,
},
ObjectValidatorJob {},
))
.await;
Ok(())
.spawn_job(ObjectValidatorJobInit {
location_id: args.id,
path: args.path,
background: true,
})
.await
.map_err(Into::into)
})
})
.library_mutation("identifyUniqueFiles", |t| {
@ -98,16 +90,12 @@ pub(crate) fn mount() -> RouterBuilder {
};
library
.spawn_job(Job::new(
FileIdentifierJobInit {
location,
sub_path: Some(args.path),
},
FileIdentifierJob {},
))
.await;
Ok(())
.spawn_job(FileIdentifierJobInit {
location,
sub_path: Some(args.path),
})
.await
.map_err(Into::into)
})
})
.library_subscription("newThumbnail", |t| {


@ -1,9 +1,6 @@
use rspc::{Config, Type};
use serde::{Deserialize, Serialize};
use std::{
sync::Arc,
time::{Duration, Instant},
};
use std::sync::Arc;
use crate::{node::NodeConfig, Node};
@ -18,7 +15,6 @@ pub(crate) type RouterBuilder = rspc::RouterBuilder<Ctx>;
pub enum CoreEvent {
NewThumbnail { cas_id: String },
InvalidateOperation(InvalidateOperationEvent),
InvalidateOperationDebounced(InvalidateOperationEvent),
}
mod files;
@ -86,28 +82,7 @@ pub(crate) fn mount() -> Arc<Router> {
.yolo_merge("jobs.", jobs::mount())
.yolo_merge("p2p.", p2p::mount())
.yolo_merge("sync.", sync::mount())
// TODO: Scope the invalidate queries to a specific library (filtered server side)
.subscription("invalidateQuery", |t| {
t(|ctx, _: ()| {
let mut event_bus_rx = ctx.event_bus.0.subscribe();
let mut last = Instant::now();
async_stream::stream! {
while let Ok(event) = event_bus_rx.recv().await {
match event {
CoreEvent::InvalidateOperation(op) => yield op,
CoreEvent::InvalidateOperationDebounced(op) => {
let current = Instant::now();
if current.duration_since(last) > Duration::from_millis(1000 / 10) {
last = current;
yield op;
}
},
_ => {}
}
}
}
})
})
.yolo_merge("invalidation.", utils::mount_invalidate())
.build()
.arced();
InvalidRequests::validate(r.clone()); // This validates all invalidation calls.


@ -1,9 +1,20 @@
use crate::api::Router;
use crate::api::{CoreEvent, Router, RouterBuilder};
use async_stream::stream;
use rspc::{internal::specta::DataType, Type};
use serde::Serialize;
use serde_hashkey::to_key;
use serde_json::Value;
use std::sync::Arc;
use std::{
collections::HashMap,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use tokio::sync::broadcast;
use tracing::warn;
#[cfg(debug_assertions)]
use std::sync::Mutex;
@ -18,12 +29,13 @@ pub struct InvalidateOperationEvent {
/// These fields are intentionally private.
key: &'static str,
arg: Value,
result: Option<Value>,
}
impl InvalidateOperationEvent {
/// If you are using this function, you're doing it wrong.
pub fn dangerously_create(key: &'static str, arg: Value) -> Self {
Self { key, arg }
pub fn dangerously_create(key: &'static str, arg: Value, result: Option<Value>) -> Self {
Self { key, arg, result }
}
}
@ -32,7 +44,8 @@ impl InvalidateOperationEvent {
#[allow(dead_code)]
pub(crate) struct InvalidationRequest {
pub key: &'static str,
pub input_ty: Option<DataType>,
pub arg_ty: Option<DataType>,
pub result_ty: Option<DataType>,
pub macro_src: &'static str,
}
@ -60,11 +73,20 @@ impl InvalidRequests {
let queries = r.queries();
for req in &invalidate_requests.queries {
if let Some(query_ty) = queries.get(req.key) {
if let Some(input) = &req.input_ty {
if &query_ty.ty.input != input {
if let Some(arg) = &req.arg_ty {
if &query_ty.ty.input != arg {
panic!(
"Error at '{}': Attempted to invalid query '{}' but the argument type does not match the type defined on the router.",
req.macro_src, req.key
);
}
}
if let Some(result) = &req.result_ty {
if &query_ty.ty.result != result {
panic!(
"Error at '{}': Attempted to invalid query '{}' but the data type does not match the type defined on the router.",
req.macro_src, req.key
);
}
}
@ -91,8 +113,8 @@ impl InvalidRequests {
#[macro_export]
#[allow(clippy::crate_in_macro_def)]
macro_rules! invalidate_query {
($library:expr, $key:literal) => {{
let library: &crate::library::Library = &$library; // Assert the library is the correct type
($ctx:expr, $key:literal) => {{
let ctx: &crate::library::Library = &$ctx; // Assert the context is the correct type
#[cfg(debug_assertions)]
{
@ -104,20 +126,21 @@ macro_rules! invalidate_query {
.queries
.push(crate::api::utils::InvalidationRequest {
key: $key,
input_ty: None,
arg_ty: None,
result_ty: None,
macro_src: concat!(file!(), ":", line!()),
})
}
}
// The errors are ignored here because they aren't mission critical. If they fail, the UI might be outdated for a bit.
library.emit(crate::api::CoreEvent::InvalidateOperation(
crate::api::utils::InvalidateOperationEvent::dangerously_create($key, serde_json::Value::Null)
ctx.emit(crate::api::CoreEvent::InvalidateOperation(
crate::api::utils::InvalidateOperationEvent::dangerously_create($key, serde_json::Value::Null, None)
))
}};
($library:expr, $key:literal: $input_ty:ty, $input:expr $(,)?) => {{
let _: $input_ty = $input; // Assert the type the user provided is correct
let library: &crate::library::Library = &$library; // Assert the library is the correct type
($ctx:expr, $key:literal: $arg_ty:ty, $arg:expr $(,)?) => {{
let _: $arg_ty = $arg; // Assert the type the user provided is correct
let ctx: &crate::library::Library = &$ctx; // Assert the context is the correct type
#[cfg(debug_assertions)]
{
@ -129,7 +152,46 @@ macro_rules! invalidate_query {
.queries
.push(crate::api::utils::InvalidationRequest {
key: $key,
input_ty: Some(<$input_ty as rspc::internal::specta::Type>::reference(rspc::internal::specta::DefOpts {
arg_ty: Some(<$arg_ty as rspc::internal::specta::Type>::reference(rspc::internal::specta::DefOpts {
parent_inline: false,
type_map: &mut rspc::internal::specta::TypeDefs::new(),
}, &[])),
result_ty: None,
macro_src: concat!(file!(), ":", line!()),
})
}
}
// The errors are ignored here because they aren't mission critical. If they fail, the UI might be outdated for a bit.
let _ = serde_json::to_value($arg)
.map(|v|
ctx.emit(crate::api::CoreEvent::InvalidateOperation(
crate::api::utils::InvalidateOperationEvent::dangerously_create($key, v, None),
))
)
.map_err(|_| {
tracing::warn!("Failed to serialize invalidate query event!");
});
}};
($ctx:expr, $key:literal: $arg_ty:ty, $arg:expr, $result_ty:ty: $result:expr $(,)?) => {{
let _: $arg_ty = $arg; // Assert the type the user provided is correct
let ctx: &crate::library::Library = &$ctx; // Assert the context is the correct type
#[cfg(debug_assertions)]
{
#[ctor::ctor]
fn invalidate() {
crate::api::utils::INVALIDATION_REQUESTS
.lock()
.unwrap()
.queries
.push(crate::api::utils::InvalidationRequest {
key: $key,
arg_ty: Some(<$arg_ty as rspc::internal::specta::Type>::reference(rspc::internal::specta::DefOpts {
parent_inline: false,
type_map: &mut rspc::internal::specta::TypeDefs::new(),
}, &[])),
result_ty: Some(<$result_ty as rspc::internal::specta::Type>::reference(rspc::internal::specta::DefOpts {
parent_inline: false,
type_map: &mut rspc::internal::specta::TypeDefs::new(),
}, &[])),
@ -139,14 +201,65 @@ macro_rules! invalidate_query {
}
// The error are ignored here because they aren't mission critical. If they fail the UI might be outdated for a bit.
let _ = serde_json::to_value($input)
.map(|v|
library.emit(crate::api::CoreEvent::InvalidateOperation(
crate::api::utils::InvalidateOperationEvent::dangerously_create($key, v),
))
let _ = serde_json::to_value($arg)
.and_then(|arg|
serde_json::to_value($result)
.map(|result|
ctx.emit(crate::api::CoreEvent::InvalidateOperation(
crate::api::utils::InvalidateOperationEvent::dangerously_create($key, arg, Some(result)),
))
)
)
.map_err(|_| {
tracing::warn!("Failed to serialize invalidate query event!");
});
}};
}
pub fn mount_invalidate() -> RouterBuilder {
let (tx, _) = broadcast::channel(100);
let manager_thread_active = AtomicBool::new(false);
// TODO: Scope the invalidate queries to a specific library (filtered server side)
RouterBuilder::new().subscription("listen", move |t| {
t(move |ctx, _: ()| {
// This thread is used to deal with batching and deduplication.
// There is only ever one of these management threads per Node, but we spawn it like this so we can steal the event bus from the rspc context.
// Batching is important because when refetching data on the frontend rspc can fetch all invalidated queries in a single round trip.
if !manager_thread_active.swap(true, Ordering::Relaxed) {
let mut event_bus_rx = ctx.event_bus.0.subscribe();
let tx = tx.clone();
tokio::spawn(async move {
let mut buf = HashMap::with_capacity(100);
tokio::select! {
event = event_bus_rx.recv() => {
if let Ok(event) = event {
if let CoreEvent::InvalidateOperation(op) = event {
// Newer data replaces older data in the buffer
buf.insert(to_key(&(op.key, &op.arg)).unwrap(), op);
}
} else {
warn!("Shutting down invalidation manager thread due to the core event bus being droppped!");
}
},
// Given a human reaction time of ~250ms, this should be a good balance.
_ = tokio::time::sleep(Duration::from_millis(200)) => {
match tx.send(buf.drain().map(|(_k, v)| v).collect::<Vec<_>>()) {
Ok(_) => {},
Err(_) => warn!("Error emitting invalidation manager events!"),
}
}
}
});
}
let mut rx = tx.subscribe();
stream! {
while let Ok(msg) = rx.recv().await {
yield msg;
}
}
})
})
}
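For context, the invalidation system now pushes events into the batching subscription above instead of each caller debouncing on its own. The zero-argument call below is taken from this commit (e.g. `clear_all_jobs`); the argument-scoped call is an illustrative sketch only — the query key and argument type are assumptions, not taken from this changeset — showing the form the reworked macro accepts:

// Invalidate a whole query; in debug builds the key is registered so
// InvalidRequests::validate can check it against the router at startup.
invalidate_query!(library, "jobs.getHistory");

// Hypothetical argument-scoped invalidation (key and type are illustrative):
// the argument is serialized into the event and then batched/deduplicated by
// the "invalidation.listen" subscription before reaching the frontend.
invalidate_query!(library, "locations.getById": i32, location_id);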


@ -1,29 +1,20 @@
use crate::{
invalidate_query,
job::{worker::Worker, DynJob, Job, JobError},
job::{worker::Worker, DynJob, Job, JobError, StatefulJob},
library::Library,
location::indexer::{
indexer_job::{IndexerJob, INDEXER_JOB_NAME},
shallow_indexer_job::{ShallowIndexerJob, SHALLOW_INDEXER_JOB_NAME},
},
location::indexer::{indexer_job::IndexerJob, shallow_indexer_job::ShallowIndexerJob},
object::{
file_identifier::{
file_identifier_job::{FileIdentifierJob, FILE_IDENTIFIER_JOB_NAME},
shallow_file_identifier_job::{
ShallowFileIdentifierJob, SHALLOW_FILE_IDENTIFIER_JOB_NAME,
},
file_identifier_job::FileIdentifierJob,
shallow_file_identifier_job::ShallowFileIdentifierJob,
},
fs::{
copy::{FileCopierJob, COPY_JOB_NAME},
cut::{FileCutterJob, CUT_JOB_NAME},
delete::{FileDeleterJob, DELETE_JOB_NAME},
erase::{FileEraserJob, ERASE_JOB_NAME},
copy::FileCopierJob, cut::FileCutterJob, delete::FileDeleterJob, erase::FileEraserJob,
},
preview::{
shallow_thumbnailer_job::{ShallowThumbnailerJob, SHALLOW_THUMBNAILER_JOB_NAME},
thumbnailer_job::{ThumbnailerJob, THUMBNAILER_JOB_NAME},
shallow_thumbnailer_job::ShallowThumbnailerJob, thumbnailer_job::ThumbnailerJob,
},
validation::validator_job::{ObjectValidatorJob, VALIDATOR_JOB_NAME},
validation::validator_job::ObjectValidatorJob,
},
prisma::{job, node},
};
@ -36,10 +27,13 @@ use std::{
time::Duration,
};
use chrono::{DateTime, Utc};
use futures::future::BoxFuture;
use int_enum::IntEnum;
use prisma_client_rust::Direction;
use rspc::Type;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::{
sync::{broadcast, mpsc, Mutex, RwLock},
time::sleep,
@ -54,6 +48,40 @@ pub enum JobManagerEvent {
IngestJob(Library, Box<dyn DynJob>),
}
#[derive(Error, Debug)]
pub enum JobManagerError {
#[error("Tried to dispatch a job that is already running: Job <name='{name}', hash='{hash}'>")]
AlreadyRunningJob { name: &'static str, hash: u64 },
#[error("Failed to fetch job data from database: {0}")]
Database(#[from] prisma_client_rust::QueryError),
#[error("Job error: {0}")]
Job(#[from] JobError),
}
impl From<JobManagerError> for rspc::Error {
fn from(value: JobManagerError) -> Self {
match value {
JobManagerError::AlreadyRunningJob { .. } => Self::with_cause(
rspc::ErrorCode::BadRequest,
"Tried to spawn a job that is already running!".to_string(),
value,
),
JobManagerError::Database(_) => Self::with_cause(
rspc::ErrorCode::InternalServerError,
"Error accessing the database".to_string(),
value,
),
JobManagerError::Job(_) => Self::with_cause(
rspc::ErrorCode::InternalServerError,
"Job error".to_string(),
value,
),
}
}
}
/// JobManager handles queueing and executing jobs using the `DynJob`
/// Handling persisting JobReports to the database, pause/resuming, and
///
@ -94,40 +122,29 @@ impl JobManager {
this
}
pub async fn ingest(self: Arc<Self>, library: &Library, job: Box<dyn DynJob>) {
pub async fn ingest(
self: Arc<Self>,
library: &Library,
job: Box<dyn DynJob>,
) -> Result<(), JobManagerError> {
let job_hash = job.hash();
if self.current_jobs_hashes.read().await.contains(&job_hash) {
return Err(JobManagerError::AlreadyRunningJob {
name: job.name(),
hash: job_hash,
});
}
debug!(
"Ingesting job: <name='{}', hash='{}'>",
job.name(),
job_hash
);
if !self.current_jobs_hashes.read().await.contains(&job_hash) {
self.current_jobs_hashes.write().await.insert(job_hash);
self.dispatch_job(library, job).await;
} else {
debug!(
"Job already in queue: <name='{}', hash='{}'>",
job.name(),
job_hash
);
}
}
pub async fn ingest_queue(&self, job: Box<dyn DynJob>) {
let job_hash = job.hash();
debug!("Queueing job: <name='{}', hash='{}'>", job.name(), job_hash);
if !self.current_jobs_hashes.read().await.contains(&job_hash) {
self.current_jobs_hashes.write().await.insert(job_hash);
self.job_queue.write().await.push_back(job);
} else {
debug!(
"Job already in queue: <name='{}', hash='{}'>",
job.name(),
job_hash
);
}
self.current_jobs_hashes.write().await.insert(job_hash);
self.dispatch_job(library, job).await;
Ok(())
}
pub async fn complete(self: Arc<Self>, library: &Library, job_id: Uuid, job_hash: u64) {
@ -156,9 +173,7 @@ impl JobManager {
ret
}
pub async fn get_history(
library: &Library,
) -> Result<Vec<JobReport>, prisma_client_rust::QueryError> {
pub async fn get_history(library: &Library) -> Result<Vec<JobReport>, JobManagerError> {
Ok(library
.db
.job()
@ -172,7 +187,7 @@ impl JobManager {
.collect())
}
pub async fn clear_all_jobs(library: &Library) -> Result<(), prisma_client_rust::QueryError> {
pub async fn clear_all_jobs(library: &Library) -> Result<(), JobManagerError> {
library.db.job().delete_many(vec![]).exec().await?;
invalidate_query!(library, "jobs.getHistory");
@ -184,14 +199,11 @@ impl JobManager {
}
pub async fn pause(&self) {
let running_workers_read_guard = self.running_workers.read().await;
if !running_workers_read_guard.is_empty() {
if !self.running_workers.read().await.is_empty() {
self.shutdown_tx
.send(())
.expect("Failed to send shutdown signal");
}
// Dropping our handle so jobs can finish
drop(running_workers_read_guard);
loop {
sleep(Duration::from_millis(50)).await;
@ -201,93 +213,107 @@ impl JobManager {
}
}
pub async fn resume_jobs(self: Arc<Self>, library: &Library) -> Result<(), JobError> {
let paused_jobs = library
pub async fn resume_jobs(self: Arc<Self>, library: &Library) -> Result<(), JobManagerError> {
for root_paused_job_report in library
.db
.job()
.find_many(vec![job::status::equals(JobStatus::Paused.int_value())])
.find_many(vec![
job::status::equals(JobStatus::Paused.int_value()),
job::parent_id::equals(None), // only fetch top-level jobs, they will resume their children
])
.exec()
.await?;
for paused_job_data in paused_jobs {
let paused_job = JobReport::from(paused_job_data);
info!("Resuming job: {}, id: {}", paused_job.name, paused_job.id);
match paused_job.name.as_str() {
THUMBNAILER_JOB_NAME => {
Arc::clone(&self)
.dispatch_job(library, Job::resume(paused_job, ThumbnailerJob {})?)
.await;
}
SHALLOW_THUMBNAILER_JOB_NAME => {
Arc::clone(&self)
.dispatch_job(library, Job::resume(paused_job, ShallowThumbnailerJob {})?)
.await;
}
INDEXER_JOB_NAME => {
Arc::clone(&self)
.dispatch_job(library, Job::resume(paused_job, IndexerJob {})?)
.await;
}
SHALLOW_INDEXER_JOB_NAME => {
Arc::clone(&self)
.dispatch_job(library, Job::resume(paused_job, ShallowIndexerJob {})?)
.await;
}
FILE_IDENTIFIER_JOB_NAME => {
Arc::clone(&self)
.dispatch_job(library, Job::resume(paused_job, FileIdentifierJob {})?)
.await;
}
SHALLOW_FILE_IDENTIFIER_JOB_NAME => {
Arc::clone(&self)
.dispatch_job(
library,
Job::resume(paused_job, ShallowFileIdentifierJob {})?,
)
.await;
}
VALIDATOR_JOB_NAME => {
Arc::clone(&self)
.dispatch_job(library, Job::resume(paused_job, ObjectValidatorJob {})?)
.await;
}
CUT_JOB_NAME => {
Arc::clone(&self)
.dispatch_job(library, Job::resume(paused_job, FileCutterJob {})?)
.await;
}
COPY_JOB_NAME => {
Arc::clone(&self)
.dispatch_job(
library,
Job::resume(paused_job, FileCopierJob { done_tx: None })?,
)
.await;
}
DELETE_JOB_NAME => {
Arc::clone(&self)
.dispatch_job(library, Job::resume(paused_job, FileDeleterJob {})?)
.await;
}
ERASE_JOB_NAME => {
Arc::clone(&self)
.dispatch_job(library, Job::resume(paused_job, FileEraserJob {})?)
.await;
}
_ => {
error!(
"Unknown job type: {}, id: {}",
paused_job.name, paused_job.id
);
return Err(JobError::UnknownJobName(paused_job.id, paused_job.name));
}
};
.await?
.into_iter()
.map(JobReport::from)
{
Arc::clone(&self)
.dispatch_job(
library,
Self::recursive_resume_job(root_paused_job_report, library).await?,
)
.await;
}
Ok(())
}
fn recursive_resume_job(
parent: JobReport,
library: &Library,
) -> BoxFuture<Result<Box<dyn DynJob>, JobManagerError>> {
// Recursive async functions must return boxed futures
Box::pin(async move {
info!(
"Trying to resume Job <id='{}', name='{}'>",
parent.id, parent.name
);
let maybe_children_job = if let Some(children_job_report) = library
.db
.job()
.find_first(vec![job::parent_id::equals(Some(
parent.id.as_bytes().to_vec(),
))])
.exec()
.await?
.map(JobReport::from)
{
Some(Self::recursive_resume_job(children_job_report, library).await?)
} else {
None
};
Self::get_resumable_job(parent, maybe_children_job)
})
}
fn get_resumable_job(
job_report: JobReport,
next_job: Option<Box<dyn DynJob>>,
) -> Result<Box<dyn DynJob>, JobManagerError> {
match job_report.name.as_str() {
<ThumbnailerJob as StatefulJob>::NAME => {
Job::resume(job_report, ThumbnailerJob {}, next_job)
}
<ShallowThumbnailerJob as StatefulJob>::NAME => {
Job::resume(job_report, ShallowThumbnailerJob {}, next_job)
}
<IndexerJob as StatefulJob>::NAME => Job::resume(job_report, IndexerJob {}, next_job),
<ShallowIndexerJob as StatefulJob>::NAME => {
Job::resume(job_report, ShallowIndexerJob {}, next_job)
}
<FileIdentifierJob as StatefulJob>::NAME => {
Job::resume(job_report, FileIdentifierJob {}, next_job)
}
<ShallowFileIdentifierJob as StatefulJob>::NAME => {
Job::resume(job_report, ShallowFileIdentifierJob {}, next_job)
}
<ObjectValidatorJob as StatefulJob>::NAME => {
Job::resume(job_report, ObjectValidatorJob {}, next_job)
}
<FileCutterJob as StatefulJob>::NAME => {
Job::resume(job_report, FileCutterJob {}, next_job)
}
<FileCopierJob as StatefulJob>::NAME => {
Job::resume(job_report, FileCopierJob {}, next_job)
}
<FileDeleterJob as StatefulJob>::NAME => {
Job::resume(job_report, FileDeleterJob {}, next_job)
}
<FileEraserJob as StatefulJob>::NAME => {
Job::resume(job_report, FileEraserJob {}, next_job)
}
_ => {
error!(
"Unknown job type: {}, id: {}",
job_report.name, job_report.id
);
Err(JobError::UnknownJobName(job_report.id, job_report.name))
}
}
.map_err(Into::into)
}
async fn dispatch_job(self: Arc<Self>, library: &Library, mut job: Box<dyn DynJob>) {
// create worker to process job
let mut running_workers = self.running_workers.write().await;
@ -295,7 +321,7 @@ impl JobManager {
info!("Running job: {:?}", job.name());
let job_report = job
.report()
.report_mut()
.take()
.expect("critical error: missing job on worker");
@ -341,9 +367,10 @@ pub struct JobReport {
pub name: String,
pub data: Option<Vec<u8>>,
pub metadata: Option<serde_json::Value>,
// client_id: i32,
pub date_created: chrono::DateTime<chrono::Utc>,
pub date_modified: chrono::DateTime<chrono::Utc>,
pub created_at: Option<DateTime<Utc>>,
pub updated_at: Option<DateTime<Utc>>,
pub parent_id: Option<Uuid>,
pub status: JobStatus,
pub task_count: i32,
@ -371,12 +398,11 @@ impl From<job::Data> for JobReport {
JobReport {
id: Uuid::from_slice(&data.id).unwrap(),
name: data.name,
// client_id: data.client_id,
status: JobStatus::from_int(data.status).unwrap(),
task_count: data.task_count,
completed_task_count: data.completed_task_count,
date_created: data.date_created.into(),
date_modified: data.date_modified.into(),
created_at: Some(data.date_created.into()),
updated_at: Some(data.date_modified.into()),
data: data.data,
metadata: data.metadata.and_then(|m| {
serde_json::from_slice(&m).unwrap_or_else(|e| -> Option<serde_json::Value> {
@ -386,6 +412,8 @@ impl From<job::Data> for JobReport {
}),
message: String::new(),
seconds_elapsed: data.seconds_elapsed,
// SAFETY: We created this uuid before
parent_id: data.parent_id.map(|id| Uuid::from_slice(&id).unwrap()),
}
}
}
@ -395,20 +423,30 @@ impl JobReport {
Self {
id: uuid,
name,
// client_id: 0,
date_created: chrono::Utc::now(),
date_modified: chrono::Utc::now(),
created_at: None,
updated_at: None,
status: JobStatus::Queued,
task_count: 0,
data: None,
metadata: None,
parent_id: None,
completed_task_count: 0,
message: String::new(),
seconds_elapsed: 0,
}
}
pub async fn create(&self, library: &Library) -> Result<(), JobError> {
pub fn new_with_parent(uuid: Uuid, name: String, parent_id: Uuid) -> Self {
let mut report = Self::new(uuid, name);
report.parent_id = Some(parent_id);
report
}
pub async fn create(&mut self, library: &Library) -> Result<(), JobError> {
let now = Utc::now();
self.created_at = Some(now);
self.updated_at = Some(now);
library
.db
.job()
@ -417,13 +455,20 @@ impl JobReport {
self.name.clone(),
JobStatus::Running as i32,
node::id::equals(library.node_local_id),
vec![job::data::set(self.data.clone())],
vec![
job::data::set(self.data.clone()),
job::parent_id::set(self.parent_id.map(|id| id.as_bytes().to_vec())),
job::date_created::set(now.into()),
job::date_modified::set(now.into()),
],
)
.exec()
.await?;
Ok(())
}
pub async fn update(&self, library: &Library) -> Result<(), JobError> {
pub async fn update(&mut self, library: &Library) -> Result<(), JobError> {
let now = Utc::now();
self.updated_at = Some(now);
library
.db
.job()
@ -435,7 +480,7 @@ impl JobReport {
job::metadata::set(serde_json::to_vec(&self.metadata).ok()),
job::task_count::set(self.task_count),
job::completed_task_count::set(self.completed_task_count),
job::date_modified::set(chrono::Utc::now().into()),
job::date_modified::set(now.into()),
job::seconds_elapsed::set(self.seconds_elapsed),
],
)


@ -1,5 +1,6 @@
use crate::{
location::{indexer::IndexerError, LocationError, LocationManagerError},
library::Library,
location::indexer::IndexerError,
object::{file_identifier::FileIdentifierJobError, preview::ThumbnailerError},
};
@ -7,13 +8,14 @@ use std::{
collections::{hash_map::DefaultHasher, VecDeque},
fmt::Debug,
hash::{Hash, Hasher},
sync::Arc,
};
use rmp_serde::{decode::Error as DecodeError, encode::Error as EncodeError};
use sd_crypto::Error as CryptoError;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use thiserror::Error;
use tracing::info;
use tracing::{debug, error, info};
use uuid::Uuid;
mod job_manager;
@ -45,8 +47,6 @@ pub enum JobError {
MissingJobDataState(Uuid, String),
#[error("missing some job data: '{value}'")]
MissingData { value: String },
#[error("Location manager error: {0}")]
LocationManager(#[from] LocationManagerError),
#[error("error converting/handling OS strings")]
OsStr,
#[error("error converting/handling paths")]
@ -55,8 +55,6 @@ pub enum JobError {
// Specific job errors
#[error("Indexer error: {0}")]
IndexerError(#[from] IndexerError),
#[error("Location error: {0}")]
LocationError(#[from] LocationError),
#[error("Thumbnailer error: {0}")]
ThumbnailError(#[from] ThumbnailerError),
#[error("Identifier error: {0}")]
@ -76,56 +74,139 @@ pub enum JobError {
pub type JobResult = Result<JobMetadata, JobError>;
pub type JobMetadata = Option<serde_json::Value>;
/// `JobInitData` is a trait to represent the data being passed to initialize a `Job`
pub trait JobInitData: Serialize + DeserializeOwned + Send + Sync + Hash {
type Job: StatefulJob;
fn hash(&self) -> u64 {
let mut s = DefaultHasher::new();
<Self::Job as StatefulJob>::NAME.hash(&mut s);
<Self as Hash>::hash(self, &mut s);
s.finish()
}
}
#[async_trait::async_trait]
pub trait StatefulJob: Send + Sync + Sized {
type Init: Serialize + DeserializeOwned + Send + Sync + Hash;
type Init: JobInitData<Job = Self>;
type Data: Serialize + DeserializeOwned + Send + Sync;
type Step: Serialize + DeserializeOwned + Send + Sync;
fn name(&self) -> &'static str;
/// The name of the job is a unique human readable identifier for the job.
const NAME: &'static str;
/// Construct a new instance of the job. This is used so the user can pass `Self::Init` into the `spawn_job` function and we can still run the job.
/// This does remove the flexibility of being able to pass arguments into the job's struct but with resumable jobs I view that as an anti-pattern anyway.
fn new() -> Self;
/// initialize the steps for the job
async fn init(&self, ctx: WorkerContext, state: &mut JobState<Self>) -> Result<(), JobError>;
/// is called for each step in the job. These steps are created in the `Self::init` method.
async fn execute_step(
&self,
ctx: WorkerContext,
state: &mut JobState<Self>,
) -> Result<(), JobError>;
/// is called after all steps have been executed
async fn finalize(&mut self, ctx: WorkerContext, state: &mut JobState<Self>) -> JobResult;
}
#[async_trait::async_trait]
pub trait DynJob: Send + Sync {
fn report(&mut self) -> &mut Option<JobReport>;
fn id(&self) -> Uuid;
fn parent_id(&self) -> Option<Uuid>;
fn report(&self) -> &Option<JobReport>;
fn report_mut(&mut self) -> &mut Option<JobReport>;
fn name(&self) -> &'static str;
async fn run(&mut self, ctx: WorkerContext) -> JobResult;
async fn run(&mut self, job_manager: Arc<JobManager>, ctx: WorkerContext) -> JobResult;
fn hash(&self) -> u64;
fn queue_next(&mut self, next_job: Box<dyn DynJob>);
fn serialize_state(&self) -> Result<Vec<u8>, JobError>;
async fn register_children(&mut self, library: &Library) -> Result<(), JobError>;
async fn pause_children(&mut self, library: &Library) -> Result<(), JobError>;
async fn cancel_children(&mut self, library: &Library) -> Result<(), JobError>;
}
pub struct Job<SJob: StatefulJob> {
report: Option<JobReport>,
state: JobState<SJob>,
stateful_job: SJob,
next_job: Option<Box<dyn DynJob>>,
}
impl<SJob: StatefulJob> Job<SJob> {
pub fn new(init: SJob::Init, stateful_job: SJob) -> Box<Self> {
pub trait IntoJob<SJob: StatefulJob + 'static> {
fn into_job(self) -> Box<dyn DynJob>;
}
impl<SJob, Init> IntoJob<SJob> for Init
where
SJob: StatefulJob<Init = Init> + 'static,
Init: JobInitData<Job = SJob>,
{
fn into_job(self) -> Box<dyn DynJob> {
Job::new(self)
}
}
impl<SJob, Init> IntoJob<SJob> for Box<Job<SJob>>
where
SJob: StatefulJob<Init = Init> + 'static,
Init: JobInitData<Job = SJob>,
{
fn into_job(self) -> Box<dyn DynJob> {
self
}
}
impl<SJob, Init> Job<SJob>
where
SJob: StatefulJob<Init = Init> + 'static,
Init: JobInitData<Job = SJob>,
{
pub fn new(init: Init) -> Box<Self> {
Box::new(Self {
report: Some(JobReport::new(
Uuid::new_v4(),
stateful_job.name().to_string(),
)),
report: Some(JobReport::new(Uuid::new_v4(), SJob::NAME.to_string())),
state: JobState {
init,
data: None,
steps: VecDeque::new(),
step_number: 0,
},
stateful_job,
stateful_job: SJob::new(),
next_job: None,
})
}
pub fn resume(mut report: JobReport, stateful_job: SJob) -> Result<Box<Self>, JobError> {
pub fn queue_next<NextSJob, NextInit>(mut self: Box<Self>, init: NextInit) -> Box<Self>
where
NextSJob: StatefulJob<Init = NextInit> + 'static,
NextInit: JobInitData<Job = NextSJob>,
{
let last_job = Job::new_dependent(
init,
self.next_job
.as_ref()
.map(|job| job.id())
// SAFETY: If we're queueing a next job then we should still have a report
.unwrap_or(self.report.as_ref().unwrap().id),
);
if let Some(ref mut next) = self.next_job {
next.queue_next(last_job);
} else {
self.next_job = Some(last_job);
}
self
}
pub fn resume(
mut report: JobReport,
stateful_job: SJob,
next_job: Option<Box<dyn DynJob>>,
) -> Result<Box<dyn DynJob>, JobError> {
let job_state_data = if let Some(data) = report.data.take() {
data
} else {
@ -136,14 +217,26 @@ impl<SJob: StatefulJob> Job<SJob> {
report: Some(report),
state: rmp_serde::from_slice(&job_state_data)?,
stateful_job,
next_job,
}))
}
}
impl<State: StatefulJob> Hash for Job<State> {
fn hash<H: Hasher>(&self, state: &mut H) {
self.name().hash(state);
self.state.hash(state);
fn new_dependent(init: Init, parent_id: Uuid) -> Box<Self> {
Box::new(Self {
report: Some(JobReport::new_with_parent(
Uuid::new_v4(),
SJob::NAME.to_string(),
parent_id,
)),
state: JobState {
init,
data: None,
steps: VecDeque::new(),
step_number: 0,
},
stateful_job: SJob::new(),
next_job: None,
})
}
}
@ -155,23 +248,30 @@ pub struct JobState<Job: StatefulJob> {
pub step_number: usize,
}
impl<Job: StatefulJob> Hash for JobState<Job> {
fn hash<H: Hasher>(&self, state: &mut H) {
self.init.hash(state);
}
}
#[async_trait::async_trait]
impl<State: StatefulJob> DynJob for Job<State> {
fn report(&mut self) -> &mut Option<JobReport> {
impl<SJob: StatefulJob> DynJob for Job<SJob> {
fn id(&self) -> Uuid {
// SAFETY: This method is used during queueing, so we still have a report
self.report().as_ref().unwrap().id
}
fn parent_id(&self) -> Option<Uuid> {
self.report.as_ref().and_then(|r| r.parent_id)
}
fn report(&self) -> &Option<JobReport> {
&self.report
}
fn report_mut(&mut self) -> &mut Option<JobReport> {
&mut self.report
}
fn name(&self) -> &'static str {
self.stateful_job.name()
<SJob as StatefulJob>::NAME
}
async fn run(&mut self, ctx: WorkerContext) -> JobResult {
async fn run(&mut self, job_manager: Arc<JobManager>, ctx: WorkerContext) -> JobResult {
let mut job_should_run = true;
// Checking if we have a brand new job, or if we are resuming an old one.
@ -215,14 +315,83 @@ impl<State: StatefulJob> DynJob for Job<State> {
self.state.step_number += 1;
}
self.stateful_job
let metadata = self
.stateful_job
.finalize(ctx.clone(), &mut self.state)
.await
.await?;
if let Some(next_job) = self.next_job.take() {
debug!(
"Job '{}' requested to spawn '{}' now that it's complete!",
self.name(),
next_job.name()
);
if let Err(e) = job_manager.clone().ingest(&ctx.library, next_job).await {
error!("Failed to ingest next job: {e}");
}
}
Ok(metadata)
}
fn hash(&self) -> u64 {
let mut hasher = DefaultHasher::new();
Hash::hash(self, &mut hasher);
hasher.finish()
<SJob::Init as JobInitData>::hash(&self.state.init)
}
fn queue_next(&mut self, next_job: Box<dyn DynJob>) {
if let Some(ref mut next) = self.next_job {
next.queue_next(next_job);
} else {
self.next_job = Some(next_job);
}
}
fn serialize_state(&self) -> Result<Vec<u8>, JobError> {
rmp_serde::to_vec_named(&self.state).map_err(Into::into)
}
async fn register_children(&mut self, library: &Library) -> Result<(), JobError> {
if let Some(ref mut next_job) = self.next_job {
// SAFETY: As these child jobs haven't been run yet, they still have their report field
let next_job_report = next_job.report_mut().as_mut().unwrap();
if next_job_report.created_at.is_none() {
next_job_report.create(library).await?
}
next_job.register_children(library).await?;
}
Ok(())
}
async fn pause_children(&mut self, library: &Library) -> Result<(), JobError> {
if let Some(ref mut next_job) = self.next_job {
let state = next_job.serialize_state()?;
// SAFETY: As these child jobs haven't been run yet, they still have their report field
let mut report = next_job.report_mut().as_mut().unwrap();
report.status = JobStatus::Paused;
report.data = Some(state);
report.update(library).await?;
next_job.pause_children(library).await?;
}
Ok(())
}
async fn cancel_children(&mut self, library: &Library) -> Result<(), JobError> {
if let Some(ref mut next_job) = self.next_job {
let state = next_job.serialize_state()?;
// SAFETY: As these child jobs haven't been run yet, they still have their report field
let mut report = next_job.report_mut().as_mut().unwrap();
report.status = JobStatus::Canceled;
report.data = Some(state);
report.update(library).await?;
next_job.cancel_children(library).await?;
}
Ok(())
}
}
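Under the new trait shape, a job is a unit struct implementing `StatefulJob` (with a `NAME` constant and a `new()` constructor) paired with an init type implementing `JobInitData`, which is what callers hand to `spawn_job`. A minimal sketch, assuming it lives inside sd-core where `WorkerContext`, `JobState`, and `JobError` are in scope; `ExampleJob` and its init field are hypothetical:

use crate::job::{JobError, JobInitData, JobResult, JobState, StatefulJob, WorkerContext};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Hash)]
pub struct ExampleJobInit {
    pub location_id: i32,
}

pub struct ExampleJob;

// Links the init data to its job so `spawn_job(ExampleJobInit { .. })` can
// construct the job and hash the init data for deduplication.
impl JobInitData for ExampleJobInit {
    type Job = ExampleJob;
}

#[async_trait::async_trait]
impl StatefulJob for ExampleJob {
    type Init = ExampleJobInit;
    type Data = ();
    type Step = ();

    // Replaces the old `fn name(&self) -> &'static str` method.
    const NAME: &'static str = "example";

    fn new() -> Self {
        Self {}
    }

    async fn init(&self, _ctx: WorkerContext, state: &mut JobState<Self>) -> Result<(), JobError> {
        // Populate the steps queue here; this sketch registers a single no-op step.
        state.steps.push_back(());
        Ok(())
    }

    async fn execute_step(
        &self,
        _ctx: WorkerContext,
        _state: &mut JobState<Self>,
    ) -> Result<(), JobError> {
        Ok(())
    }

    async fn finalize(&mut self, _ctx: WorkerContext, _state: &mut JobState<Self>) -> JobResult {
        Ok(None)
    }
}

With that in place, `library.spawn_job(ExampleJobInit { location_id: 1 }).await` goes through `IntoJob`, which wraps the init in a `Job`, and the `JobManager` deduplicates it by the hash of its init data.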


@ -102,18 +102,21 @@ impl Worker {
let job_hash = job.hash();
let job_id = worker.report.id;
let old_status = worker.report.status;
worker.report.status = JobStatus::Running;
if matches!(old_status, JobStatus::Queued) {
// If the report doesn't have a created_at date, it's a new report
if worker.report.created_at.is_none() {
worker.report.create(&library).await?;
} else {
// Otherwise it can be a job being resumed or a child job that was already created
worker.report.update(&library).await?;
}
drop(worker);
invalidate_query!(library, "jobs.isRunning");
job.register_children(&library).await?;
invalidate_query!(library, "jobs.getRunning");
// spawn task to handle receiving events from the worker
tokio::spawn(Worker::track_progress(
@ -153,7 +156,7 @@ impl Worker {
let (done_tx, done_rx) = oneshot::channel();
match job.run(worker_ctx.clone()).await {
match job.run(job_manager.clone(), worker_ctx.clone()).await {
Ok(metadata) => {
// handle completion
worker_ctx
@ -162,13 +165,22 @@ impl Worker {
.expect("critical error: failed to send worker complete event");
}
Err(JobError::Paused(state)) => {
info!("Job <id='{job_id}'> paused, we will pause all children jobs");
if let Err(e) = job.pause_children(&library).await {
error!("Failed to pause children jobs: {e:#?}");
}
worker_ctx
.events_tx
.send(WorkerEvent::Paused(state, done_tx))
.expect("critical error: failed to send worker pause event");
}
Err(e) => {
error!("job '{}' failed with error: {:#?}", job_id, e);
error!("Job <id='{job_id}'> failed with error: {e:#?}; We will cancel all children jobs");
if let Err(e) = job.cancel_children(&library).await {
error!("Failed to cancel children jobs: {e:#?}");
}
worker_ctx
.events_tx
.send(WorkerEvent::Failed(done_tx))
@ -236,7 +248,6 @@ impl Worker {
error!("failed to update job report: {:#?}", e);
}
invalidate_query!(library, "jobs.isRunning");
invalidate_query!(library, "jobs.getRunning");
invalidate_query!(library, "jobs.getHistory");


@ -144,43 +144,8 @@ impl Node {
)
.await?;
// Adding already existing locations for location management
for library in library_manager.get_all_libraries().await {
for location in library
.db
.location()
.find_many(vec![])
.exec()
.await
.unwrap_or_else(|e| {
error!(
"Failed to get locations from database for location manager: {:#?}",
e
);
vec![]
}) {
if let Err(e) = location_manager.add(location.id, library.clone()).await {
error!("Failed to add location to location manager: {:#?}", e);
}
}
}
debug!("Watching locations");
// Trying to resume possible paused jobs
tokio::spawn({
let library_manager = library_manager.clone();
let jobs = jobs.clone();
async move {
for library in library_manager.get_all_libraries().await {
if let Err(e) = jobs.clone().resume_jobs(&library).await {
error!("Failed to resume jobs for library. {:#?}", e);
}
}
}
});
tokio::spawn({
let library_manager = library_manager.clone();


@ -1,6 +1,6 @@
use crate::{
api::CoreEvent,
job::DynJob,
job::{IntoJob, JobInitData, JobManagerError, StatefulJob},
location::{file_path_helper::LastFilePathIdManager, LocationManager},
node::NodeConfigManager,
object::preview::THUMBNAIL_CACHE_DIR_NAME,
@ -56,12 +56,19 @@ impl Debug for Library {
}
impl Library {
pub(crate) async fn spawn_job(&self, job: Box<dyn DynJob>) {
self.node_context.jobs.clone().ingest(self, job).await;
}
pub(crate) async fn queue_job(&self, job: Box<dyn DynJob>) {
self.node_context.jobs.ingest_queue(job).await;
pub(crate) async fn spawn_job<SJob, Init>(
&self,
jobable: impl IntoJob<SJob>,
) -> Result<(), JobManagerError>
where
SJob: StatefulJob<Init = Init> + 'static,
Init: JobInitData + 'static,
{
self.node_context
.jobs
.clone()
.ingest(self, jobable.into_job())
.await
}
pub(crate) fn emit(&self, event: CoreEvent) {


@ -23,7 +23,7 @@ use std::{
};
use thiserror::Error;
use tokio::sync::RwLock;
use tracing::debug;
use tracing::{debug, error};
use uuid::Uuid;
use super::{Library, LibraryConfig, LibraryConfigWrapped};
@ -227,9 +227,9 @@ impl LibraryManager {
.collect()
}
pub(crate) async fn get_all_libraries(&self) -> Vec<Library> {
self.libraries.read().await.clone()
}
// pub(crate) async fn get_all_libraries(&self) -> Vec<Library> {
// self.libraries.read().await.clone()
// }
pub(crate) async fn edit(
&self,
@ -260,6 +260,31 @@ impl LibraryManager {
invalidate_query!(library, "library.list");
for library in self.libraries.read().await.iter() {
for location in library
.db
.location()
.find_many(vec![])
.exec()
.await
.unwrap_or_else(|e| {
error!(
"Failed to get locations from database for location manager: {:#?}",
e
);
vec![]
}) {
if let Err(e) = self
.node_context
.location_manager
.add(location.id, library.clone())
.await
{
error!("Failed to add location to location manager: {:#?}", e);
}
}
}
Ok(())
}
@ -351,7 +376,7 @@ impl LibraryManager {
}
});
Ok(Library {
let library = Library {
id,
local_id: node_data.id,
config,
@ -361,6 +386,18 @@ impl LibraryManager {
last_file_path_id_manager: Arc::new(LastFilePathIdManager::new()),
node_local_id: node_data.id,
node_context,
})
};
if let Err(e) = library
.node_context
.jobs
.clone()
.resume_jobs(&library)
.await
{
error!("Failed to resume jobs for library. {:#?}", e);
}
Ok(library)
}
}


@ -1,4 +1,3 @@
use crate::location::Library;
use crate::prisma::{file_path, location, PrismaClient};
use std::{
@ -13,7 +12,6 @@ use dashmap::{mapref::entry::Entry, DashMap};
use futures::future::try_join_all;
use prisma_client_rust::{Direction, QueryError};
use serde::{Deserialize, Serialize};
use serde_json::json;
use thiserror::Error;
use tokio::{fs, io};
use tracing::error;
@ -318,7 +316,7 @@ impl LastFilePathIdManager {
#[cfg(feature = "location-watcher")]
pub async fn create_file_path(
&self,
Library { db, sync, .. }: &Library,
crate::location::Library { db, sync, .. }: &crate::location::Library,
MaterializedPath {
materialized_path,
is_dir,
@ -333,6 +331,7 @@ impl LastFilePathIdManager {
// Keeping a reference in that map for the entire duration of the function, so we keep it locked
use crate::sync;
use serde_json::json;
let mut last_id_ref = match self.last_id_by_location.entry(location_id) {
Entry::Occupied(ocupied) => ocupied.into_ref(),
@ -513,6 +512,7 @@ pub fn filter_existing_file_path_params(
/// With this function we try to do a loose filtering of file paths, to avoid having to check
/// twice for directories and for files. This is because directories have a trailing `/` or `\` in
/// the materialized path
#[allow(unused)]
pub fn loose_find_existing_file_path_params(
MaterializedPath {
materialized_path,


@ -1,5 +1,5 @@
use crate::{
job::{JobError, JobResult, JobState, StatefulJob, WorkerContext},
job::{JobError, JobInitData, JobResult, JobState, StatefulJob, WorkerContext},
library::Library,
location::file_path_helper::{
ensure_sub_path_is_directory, ensure_sub_path_is_in_location,
@ -27,21 +27,26 @@ use super::{
/// BATCH_SIZE is the number of files to index at each step, writing the chunk of files metadata in the database.
const BATCH_SIZE: usize = 1000;
pub const INDEXER_JOB_NAME: &str = "indexer";
/// An `IndexerJob` is a stateful job that walks a directory and indexes all files.
/// First it walks the directory and generates a list of files to index, chunked into
/// batches of [`BATCH_SIZE`]. Then for each chunk it writes the file metadata to the database.
pub struct IndexerJob;
impl JobInitData for IndexerJobInit {
type Job = IndexerJob;
}
#[async_trait::async_trait]
impl StatefulJob for IndexerJob {
type Init = IndexerJobInit;
type Data = IndexerJobData;
type Step = IndexerJobStep;
fn name(&self) -> &'static str {
INDEXER_JOB_NAME
const NAME: &'static str = "indexer";
fn new() -> Self {
Self {}
}
/// Creates a vector of valid path buffers from a directory, chunked into batches of `BATCH_SIZE`.
@ -272,7 +277,6 @@ impl StatefulJob for IndexerJob {
})
}
/// Logs some metadata about the indexer job
async fn finalize(&mut self, ctx: WorkerContext, state: &mut JobState<Self>) -> JobResult {
finalize_indexer(&state.init.location.path, state, ctx)
}


@ -1,5 +1,5 @@
use crate::{
job::{JobError, JobResult, JobState, StatefulJob, WorkerContext},
job::{JobError, JobInitData, JobResult, JobState, StatefulJob, WorkerContext},
library::Library,
location::file_path_helper::{
ensure_sub_path_is_directory, ensure_sub_path_is_in_location,
@ -31,7 +31,6 @@ use super::{
/// BATCH_SIZE is the number of files to index at each step, writing the chunk of files metadata in the database.
const BATCH_SIZE: usize = 1000;
pub const SHALLOW_INDEXER_JOB_NAME: &str = "shallow_indexer";
/// `ShallowIndexerJobInit` receives a `location::Data` object to be indexed
/// and possibly a `sub_path` to be indexed. The `sub_path` is used when
@ -54,14 +53,20 @@ impl Hash for ShallowIndexerJobInit {
/// batches of [`BATCH_SIZE`]. Then for each chunk it writes the file metadata to the database.
pub struct ShallowIndexerJob;
impl JobInitData for ShallowIndexerJobInit {
type Job = ShallowIndexerJob;
}
#[async_trait::async_trait]
impl StatefulJob for ShallowIndexerJob {
type Init = ShallowIndexerJobInit;
type Data = IndexerJobData;
type Step = IndexerJobStep;
fn name(&self) -> &'static str {
SHALLOW_INDEXER_JOB_NAME
const NAME: &'static str = "shallow_indexer";
fn new() -> Self {
Self {}
}
/// Creates a vector of valid path buffers from a directory, chunked into batches of `BATCH_SIZE`.


@ -1,4 +1,4 @@
use crate::library::Library;
use crate::{job::JobManagerError, library::Library};
use std::{
collections::BTreeSet,
@ -102,6 +102,8 @@ pub enum LocationManagerError {
FilePathError(#[from] FilePathError),
#[error("Corrupted location pub_id on database: (error: {0})")]
CorruptedLocationPubId(#[from] uuid::Error),
#[error("Job Manager error: (error: {0})")]
JobManager(#[from] JobManagerError),
}
type OnlineLocations = BTreeSet<Vec<u8>>;


@ -126,7 +126,7 @@ pub(super) async fn create_dir(
info!("Created path: {}", created_path.materialized_path);
// scan the new directory
scan_location_sub_path(library, location, &created_path.materialized_path).await;
scan_location_sub_path(library, location, &created_path.materialized_path).await?;
invalidate_query!(library, "locations.getExplorerData");


@ -1,15 +1,14 @@
use crate::{
invalidate_query,
job::Job,
job::{Job, JobManagerError},
library::Library,
object::{
file_identifier::{
file_identifier_job::{FileIdentifierJob, FileIdentifierJobInit},
shallow_file_identifier_job::{ShallowFileIdentifierJob, ShallowFileIdentifierJobInit},
file_identifier_job::FileIdentifierJobInit,
shallow_file_identifier_job::ShallowFileIdentifierJobInit,
},
preview::{
shallow_thumbnailer_job::{ShallowThumbnailerJob, ShallowThumbnailerJobInit},
thumbnailer_job::{ThumbnailerJob, ThumbnailerJobInit},
shallow_thumbnailer_job::ShallowThumbnailerJobInit, thumbnailer_job::ThumbnailerJobInit,
},
},
prisma::{file_path, indexer_rules_in_location, location, node, object},
@ -39,11 +38,7 @@ mod metadata;
pub use error::LocationError;
use file_path_helper::file_path_just_object_id;
use indexer::{
indexer_job::IndexerJob,
shallow_indexer_job::{ShallowIndexerJob, ShallowIndexerJobInit},
IndexerJobInit,
};
use indexer::{shallow_indexer_job::ShallowIndexerJobInit, IndexerJobInit};
pub use manager::{LocationManager, LocationManagerError};
use metadata::SpacedriveLocationMetadataFile;
@ -323,43 +318,30 @@ async fn link_location_and_indexer_rules(
pub async fn scan_location(
library: &Library,
location: location_with_indexer_rules::Data,
) -> Result<(), LocationError> {
) -> Result<(), JobManagerError> {
if location.node_id != library.node_local_id {
return Ok(());
}
library
.queue_job(Job::new(
FileIdentifierJobInit {
location: location::Data::from(&location),
sub_path: None,
},
FileIdentifierJob {},
))
.await;
let location_base_data = location::Data::from(&location);
library
.queue_job(Job::new(
ThumbnailerJobInit {
location: location::Data::from(&location),
sub_path: None,
background: true,
},
ThumbnailerJob {},
))
.await;
library
.spawn_job(Job::new(
IndexerJobInit {
.spawn_job(
Job::new(IndexerJobInit {
location,
sub_path: None,
},
IndexerJob {},
))
.await;
Ok(())
})
.queue_next(FileIdentifierJobInit {
location: location_base_data.clone(),
sub_path: None,
})
.queue_next(ThumbnailerJobInit {
location: location_base_data,
sub_path: None,
background: true,
}),
)
.await
}
#[cfg(feature = "location-watcher")]
@ -367,82 +349,61 @@ pub async fn scan_location_sub_path(
library: &Library,
location: location_with_indexer_rules::Data,
sub_path: impl AsRef<Path>,
) {
) -> Result<(), JobManagerError> {
let sub_path = sub_path.as_ref().to_path_buf();
if location.node_id != library.node_local_id {
return;
return Ok(());
}
library
.queue_job(Job::new(
FileIdentifierJobInit {
location: location::Data::from(&location),
sub_path: Some(sub_path.clone()),
},
FileIdentifierJob {},
))
.await;
let location_base_data = location::Data::from(&location);
library
.queue_job(Job::new(
ThumbnailerJobInit {
location: location::Data::from(&location),
sub_path: Some(sub_path.clone()),
background: true,
},
ThumbnailerJob {},
))
.await;
library
.spawn_job(Job::new(
IndexerJobInit {
.spawn_job(
Job::new(IndexerJobInit {
location,
sub_path: Some(sub_path.clone()),
})
.queue_next(FileIdentifierJobInit {
location: location_base_data.clone(),
sub_path: Some(sub_path.clone()),
})
.queue_next(ThumbnailerJobInit {
location: location_base_data,
sub_path: Some(sub_path),
},
IndexerJob {},
))
.await;
background: true,
}),
)
.await
}
pub async fn light_scan_location(
library: &Library,
location: location_with_indexer_rules::Data,
sub_path: impl AsRef<Path>,
) -> Result<(), LocationError> {
) -> Result<(), JobManagerError> {
let sub_path = sub_path.as_ref().to_path_buf();
if location.node_id != library.node_local_id {
return Ok(());
}
let location_base_data = location::Data::from(&location);
library
.queue_job(Job::new(
ShallowFileIdentifierJobInit {
location: location::Data::from(&location),
.spawn_job(
Job::new(ShallowIndexerJobInit {
location,
sub_path: sub_path.clone(),
},
ShallowFileIdentifierJob {},
))
.await;
library
.queue_job(Job::new(
ShallowThumbnailerJobInit {
location: location::Data::from(&location),
})
.queue_next(ShallowFileIdentifierJobInit {
location: location_base_data.clone(),
sub_path: sub_path.clone(),
},
ShallowThumbnailerJob {},
))
.await;
library
.spawn_job(Job::new(
ShallowIndexerJobInit { location, sub_path },
ShallowIndexerJob {},
))
.await;
Ok(())
})
.queue_next(ShallowThumbnailerJobInit {
location: location_base_data,
sub_path,
}),
)
.await
}
pub async fn relink_location(


@ -1,5 +1,7 @@
use crate::{
job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext},
job::{
JobError, JobInitData, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext,
},
library::Library,
location::file_path_helper::{
ensure_sub_path_is_directory, ensure_sub_path_is_in_location,
@ -22,8 +24,6 @@ use super::{
FileIdentifierReport, FilePathIdAndLocationIdCursor, CHUNK_SIZE,
};
pub const FILE_IDENTIFIER_JOB_NAME: &str = "file_identifier";
pub struct FileIdentifierJob {}
/// `FileIdentifierJobInit` takes file_paths without a file_id from an entire location
@ -53,14 +53,20 @@ pub struct FileIdentifierJobState {
maybe_sub_materialized_path: Option<MaterializedPath<'static>>,
}
impl JobInitData for FileIdentifierJobInit {
type Job = FileIdentifierJob;
}
#[async_trait::async_trait]
impl StatefulJob for FileIdentifierJob {
type Init = FileIdentifierJobInit;
type Data = FileIdentifierJobState;
type Step = ();
fn name(&self) -> &'static str {
FILE_IDENTIFIER_JOB_NAME
const NAME: &'static str = "file_identifier";
fn new() -> Self {
Self {}
}
async fn init(&self, ctx: WorkerContext, state: &mut JobState<Self>) -> Result<(), JobError> {
@ -108,7 +114,7 @@ impl StatefulJob for FileIdentifierJob {
if orphan_count == 0 {
return Err(JobError::EarlyFinish {
name: self.name().to_string(),
name: <Self as StatefulJob>::NAME.to_string(),
reason: "Found no orphan file paths to process".to_string(),
});
}
@ -166,7 +172,7 @@ impl StatefulJob for FileIdentifierJob {
get_orphan_file_paths(&ctx.library.db, cursor, maybe_sub_materialized_path).await?;
process_identifier_file_paths(
self.name(),
<Self as StatefulJob>::NAME,
location,
&file_paths,
state.step_number,
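This same mechanical change repeats across the job files below: the per-job name constant and the `name()` method give way to an associated `NAME` constant plus a `new()` constructor, and each init struct points back at its job type through `JobInitData`. Put together, the new wiring looks roughly like this; `MyJob`, `MyJobInit` and `MyJobState` are placeholder names, and the step-execution method (untouched by this commit) is elided.

```rust
// Sketch of the job wiring after this change, using placeholder names.
use serde::{Deserialize, Serialize};

pub struct MyJob {}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct MyJobInit {
	// parameters the job is created with
}

#[derive(Serialize, Deserialize)]
pub struct MyJobState {}

// The init data names its job type, which appears to be what lets Job::new
// and queue_next take just the init struct elsewhere in this commit.
impl JobInitData for MyJobInit {
	type Job = MyJob;
}

#[async_trait::async_trait]
impl StatefulJob for MyJob {
	type Init = MyJobInit;
	type Data = MyJobState;
	type Step = ();

	// Replaces the old `fn name(&self) -> &'static str` method.
	const NAME: &'static str = "my_job";

	fn new() -> Self {
		Self {}
	}

	async fn init(&self, ctx: WorkerContext, state: &mut JobState<Self>) -> Result<(), JobError> {
		// set up state.data here; early exits now reference
		// <Self as StatefulJob>::NAME instead of the removed name() method
		Ok(())
	}

	// ...execute_step elided...

	async fn finalize(&mut self, ctx: WorkerContext, state: &mut JobState<Self>) -> JobResult {
		Ok(Some(serde_json::to_value(&state.init)?))
	}
}
```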

View file

@ -1,5 +1,7 @@
use crate::{
job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext},
job::{
JobError, JobInitData, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext,
},
library::Library,
location::file_path_helper::{
ensure_sub_path_is_directory, ensure_sub_path_is_in_location,
@ -22,8 +24,6 @@ use super::{
FileIdentifierReport, FilePathIdAndLocationIdCursor, CHUNK_SIZE,
};
pub const SHALLOW_FILE_IDENTIFIER_JOB_NAME: &str = "shallow_file_identifier";
pub struct ShallowFileIdentifierJob {}
/// `ShallowFileIdentifierJobInit` takes file_paths without a file_id from a specific path
@ -50,14 +50,20 @@ pub struct ShallowFileIdentifierJobState {
sub_path_id: i32,
}
impl JobInitData for ShallowFileIdentifierJobInit {
type Job = ShallowFileIdentifierJob;
}
#[async_trait::async_trait]
impl StatefulJob for ShallowFileIdentifierJob {
type Init = ShallowFileIdentifierJobInit;
type Data = ShallowFileIdentifierJobState;
type Step = ();
fn name(&self) -> &'static str {
SHALLOW_FILE_IDENTIFIER_JOB_NAME
const NAME: &'static str = "shallow_file_identifier";
fn new() -> Self {
Self {}
}
async fn init(&self, ctx: WorkerContext, state: &mut JobState<Self>) -> Result<(), JobError> {
@ -113,7 +119,7 @@ impl StatefulJob for ShallowFileIdentifierJob {
if orphan_count == 0 {
return Err(JobError::EarlyFinish {
name: self.name().to_string(),
name: <Self as StatefulJob>::NAME.to_string(),
reason: "Found no orphan file paths to process".to_string(),
});
}
@ -167,7 +173,7 @@ impl StatefulJob for ShallowFileIdentifierJob {
let file_paths = get_orphan_file_paths(&ctx.library.db, cursor, *sub_path_id).await?;
process_identifier_file_paths(
self.name(),
<Self as StatefulJob>::NAME,
location,
&file_paths,
state.step_number,

View file

@ -1,17 +1,19 @@
use crate::job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext};
use crate::{
invalidate_query,
job::{
JobError, JobInitData, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext,
},
};
use std::{hash::Hash, path::PathBuf};
use serde::{Deserialize, Serialize};
use specta::Type;
use tokio::sync::oneshot;
use tracing::{error, trace};
use tracing::trace;
use super::{context_menu_fs_info, get_path_from_location_id, osstr_to_string, FsInfo};
pub struct FileCopierJob {
pub done_tx: Option<oneshot::Sender<()>>,
}
pub struct FileCopierJob {}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct FileCopierJobState {
@ -48,7 +50,9 @@ impl From<FsInfo> for FileCopierJobStep {
}
}
pub const COPY_JOB_NAME: &str = "file_copier";
impl JobInitData for FileCopierJobInit {
type Job = FileCopierJob;
}
#[async_trait::async_trait]
impl StatefulJob for FileCopierJob {
@ -56,8 +60,10 @@ impl StatefulJob for FileCopierJob {
type Data = FileCopierJobState;
type Step = FileCopierJobStep;
fn name(&self) -> &'static str {
COPY_JOB_NAME
const NAME: &'static str = "file_copier";
fn new() -> Self {
Self {}
}
async fn init(&self, ctx: WorkerContext, state: &mut JobState<Self>) -> Result<(), JobError> {
@ -179,12 +185,8 @@ impl StatefulJob for FileCopierJob {
Ok(())
}
async fn finalize(&mut self, _ctx: WorkerContext, state: &mut JobState<Self>) -> JobResult {
if let Some(done_tx) = self.done_tx.take() {
if done_tx.send(()).is_err() {
error!("Failed to send done signal on FileCopierJob");
}
}
async fn finalize(&mut self, ctx: WorkerContext, state: &mut JobState<Self>) -> JobResult {
invalidate_query!(ctx.library, "locations.getExplorerData");
Ok(Some(serde_json::to_value(&state.init)?))
}
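The copier also loses its `done_tx` oneshot: instead of signalling completion over a channel, the job invalidates the explorer query itself from `finalize`. The same finalize shape recurs in the cutter, decryptor, deleter, encryptor and eraser hunks below:

```rust
// Shared finalize shape for the fs jobs after this change: the context
// parameter is now used (renamed from `_ctx` to `ctx`) and the explorer
// query is invalidated directly from the job.
async fn finalize(&mut self, ctx: WorkerContext, state: &mut JobState<Self>) -> JobResult {
	invalidate_query!(ctx.library, "locations.getExplorerData");

	Ok(Some(serde_json::to_value(&state.init)?))
}
```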

View file

@ -1,4 +1,9 @@
use crate::job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext};
use crate::{
invalidate_query,
job::{
JobError, JobInitData, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext,
},
};
use std::{hash::Hash, path::PathBuf};
@ -27,7 +32,9 @@ pub struct FileCutterJobStep {
pub target_directory: PathBuf,
}
pub const CUT_JOB_NAME: &str = "file_cutter";
impl JobInitData for FileCutterJobInit {
type Job = FileCutterJob;
}
#[async_trait::async_trait]
impl StatefulJob for FileCutterJob {
@ -35,8 +42,10 @@ impl StatefulJob for FileCutterJob {
type Data = FileCutterJobState;
type Step = FileCutterJobStep;
fn name(&self) -> &'static str {
CUT_JOB_NAME
const NAME: &'static str = "file_cutter";
fn new() -> Self {
Self {}
}
async fn init(&self, ctx: WorkerContext, state: &mut JobState<Self>) -> Result<(), JobError> {
@ -85,7 +94,9 @@ impl StatefulJob for FileCutterJob {
Ok(())
}
async fn finalize(&mut self, _ctx: WorkerContext, state: &mut JobState<Self>) -> JobResult {
async fn finalize(&mut self, ctx: WorkerContext, state: &mut JobState<Self>) -> JobResult {
invalidate_query!(ctx.library, "locations.getExplorerData");
Ok(Some(serde_json::to_value(&state.init)?))
}
}

View file

@ -4,7 +4,12 @@ use specta::Type;
use std::{collections::VecDeque, path::PathBuf};
use tokio::fs::File;
use crate::job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext};
use crate::{
invalidate_query,
job::{
JobError, JobInitData, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext,
},
};
use super::{context_menu_fs_info, FsInfo, BYTES_EXT};
pub struct FileDecryptorJob;
@ -27,16 +32,20 @@ pub struct FileDecryptorJobStep {
pub fs_info: FsInfo,
}
const JOB_NAME: &str = "file_decryptor";
impl JobInitData for FileDecryptorJobInit {
type Job = FileDecryptorJob;
}
#[async_trait::async_trait]
impl StatefulJob for FileDecryptorJob {
type Data = FileDecryptorJobState;
type Init = FileDecryptorJobInit;
type Data = FileDecryptorJobState;
type Step = FileDecryptorJobStep;
fn name(&self) -> &'static str {
JOB_NAME
const NAME: &'static str = "file_decryptor";
fn new() -> Self {
Self {}
}
async fn init(&self, ctx: WorkerContext, state: &mut JobState<Self>) -> Result<(), JobError> {
@ -145,7 +154,9 @@ impl StatefulJob for FileDecryptorJob {
Ok(())
}
async fn finalize(&mut self, _ctx: WorkerContext, state: &mut JobState<Self>) -> JobResult {
async fn finalize(&mut self, ctx: WorkerContext, state: &mut JobState<Self>) -> JobResult {
invalidate_query!(ctx.library, "locations.getExplorerData");
// mark job as successful
Ok(Some(serde_json::to_value(&state.init)?))
}

View file

@ -1,4 +1,9 @@
use crate::job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext};
use crate::{
invalidate_query,
job::{
JobError, JobInitData, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext,
},
};
use std::hash::Hash;
@ -18,7 +23,9 @@ pub struct FileDeleterJobInit {
pub path_id: i32,
}
pub const DELETE_JOB_NAME: &str = "file_deleter";
impl JobInitData for FileDeleterJobInit {
type Job = FileDeleterJob;
}
#[async_trait::async_trait]
impl StatefulJob for FileDeleterJob {
@ -26,8 +33,10 @@ impl StatefulJob for FileDeleterJob {
type Data = FileDeleterJobState;
type Step = FsInfo;
fn name(&self) -> &'static str {
DELETE_JOB_NAME
const NAME: &'static str = "file_deleter";
fn new() -> Self {
Self {}
}
async fn init(&self, ctx: WorkerContext, state: &mut JobState<Self>) -> Result<(), JobError> {
@ -64,7 +73,9 @@ impl StatefulJob for FileDeleterJob {
Ok(())
}
async fn finalize(&mut self, _ctx: WorkerContext, state: &mut JobState<Self>) -> JobResult {
async fn finalize(&mut self, ctx: WorkerContext, state: &mut JobState<Self>) -> JobResult {
invalidate_query!(ctx.library, "locations.getExplorerData");
Ok(Some(serde_json::to_value(&state.init)?))
}
}

View file

@ -1,4 +1,4 @@
use crate::{job::*, library::Library};
use crate::{invalidate_query, job::*, library::Library};
use std::path::PathBuf;
@ -12,7 +12,7 @@ use sd_crypto::{
use serde::{Deserialize, Serialize};
use specta::Type;
use tokio::{fs::File, io::AsyncReadExt};
use tracing::warn;
use tracing::{error, warn};
use super::{context_menu_fs_info, FsInfo, BYTES_EXT};
@ -43,7 +43,9 @@ pub struct Metadata {
pub date_created: chrono::DateTime<FixedOffset>,
}
const JOB_NAME: &str = "file_encryptor";
impl JobInitData for FileEncryptorJobInit {
type Job = FileEncryptorJob;
}
#[async_trait::async_trait]
impl StatefulJob for FileEncryptorJob {
@ -51,8 +53,10 @@ impl StatefulJob for FileEncryptorJob {
type Data = FileEncryptorJobState;
type Step = FsInfo;
fn name(&self) -> &'static str {
JOB_NAME
const NAME: &'static str = "file_encryptor";
fn new() -> Self {
Self {}
}
async fn init(&self, ctx: WorkerContext, state: &mut JobState<Self>) -> Result<(), JobError> {
@ -122,7 +126,17 @@ impl StatefulJob for FileEncryptorJob {
ctx.library.clone(),
&output_path,
)
.await?;
.await
.map_or_else(
|e| {
error!(
"Failed to make location manager ignore the path {}; Error: {e:#?}",
output_path.display()
);
None
},
Some,
);
let mut reader = File::open(&info.fs_path).await?;
let mut writer = File::create(output_path).await?;
@ -223,7 +237,9 @@ impl StatefulJob for FileEncryptorJob {
Ok(())
}
async fn finalize(&mut self, _ctx: WorkerContext, state: &mut JobState<Self>) -> JobResult {
async fn finalize(&mut self, ctx: WorkerContext, state: &mut JobState<Self>) -> JobResult {
invalidate_query!(ctx.library, "locations.getExplorerData");
// mark job as successful
Ok(Some(serde_json::to_value(&state.init)?))
}
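One behavioural nuance in the encryptor hunk above: the call that asks the location manager to ignore the output path no longer aborts the job on failure; the error is logged and the value degrades to `None`. In isolation the idiom looks like this, with `some_fallible_call` standing in for the real call, whose name sits outside the lines shown:

```rust
use tracing::error;

// Log-and-continue: map_or_else maps the Err arm to None (after logging)
// and wraps the Ok arm in Some, instead of propagating the error with `?`.
// `some_fallible_call` is a hypothetical stand-in, not the real API.
let maybe_ignore = some_fallible_call(&output_path).await.map_or_else(
	|e| {
		error!(
			"Failed to make location manager ignore the path {}; Error: {e:#?}",
			output_path.display()
		);
		None
	},
	Some,
);
```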

View file

@ -1,4 +1,9 @@
use crate::job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext};
use crate::{
invalidate_query,
job::{
JobError, JobInitData, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext,
},
};
use std::{hash::Hash, path::PathBuf};
@ -42,7 +47,9 @@ impl From<FsInfo> for FileEraserJobStep {
}
}
pub const ERASE_JOB_NAME: &str = "file_eraser";
impl JobInitData for FileEraserJobInit {
type Job = FileEraserJob;
}
#[async_trait::async_trait]
impl StatefulJob for FileEraserJob {
@ -50,8 +57,10 @@ impl StatefulJob for FileEraserJob {
type Data = FsInfo;
type Step = FileEraserJobStep;
fn name(&self) -> &'static str {
ERASE_JOB_NAME
const NAME: &'static str = "file_eraser";
fn new() -> Self {
Self {}
}
async fn init(&self, ctx: WorkerContext, state: &mut JobState<Self>) -> Result<(), JobError> {
@ -118,7 +127,7 @@ impl StatefulJob for FileEraserJob {
Ok(())
}
async fn finalize(&mut self, _ctx: WorkerContext, state: &mut JobState<Self>) -> JobResult {
async fn finalize(&mut self, ctx: WorkerContext, state: &mut JobState<Self>) -> JobResult {
if let Some(ref info) = state.data {
if info.path_data.is_dir {
tokio::fs::remove_dir_all(&info.fs_path).await?;
@ -127,6 +136,8 @@ impl StatefulJob for FileEraserJob {
warn!("missing job state, unable to fully finalise erase job");
}
invalidate_query!(ctx.library, "locations.getExplorerData");
Ok(Some(serde_json::to_value(&state.init)?))
}
}

View file

@ -1,5 +1,7 @@
use crate::{
job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext},
job::{
JobError, JobInitData, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext,
},
library::Library,
location::{
file_path_helper::{
@ -32,8 +34,6 @@ use super::{
#[cfg(feature = "ffmpeg")]
use super::FILTERED_VIDEO_EXTENSIONS;
pub const SHALLOW_THUMBNAILER_JOB_NAME: &str = "shallow_thumbnailer";
pub struct ShallowThumbnailerJob {}
#[derive(Serialize, Deserialize, Clone)]
@ -49,14 +49,20 @@ impl Hash for ShallowThumbnailerJobInit {
}
}
impl JobInitData for ShallowThumbnailerJobInit {
type Job = ShallowThumbnailerJob;
}
#[async_trait::async_trait]
impl StatefulJob for ShallowThumbnailerJob {
type Init = ShallowThumbnailerJobInit;
type Data = ThumbnailerJobState;
type Step = ThumbnailerJobStep;
fn name(&self) -> &'static str {
SHALLOW_THUMBNAILER_JOB_NAME
const NAME: &'static str = "shallow_thumbnailer";
fn new() -> Self {
Self {}
}
async fn init(&self, ctx: WorkerContext, state: &mut JobState<Self>) -> Result<(), JobError> {

View file

@ -1,5 +1,7 @@
use crate::{
job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext},
job::{
JobError, JobInitData, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext,
},
library::Library,
location::file_path_helper::{
ensure_sub_path_is_directory, ensure_sub_path_is_in_location,
@ -25,8 +27,6 @@ use super::{
#[cfg(feature = "ffmpeg")]
use super::FILTERED_VIDEO_EXTENSIONS;
pub const THUMBNAILER_JOB_NAME: &str = "thumbnailer";
pub struct ThumbnailerJob {}
#[derive(Serialize, Deserialize, Clone)]
@ -45,14 +45,20 @@ impl Hash for ThumbnailerJobInit {
}
}
impl JobInitData for ThumbnailerJobInit {
type Job = ThumbnailerJob;
}
#[async_trait::async_trait]
impl StatefulJob for ThumbnailerJob {
type Init = ThumbnailerJobInit;
type Data = ThumbnailerJobState;
type Step = ThumbnailerJobStep;
fn name(&self) -> &'static str {
THUMBNAILER_JOB_NAME
const NAME: &'static str = "thumbnailer";
fn new() -> Self {
Self {}
}
async fn init(&self, ctx: WorkerContext, state: &mut JobState<Self>) -> Result<(), JobError> {

View file

@ -1,5 +1,7 @@
use crate::{
job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext},
job::{
JobError, JobInitData, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext,
},
library::Library,
location::file_path_helper::{file_path_for_object_validator, MaterializedPath},
prisma::{file_path, location},
@ -14,8 +16,6 @@ use tracing::info;
use super::hash::file_checksum;
pub const VALIDATOR_JOB_NAME: &str = "object_validator";
// The Validator is able to:
// - generate a full byte checksum for Objects in a Location
// - generate checksums for all Objects missing one
@ -36,14 +36,20 @@ pub struct ObjectValidatorJobInit {
pub background: bool,
}
impl JobInitData for ObjectValidatorJobInit {
type Job = ObjectValidatorJob;
}
#[async_trait::async_trait]
impl StatefulJob for ObjectValidatorJob {
type Init = ObjectValidatorJobInit;
type Data = ObjectValidatorJobState;
type Step = file_path_for_object_validator::Data;
fn name(&self) -> &'static str {
VALIDATOR_JOB_NAME
const NAME: &'static str = "object_validator";
fn new() -> Self {
Self {}
}
async fn init(&self, ctx: WorkerContext, state: &mut JobState<Self>) -> Result<(), JobError> {

View file

@ -44,53 +44,53 @@ impl SyncManager {
pub async fn write_ops<'item, I: prisma_client_rust::BatchItem<'item>>(
&self,
tx: &PrismaClient,
(ops, queries): (Vec<CRDTOperation>, I),
(_ops, queries): (Vec<CRDTOperation>, I),
) -> prisma_client_rust::Result<<I as prisma_client_rust::BatchItemParent>::ReturnValue> {
let owned = ops
.iter()
.filter_map(|op| match &op.typ {
CRDTOperationType::Owned(owned_op) => Some(tx.owned_operation().create(
op.id.as_bytes().to_vec(),
op.timestamp.0 as i64,
to_vec(&owned_op.items).unwrap(),
owned_op.model.clone(),
node::pub_id::equals(op.node.as_bytes().to_vec()),
vec![],
)),
_ => None,
})
.collect::<Vec<_>>();
let shared = ops
.iter()
.filter_map(|op| match &op.typ {
CRDTOperationType::Shared(shared_op) => {
let kind = match &shared_op.data {
SharedOperationData::Create(_) => "c",
SharedOperationData::Update { .. } => "u",
SharedOperationData::Delete => "d",
};
Some(tx.shared_operation().create(
op.id.as_bytes().to_vec(),
op.timestamp.0 as i64,
shared_op.model.to_string(),
to_vec(&shared_op.record_id).unwrap(),
kind.to_string(),
to_vec(&shared_op.data).unwrap(),
node::pub_id::equals(op.node.as_bytes().to_vec()),
vec![],
))
}
_ => None,
})
.collect::<Vec<_>>();
#[cfg(feature = "sync-messages")]
let res = {
let owned = _ops
.iter()
.filter_map(|op| match &op.typ {
CRDTOperationType::Owned(owned_op) => Some(tx.owned_operation().create(
op.id.as_bytes().to_vec(),
op.timestamp.0 as i64,
to_vec(&owned_op.items).unwrap(),
owned_op.model.clone(),
node::pub_id::equals(op.node.as_bytes().to_vec()),
vec![],
)),
_ => None,
})
.collect::<Vec<_>>();
let shared = _ops
.iter()
.filter_map(|op| match &op.typ {
CRDTOperationType::Shared(shared_op) => {
let kind = match &shared_op.data {
SharedOperationData::Create(_) => "c",
SharedOperationData::Update { .. } => "u",
SharedOperationData::Delete => "d",
};
Some(tx.shared_operation().create(
op.id.as_bytes().to_vec(),
op.timestamp.0 as i64,
shared_op.model.to_string(),
to_vec(&shared_op.record_id).unwrap(),
kind.to_string(),
to_vec(&shared_op.data).unwrap(),
node::pub_id::equals(op.node.as_bytes().to_vec()),
vec![],
))
}
_ => None,
})
.collect::<Vec<_>>();
let (res, _) = tx._batch((queries, (owned, shared))).await?;
for op in ops {
for op in _ops {
self.tx.send(SyncMessage::Created(op)).ok();
}
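The effect of this hunk: building and batching the owned and shared operation queries is now compiled only when the `sync-messages` feature is enabled, and the parameter is renamed to `_ops` so builds without that feature do not warn about it being unused. A toy sketch of the gating pattern, assuming a non-sync fallback branch follows the portion of the hunk shown here:

```rust
// Toy illustration of cfg-gated evaluation, not the real SyncManager code:
// exactly one `res` binding compiles in any given build, and the leading
// underscore keeps `_ops` warning-free when the feature is off.
fn write_batch(_ops: Vec<String>) -> usize {
	#[cfg(feature = "sync-messages")]
	let res = {
		// with sync enabled, the operations would be persisted and broadcast here
		_ops.len()
	};

	#[cfg(not(feature = "sync-messages"))]
	let res = 0;

	res
}
```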

View file

@ -3,7 +3,8 @@ import { Loader } from '@sd/ui';
import { useLibraryQuery } from '~/../packages/client/src';
export default () => {
const { data: isRunningJob } = useLibraryQuery(['jobs.isRunning']);
const { data: runningJobs } = useLibraryQuery(['jobs.getRunning']);
const isRunningJob = runningJobs?.length !== undefined && runningJobs?.length > 0;
return isRunningJob ? (
<Loader className="h-[20px] w-[20px]" />

View file

@ -7,7 +7,6 @@ export type Procedures = {
{ key: "files.get", input: LibraryArgs<GetArgs>, result: { id: number, pub_id: number[], kind: number, key_id: number | null, hidden: boolean, favorite: boolean, important: boolean, has_thumbnail: boolean, has_thumbstrip: boolean, has_video_preview: boolean, ipfs_id: string | null, note: string | null, date_created: string, file_paths: FilePath[], media_data: MediaData | null } | null } |
{ key: "jobs.getHistory", input: LibraryArgs<null>, result: JobReport[] } |
{ key: "jobs.getRunning", input: LibraryArgs<null>, result: JobReport[] } |
{ key: "jobs.isRunning", input: LibraryArgs<null>, result: boolean } |
{ key: "keys.getDefault", input: LibraryArgs<null>, result: string | null } |
{ key: "keys.getKey", input: LibraryArgs<string>, result: string } |
{ key: "keys.getSecretKey", input: LibraryArgs<null>, result: string | null } |
@ -78,7 +77,7 @@ export type Procedures = {
{ key: "tags.delete", input: LibraryArgs<number>, result: null } |
{ key: "tags.update", input: LibraryArgs<TagUpdateArgs>, result: null },
subscriptions:
{ key: "invalidateQuery", input: never, result: InvalidateOperationEvent } |
{ key: "invalidation.listen", input: never, result: InvalidateOperationEvent[] } |
{ key: "jobs.newThumbnail", input: LibraryArgs<null>, result: string } |
{ key: "locations.online", input: never, result: number[][] } |
{ key: "p2p.events", input: never, result: P2PEvent } |
@ -162,9 +161,9 @@ export type IndexerRule = { id: number, kind: number, name: string, parameters:
*/
export type IndexerRuleCreateArgs = { kind: RuleKind, name: string, parameters: number[] }
export type InvalidateOperationEvent = { key: string, arg: any }
export type InvalidateOperationEvent = { key: string, arg: any, result: any | null }
export type JobReport = { id: string, name: string, data: number[] | null, metadata: any | null, date_created: string, date_modified: string, status: JobStatus, task_count: number, completed_task_count: number, message: string, seconds_elapsed: number }
export type JobReport = { id: string, name: string, data: number[] | null, metadata: any | null, created_at: string | null, updated_at: string | null, parent_id: string | null, status: JobStatus, task_count: number, completed_task_count: number, message: string, seconds_elapsed: number }
export type JobStatus = "Queued" | "Running" | "Completed" | "Canceled" | "Failed" | "Paused"
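For orientation, these regenerated bindings imply that the invalidation event now carries an optional pre-computed result next to the key and argument, and that the subscription delivers events in batches. A rough guess at the Rust-side shape the binding is generated from; the field types are inferred from the TypeScript above, not taken from the core source:

```rust
use serde::Serialize;
use specta::Type;

// Assumed mirror of the regenerated InvalidateOperationEvent binding.
// A populated `result` lets the client set the query data directly;
// `None` falls back to a plain query invalidation.
#[derive(Debug, Clone, Serialize, Type)]
pub struct InvalidateOperationEvent {
	pub key: String,
	pub arg: serde_json::Value,
	pub result: Option<serde_json::Value>,
}
```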

View file

@ -82,13 +82,20 @@ export const useLibraryMutation = libraryHooks.useMutation;
export function useInvalidateQuery() {
const context = rspc.useContext();
rspc.useSubscription(['invalidateQuery'], {
onData: (invalidateOperation) => {
const key = [invalidateOperation.key];
if (invalidateOperation.arg !== null) {
key.concat(invalidateOperation.arg);
rspc.useSubscription(['invalidation.listen'], {
onData: (ops) => {
for (const op of ops) {
const key = [op.key];
if (op.arg !== null) {
key.concat(op.arg);
}
if (op.result !== null) {
context.queryClient.setQueryData(key, op.result);
} else {
context.queryClient.invalidateQueries(key);
}
}
context.queryClient.invalidateQueries(key);
}
});
}