[ENG-1627] Move all jobs and actors to an "Old" namespace (#2157)

* A bunch of new warnings due to a stricter Clippy config (see the sketch after this list)

* Moving old processing stuff to a new namespace
Also fixing a bunch of typos throughout the entire codebase.

* Rustfmt
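The PR doesn't include the Clippy configuration itself, so as a rough illustration only: a "stricter" setup typically amounts to a few crate-level lint attributes like these (the specific lints below are assumptions, not the ones this PR enabled):

#![warn(clippy::pedantic)] // opt into the stricter pedantic lint group
#![warn(clippy::unwrap_used)] // flag .unwrap() calls that should handle errors instead
#![allow(clippy::module_name_repetitions)] // quiet one especially noisy pedantic lint

fn main() {
    // Under the lints above, Clippy now warns on patterns it previously accepted.
    let parsed: i32 = "42".parse().expect("a constant, valid integer");
    println!("{parsed}");
}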
Ericson "Fogo" Soares 2024-03-04 15:57:35 -03:00 committed by GitHub
parent 9cb976af72
commit 68ca2382fd
93 changed files with 906 additions and 761 deletions

View file

@ -194,7 +194,7 @@ mod locations {
}
#[derive(Debug)]
pub struct CredentialsProvider(sd_cloud_api::locations::authorise::Response);
pub struct CredentialsProvider(sd_cloud_api::locations::authorize::Response);
impl ProvideCredentials for CredentialsProvider {
fn provide_credentials<'a>(&'a self) -> future::ProvideCredentials<'a>
@ -219,7 +219,7 @@ mod locations {
// Reuse the client between procedure calls
fn get_aws_s3_client(
token: sd_cloud_api::locations::authorise::Response,
token: sd_cloud_api::locations::authorize::Response,
) -> &'static aws_sdk_s3::Client {
AWS_S3_CLIENT.get_or_init(|| {
aws_sdk_s3::Client::new(
@ -259,7 +259,7 @@ mod locations {
// TODO: Remove this
.procedure("testing", {
// // TODO: Move this off a static. This is just for debugging.
// static AUTH_TOKEN: Lazy<Mutex<Option<AuthoriseResponse>>> =
// static AUTH_TOKEN: Lazy<Mutex<Option<AuthorizeResponse>>> =
// Lazy::new(|| Mutex::new(None));
#[derive(Type, Deserialize)]
@ -273,7 +273,7 @@ mod locations {
let token = &mut None; // AUTH_TOKEN.lock().await; // TODO: Caching of the token. For now it's annoying when debugging.
if token.is_none() {
*token = Some(
sd_cloud_api::locations::authorise(
sd_cloud_api::locations::authorize(
node.cloud_api_config().await,
params.id,
)
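The get_aws_s3_client hunk above caches a client in a static and initializes it at most once. A minimal, self-contained sketch of that get_or_init pattern, using reqwest::Client purely as a stand-in type (an assumption; the real code caches an aws_sdk_s3::Client):

use once_cell::sync::OnceCell;

// Reuse one client across calls instead of rebuilding it every time.
static CLIENT: OnceCell<reqwest::Client> = OnceCell::new();

fn get_client() -> &'static reqwest::Client {
    // The closure runs at most once; later callers get the cached instance.
    CLIENT.get_or_init(reqwest::Client::new)
}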

View file

@ -1,17 +1,17 @@
use crate::{
api::{locations::object_with_file_paths, utils::library},
invalidate_query,
job::Job,
library::Library,
location::{get_location_path_from_location_id, LocationError},
object::{
fs::{
copy::FileCopierJobInit, cut::FileCutterJobInit, delete::FileDeleterJobInit,
erase::FileEraserJobInit, error::FileSystemJobsError,
find_available_filename_for_duplicate,
error::FileSystemJobsError, find_available_filename_for_duplicate,
old_copy::OldFileCopierJobInit, old_cut::OldFileCutterJobInit,
old_delete::OldFileDeleterJobInit, old_erase::OldFileEraserJobInit,
},
media::media_data_image_from_prisma_data,
},
old_job::Job,
};
use sd_cache::{CacheNode, Model, NormalisedResult, Reference};
@ -19,7 +19,7 @@ use sd_file_ext::kind::ObjectKind;
use sd_file_path_helper::{
file_path_to_isolate, file_path_to_isolate_with_id, FilePathError, IsolatedFilePathData,
};
use sd_images::ConvertableExtension;
use sd_images::ConvertibleExtension;
use sd_media_metadata::MediaMetadata;
use sd_prisma::{
prisma::{file_path, location, object},
@ -396,7 +396,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
// })
.procedure("deleteFiles", {
R.with2(library())
.mutation(|(node, library), args: FileDeleterJobInit| async move {
.mutation(|(node, library), args: OldFileDeleterJobInit| async move {
match args.file_path_ids.len() {
0 => Ok(()),
1 => {
@ -472,7 +472,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
location_id: location::id::Type,
file_path_id: file_path::id::Type,
delete_src: bool, // if set, we delete the src image after
desired_extension: ConvertableExtension,
desired_extension: ConvertibleExtension,
quality_percentage: Option<i32>, // 1% - 125%
}
R.with2(library())
@ -604,7 +604,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
})
.procedure("eraseFiles", {
R.with2(library())
.mutation(|(node, library), args: FileEraserJobInit| async move {
.mutation(|(node, library), args: OldFileEraserJobInit| async move {
Job::new(args)
.spawn(&node, &library)
.await
@ -613,7 +613,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
})
.procedure("copyFiles", {
R.with2(library())
.mutation(|(node, library), args: FileCopierJobInit| async move {
.mutation(|(node, library), args: OldFileCopierJobInit| async move {
Job::new(args)
.spawn(&node, &library)
.await
@ -622,7 +622,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
})
.procedure("cutFiles", {
R.with2(library())
.mutation(|(node, library), args: FileCutterJobInit| async move {
.mutation(|(node, library), args: OldFileCutterJobInit| async move {
Job::new(args)
.spawn(&node, &library)
.await
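Each of the file-system mutations above shares one dispatch shape: wrap the rspc input in a Job, then spawn it against the node and library. A toy sketch of that shape, with every name and type an illustrative stand-in (the real machinery lives in old_job):

// Toy model only: an init struct is wrapped in a Job, then spawned.
struct Node;
struct Library;
struct Job<I>(I);

impl<I: std::fmt::Debug> Job<I> {
    fn new(init: I) -> Self {
        Job(init)
    }
    fn spawn(self, _node: &Node, _library: &Library) {
        println!("spawned job with init {:?}", self.0);
    }
}

#[derive(Debug)]
struct MyJobInit {
    file_path_ids: Vec<i32>,
}

fn main() {
    Job::new(MyJobInit { file_path_ids: vec![1, 2, 3] }).spawn(&Node, &Library);
}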

View file

@ -1,11 +1,12 @@
use crate::{
invalidate_query,
job::{job_without_data, Job, JobReport, JobStatus, Jobs},
location::{find_location, LocationError},
object::{
file_identifier::file_identifier_job::FileIdentifierJobInit, media::MediaProcessorJobInit,
validation::validator_job::ObjectValidatorJobInit,
media::OldMediaProcessorJobInit,
old_file_identifier::old_file_identifier_job::OldFileIdentifierJobInit,
validation::old_validator_job::OldObjectValidatorJobInit,
},
old_job::{job_without_data, Job, JobReport, JobStatus, OldJobs},
};
use sd_prisma::prisma::{job, location, SortOrder};
@ -96,7 +97,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
.flat_map(JobReport::try_from)
.collect();
let active_reports_by_id = node.jobs.get_active_reports_with_id().await;
let active_reports_by_id = node.old_jobs.get_active_reports_with_id().await;
for job in job_reports {
// action name and group key are computed from the job data
@ -161,7 +162,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
.procedure("isActive", {
R.with2(library())
.query(|(node, library), _: ()| async move {
Ok(node.jobs.has_active_workers(library.id).await)
Ok(node.old_jobs.has_active_workers(library.id).await)
})
})
.procedure("clear", {
@ -202,7 +203,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
.procedure("pause", {
R.with2(library())
.mutation(|(node, library), id: Uuid| async move {
let ret = Jobs::pause(&node.jobs, id).await.map_err(Into::into);
let ret = OldJobs::pause(&node.old_jobs, id).await.map_err(Into::into);
invalidate_query!(library, "jobs.reports");
ret
})
@ -210,7 +211,9 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
.procedure("resume", {
R.with2(library())
.mutation(|(node, library), id: Uuid| async move {
let ret = Jobs::resume(&node.jobs, id).await.map_err(Into::into);
let ret = OldJobs::resume(&node.old_jobs, id)
.await
.map_err(Into::into);
invalidate_query!(library, "jobs.reports");
ret
})
@ -218,7 +221,9 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
.procedure("cancel", {
R.with2(library())
.mutation(|(node, library), id: Uuid| async move {
let ret = Jobs::cancel(&node.jobs, id).await.map_err(Into::into);
let ret = OldJobs::cancel(&node.old_jobs, id)
.await
.map_err(Into::into);
invalidate_query!(library, "jobs.reports");
ret
})
@ -243,7 +248,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
return Err(LocationError::IdNotFound(id).into());
};
Job::new(MediaProcessorJobInit {
Job::new(OldMediaProcessorJobInit {
location,
sub_path: Some(path),
regenerate_thumbnails: regenerate,
@ -275,7 +280,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
return Err(LocationError::IdNotFound(id).into());
};
Job::new(MediaProcessorJobInit {
Job::new(OldMediaProcessorJobInit {
location,
sub_path: Some(path),
regenerate_thumbnails: false,
@ -300,7 +305,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
return Err(LocationError::IdNotFound(args.id).into());
};
Job::new(ObjectValidatorJobInit {
Job::new(OldObjectValidatorJobInit {
location,
sub_path: Some(args.path),
})
@ -322,7 +327,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
return Err(LocationError::IdNotFound(args.id).into());
};
Job::new(FileIdentifierJobInit {
Job::new(OldFileIdentifierJobInit {
location,
sub_path: Some(args.path),
})

View file

@ -1,4 +1,6 @@
use crate::{invalidate_query, library::Library, object::media::thumbnail::get_indexed_thumb_key};
use crate::{
invalidate_query, library::Library, object::media::old_thumbnail::get_indexed_thumb_key,
};
use sd_prisma::{
prisma::{label, label_on_object, object, SortOrder},

View file

@ -1,15 +1,15 @@
use crate::{
invalidate_query,
job::StatefulJob,
location::{
delete_location, find_location,
indexer::{rules::IndexerRuleCreateArgs, IndexerJobInit},
indexer::{rules::IndexerRuleCreateArgs, OldIndexerJobInit},
light_scan_location, location_with_indexer_rules,
non_indexed::NonIndexedPathItem,
relink_location, scan_location, scan_location_sub_path, LocationCreateArgs, LocationError,
LocationUpdateArgs,
},
object::file_identifier::file_identifier_job::FileIdentifierJobInit,
object::old_file_identifier::old_file_identifier_job::OldFileIdentifierJobInit,
old_job::StatefulJob,
p2p::PeerMetadata,
util::AbortOnDrop,
};
@ -452,12 +452,12 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
sub_path,
}: LightScanArgs| async move {
if node
.jobs
.old_jobs
.has_job_running(|job_identity| {
job_identity.target_location == location_id
&& (job_identity.name == <IndexerJobInit as StatefulJob>::NAME
&& (job_identity.name == <OldIndexerJobInit as StatefulJob>::NAME
|| job_identity.name
== <FileIdentifierJobInit as StatefulJob>::NAME)
== <OldFileIdentifierJobInit as StatefulJob>::NAME)
})
.await
{

View file

@ -1,10 +1,10 @@
use crate::{
invalidate_query,
job::JobProgressEvent,
node::{
config::{NodeConfig, NodePreferences, P2PDiscoveryState, Port},
get_hardware_model_name, HardwareModel,
},
old_job::JobProgressEvent,
p2p::{into_listener2, Listener2},
Node,
};

View file

@ -14,7 +14,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
#[cfg(feature = "ai")]
{
use sd_ai::image_labeler::{Model, YoloV8};
use sd_ai::old_image_labeler::{Model, YoloV8};
Ok(YoloV8::versions())
}
},

View file

@ -61,7 +61,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
.map(|node_version| version != *node_version)
.unwrap_or(true)
{
new_model = sd_ai::image_labeler::YoloV8::model(Some(&version))
new_model = sd_ai::old_image_labeler::YoloV8::model(Some(&version))
.map_err(|e| {
error!(
"Failed to crate image_detection model: '{}'; Error: {e:#?}",
@ -97,7 +97,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
let version = model.version().to_string();
tokio::spawn(async move {
let notification =
if let Some(image_labeller) = node.image_labeller.as_ref() {
if let Some(image_labeller) = node.old_image_labeller.as_ref() {
if let Err(e) = image_labeller.change_model(model).await {
NotificationData {
title: String::from(

View file

@ -5,7 +5,7 @@ use crate::{
},
library::Library,
location::{non_indexed, LocationError},
object::media::thumbnail::get_indexed_thumb_key,
object::media::old_thumbnail::get_indexed_thumb_key,
util::{unsafe_streamed_query, BatchedStream},
};
@ -156,7 +156,7 @@ pub fn mount() -> AlphaRouter<Ctx> {
let mut stream = BatchedStream::new(paths);
Ok(unsafe_streamed_query(stream! {
while let Some(result) = stream.next().await {
// We optimise for the case of no errors because it should be way more common.
// We optimize for the case of no errors because it should be way more common.
let mut entries = Vec::with_capacity(result.len());
let mut errors = Vec::with_capacity(0);

View file

@ -1,7 +1,7 @@
use crate::{
api::{utils::InvalidateOperationEvent, CoreEvent},
library::Library,
object::media::thumbnail::WEBP_EXTENSION,
object::media::old_thumbnail::WEBP_EXTENSION,
p2p::operations,
util::InfallibleResponse,
Node,

View file

@ -54,11 +54,12 @@ pub(crate) async fn serve_file(
// ETag
let mut status_code = StatusCode::PARTIAL_CONTENT;
if let Ok(time) = metadata.modified() {
let etag_header = format!(
let etag_header =
format!(
r#""{}""#,
// The ETag can be any value, so we just use the modified time to make it easy.
time.duration_since(UNIX_EPOCH)
.expect("are you a time traveller? cause that's the only explanation for this error")
.expect("are you a time traveler? cause that's the only explanation for this error")
.as_millis()
);
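Isolating the idea in this hunk: any stable value works as an ETag, so the handler derives one from the file's modification time. A runnable sketch of just that derivation (the file name in main is an arbitrary example):

use std::time::UNIX_EPOCH;

// Build a quoted ETag string from a file's mtime, mirroring the handler above.
fn etag_from_mtime(metadata: &std::fs::Metadata) -> Option<String> {
    let modified = metadata.modified().ok()?;
    let millis = modified.duration_since(UNIX_EPOCH).ok()?.as_millis();
    Some(format!(r#""{millis}""#))
}

fn main() {
    let meta = std::fs::metadata("Cargo.toml").expect("a readable file");
    println!("{}", etag_from_mtime(&meta).expect("mtime after epoch"));
}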

View file

@ -3,11 +3,11 @@
use crate::{
api::{CoreEvent, Router},
location::LocationManagerError,
object::media::thumbnail::actor::Thumbnailer,
object::media::old_thumbnail::old_actor::OldThumbnailer,
};
#[cfg(feature = "ai")]
use sd_ai::image_labeler::{DownloadModelError, ImageLabeler, YoloV8};
use sd_ai::old_image_labeler::{DownloadModelError, OldImageLabeler, YoloV8};
use api::notifications::{Notification, NotificationData, NotificationId};
use chrono::{DateTime, Utc};
@ -34,12 +34,12 @@ pub mod api;
mod cloud;
pub mod custom_uri;
mod env;
pub(crate) mod job;
pub mod library;
pub(crate) mod location;
pub(crate) mod node;
pub(crate) mod notifications;
pub(crate) mod object;
pub(crate) mod old_job;
pub(crate) mod p2p;
pub(crate) mod preferences;
#[doc(hidden)] // TODO(@Oscar): Make this private when breaking out `utils` into `sd-utils`
@ -56,18 +56,18 @@ pub struct Node {
pub data_dir: PathBuf,
pub config: Arc<config::Manager>,
pub libraries: Arc<library::Libraries>,
pub jobs: Arc<job::Jobs>,
pub old_jobs: Arc<old_job::OldJobs>,
pub locations: location::Locations,
pub p2p: Arc<p2p::P2PManager>,
pub event_bus: (broadcast::Sender<CoreEvent>, broadcast::Receiver<CoreEvent>),
pub notifications: Notifications,
pub thumbnailer: Thumbnailer,
pub thumbnailer: OldThumbnailer,
pub files_over_p2p_flag: Arc<AtomicBool>,
pub cloud_sync_flag: Arc<AtomicBool>,
pub env: Arc<env::Env>,
pub http: reqwest::Client,
#[cfg(feature = "ai")]
pub image_labeller: Option<ImageLabeler>,
pub old_image_labeller: Option<OldImageLabeler>,
}
impl fmt::Debug for Node {
@ -111,41 +111,43 @@ impl Node {
};
let (locations, locations_actor) = location::Locations::new();
let (jobs, jobs_actor) = job::Jobs::new();
let (old_jobs, jobs_actor) = old_job::OldJobs::new();
let libraries = library::Libraries::new(data_dir.join("libraries")).await?;
let (p2p, start_p2p) = p2p::P2PManager::new(config.clone(), libraries.clone())
.await
.map_err(NodeError::P2PManager)?;
let node =
Arc::new(Node {
data_dir: data_dir.to_path_buf(),
jobs,
locations,
notifications: notifications::Notifications::new(),
p2p,
thumbnailer: Thumbnailer::new(
data_dir,
libraries.clone(),
event_bus.0.clone(),
config.preferences_watcher(),
)
.await,
config,
event_bus,
libraries,
files_over_p2p_flag: Arc::new(AtomicBool::new(false)),
cloud_sync_flag: Arc::new(AtomicBool::new(false)),
http: reqwest::Client::new(),
env,
#[cfg(feature = "ai")]
image_labeller: ImageLabeler::new(YoloV8::model(image_labeler_version)?, data_dir)
.await
.map_err(|e| {
error!("Failed to initialize image labeller. AI features will be disabled: {e:#?}");
})
.ok(),
});
let node = Arc::new(Node {
data_dir: data_dir.to_path_buf(),
old_jobs,
locations,
notifications: notifications::Notifications::new(),
p2p,
thumbnailer: OldThumbnailer::new(
data_dir,
libraries.clone(),
event_bus.0.clone(),
config.preferences_watcher(),
)
.await,
config,
event_bus,
libraries,
files_over_p2p_flag: Arc::new(AtomicBool::new(false)),
cloud_sync_flag: Arc::new(AtomicBool::new(false)),
http: reqwest::Client::new(),
env,
#[cfg(feature = "ai")]
old_image_labeller: OldImageLabeler::new(
YoloV8::model(image_labeler_version)?,
data_dir,
)
.await
.map_err(|e| {
error!("Failed to initialize image labeller. AI features will be disabled: {e:#?}");
})
.ok(),
});
// Restore backend feature flags
for feature in node.config.get().await.features {
@ -250,10 +252,10 @@ impl Node {
pub async fn shutdown(&self) {
info!("Spacedrive shutting down...");
self.thumbnailer.shutdown().await;
self.jobs.shutdown().await;
self.old_jobs.shutdown().await;
self.p2p.shutdown().await;
#[cfg(feature = "ai")]
if let Some(image_labeller) = &self.image_labeller {
if let Some(image_labeller) = &self.old_image_labeller {
image_labeller.shutdown().await;
}
info!("Spacedrive Core shutdown successful!");

View file

@ -1,4 +1,4 @@
use crate::{api::CoreEvent, object::media::thumbnail::get_indexed_thumbnail_path, sync, Node};
use crate::{api::CoreEvent, object::media::old_thumbnail::get_indexed_thumbnail_path, sync, Node};
use sd_file_path_helper::{file_path_to_full_path, IsolatedFilePathData};
use sd_p2p2::Identity;

View file

@ -24,7 +24,7 @@ pub enum LibraryManagerError {
Uuid(#[from] uuid::Error),
#[error("failed to run indexer rules seeder: {0}")]
IndexerRulesSeeder(#[from] indexer::rules::seed::SeederError),
// #[error("failed to initialise the key manager: {0}")]
// #[error("failed to initialize the key manager: {0}")]
// KeyManager(#[from] sd_crypto::Error),
#[error("error migrating the library: {0}")]
MigrationError(#[from] db::MigrationError),

View file

@ -138,7 +138,7 @@ impl Libraries {
.await?;
// FIX-ME: Linux releases crash with *** stack smashing detected *** if spawn_volume_watcher is enabled
// No ideia why, but this will be irrelevant after the UDisk API is implemented, so let's leave it disabled for now
// No idea why, but this will be irrelevant after the UDisk API is implemented, so let's leave it disabled for now
#[cfg(not(target_os = "linux"))]
{
use crate::volume::watcher::spawn_volume_watcher;
@ -532,7 +532,7 @@ impl Libraries {
};
}
if let Err(e) = node.jobs.clone().cold_resume(node, &library).await {
if let Err(e) = node.old_jobs.clone().cold_resume(node, &library).await {
error!("Failed to resume jobs for library. {:#?}", e);
}

View file

@ -95,7 +95,7 @@ impl From<LocationError> for rspc::Error {
Self::with_cause(ErrorCode::BadRequest, err.to_string(), err)
}
// Custom error message is used to differenciate these errors in the frontend
// Custom error message is used to differentiate these errors in the frontend
// TODO: A better solution would be for rspc to support sending custom data alongside errors
NeedRelink { .. } => {
Self::with_cause(ErrorCode::Conflict, "NEED_RELINK".to_owned(), err)

View file

@ -24,25 +24,25 @@ use tracing::{trace, warn};
use super::location_with_indexer_rules;
pub mod indexer_job;
pub mod old_indexer_job;
mod old_shallow;
mod old_walk;
pub mod rules;
mod shallow;
mod walk;
use old_walk::WalkedEntry;
use rules::IndexerRuleError;
use walk::WalkedEntry;
pub use indexer_job::IndexerJobInit;
pub use shallow::*;
pub use old_indexer_job::OldIndexerJobInit;
pub use old_shallow::*;
#[derive(Serialize, Deserialize, Debug)]
pub struct IndexerJobSaveStep {
pub struct OldIndexerJobSaveStep {
chunk_idx: usize,
walked: Vec<WalkedEntry>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct IndexerJobUpdateStep {
pub struct OldIndexerJobUpdateStep {
chunk_idx: usize,
to_update: Vec<WalkedEntry>,
}
@ -85,7 +85,7 @@ impl From<IndexerError> for rspc::Error {
async fn execute_indexer_save_step(
location: &location_with_indexer_rules::Data,
save_step: &IndexerJobSaveStep,
save_step: &OldIndexerJobSaveStep,
library: &Library,
) -> Result<i64, IndexerError> {
let Library { sync, db, .. } = library;
@ -170,7 +170,7 @@ async fn execute_indexer_save_step(
}
async fn execute_indexer_update_step(
update_step: &IndexerJobUpdateStep,
update_step: &OldIndexerJobUpdateStep,
Library { sync, db, .. }: &Library,
) -> Result<i64, IndexerError> {
let (sync_stuff, paths_to_update): (Vec<_>, Vec<_>) = update_step
@ -511,7 +511,7 @@ pub async fn reverse_update_directories_sizes(
});
}
} else {
warn!("Corrupt database possesing a file_path entry without materialized_path");
warn!("Corrupt database possessing a file_path entry without materialized_path");
}
});

View file

@ -1,11 +1,11 @@
use crate::{
file_paths_db_fetcher_fn, invalidate_query,
job::{
library::Library,
location::{location_with_indexer_rules, update_location_size},
old_job::{
CurrentStep, JobError, JobInitOutput, JobReportUpdate, JobResult, JobRunMetadata,
JobStepOutput, StatefulJob, WorkerContext,
},
library::Library,
location::{location_with_indexer_rules, update_location_size},
to_remove_db_fetcher_fn,
};
@ -37,10 +37,10 @@ use tracing::{debug, info, warn};
use super::{
execute_indexer_save_step, execute_indexer_update_step, iso_file_path_factory,
old_walk::{keep_walking, walk, ToWalkEntry, WalkResult},
remove_non_existing_file_paths, reverse_update_directories_sizes,
rules::IndexerRule,
walk::{keep_walking, walk, ToWalkEntry, WalkResult},
IndexerError, IndexerJobSaveStep, IndexerJobUpdateStep,
IndexerError, OldIndexerJobSaveStep, OldIndexerJobUpdateStep,
};
/// BATCH_SIZE is the number of files to index at each step, writing each chunk of file metadata to the database.
@ -50,12 +50,12 @@ const BATCH_SIZE: usize = 1000;
/// and possibly a `sub_path` to be indexed. The `sub_path` is used when
/// we want to index just a part of a location.
#[derive(Serialize, Deserialize, Debug)]
pub struct IndexerJobInit {
pub struct OldIndexerJobInit {
pub location: location_with_indexer_rules::Data,
pub sub_path: Option<PathBuf>,
}
impl Hash for IndexerJobInit {
impl Hash for OldIndexerJobInit {
fn hash<H: Hasher>(&self, state: &mut H) {
self.location.id.hash(state);
if let Some(ref sub_path) = self.sub_path {
@ -67,14 +67,14 @@ impl Hash for IndexerJobInit {
/// is cached and cast to `PathBuf` from the `local_path` column in the `location` table. It also
/// contains some metadata for logging purposes.
#[derive(Serialize, Deserialize, Debug)]
pub struct IndexerJobData {
pub struct OldIndexerJobData {
location_path: PathBuf,
indexed_path: PathBuf,
indexer_rules: Vec<IndexerRule>,
}
#[derive(Serialize, Deserialize, Default, Debug)]
pub struct IndexerJobRunMetadata {
pub struct OldIndexerJobRunMetadata {
db_write_time: Duration,
scan_read_time: Duration,
total_paths: u64,
@ -87,7 +87,7 @@ pub struct IndexerJobRunMetadata {
paths_and_sizes: HashMap<PathBuf, u64>,
}
impl JobRunMetadata for IndexerJobRunMetadata {
impl JobRunMetadata for OldIndexerJobRunMetadata {
fn update(&mut self, new_data: Self) {
self.db_write_time += new_data.db_write_time;
self.scan_read_time += new_data.scan_read_time;
@ -112,7 +112,7 @@ pub enum ScanProgress {
Message(String),
}
impl IndexerJobData {
impl OldIndexerJobData {
fn on_scan_progress(ctx: &WorkerContext, progress: Vec<ScanProgress>) {
ctx.progress(
progress
@ -131,20 +131,20 @@ impl IndexerJobData {
/// `IndexerJobStepInput` defines the action that should be executed in the current step
#[derive(Serialize, Deserialize, Debug)]
pub enum IndexerJobStepInput {
Save(IndexerJobSaveStep),
pub enum OldIndexerJobStepInput {
Save(OldIndexerJobSaveStep),
Walk(ToWalkEntry),
Update(IndexerJobUpdateStep),
Update(OldIndexerJobUpdateStep),
}
/// An `IndexerJob` is a stateful job that walks a directory and indexes all files.
/// First it walks the directory and generates a list of files to index, chunked into
/// batches of [`BATCH_SIZE`]. Then for each chunk it writes the file metadata to the database.
#[async_trait::async_trait]
impl StatefulJob for IndexerJobInit {
type Data = IndexerJobData;
type Step = IndexerJobStepInput;
type RunMetadata = IndexerJobRunMetadata;
impl StatefulJob for OldIndexerJobInit {
type Data = OldIndexerJobData;
type Step = OldIndexerJobStepInput;
type RunMetadata = OldIndexerJobRunMetadata;
const NAME: &'static str = "indexer";
const IS_BATCHED: bool = true;
@ -255,7 +255,7 @@ impl StatefulJob for IndexerJobInit {
*total_new_paths += chunk_steps.len() as u64;
*to_save_chunks += 1;
IndexerJobStepInput::Save(IndexerJobSaveStep {
OldIndexerJobStepInput::Save(OldIndexerJobSaveStep {
chunk_idx: i,
walked: chunk_steps,
})
@ -271,18 +271,18 @@ impl StatefulJob for IndexerJobInit {
*total_updated_paths += chunk_updates.len() as u64;
*to_update_chunks += 1;
IndexerJobStepInput::Update(IndexerJobUpdateStep {
OldIndexerJobStepInput::Update(OldIndexerJobUpdateStep {
chunk_idx: i,
to_update: chunk_updates,
})
}),
)
.chain(to_walk.into_iter().map(IndexerJobStepInput::Walk))
.chain(to_walk.into_iter().map(OldIndexerJobStepInput::Walk))
.collect::<Vec<_>>();
debug!("Walker at indexer job found {total_updated_paths} file_paths to be updated");
IndexerJobData::on_scan_progress(
OldIndexerJobData::on_scan_progress(
ctx,
vec![
ScanProgress::ChunkCount(*to_save_chunks + *to_update_chunks),
@ -294,14 +294,14 @@ impl StatefulJob for IndexerJobInit {
],
);
*data = Some(IndexerJobData {
*data = Some(OldIndexerJobData {
location_path: location_path.to_path_buf(),
indexed_path: to_walk_path,
indexer_rules,
});
Ok((
IndexerJobRunMetadata {
OldIndexerJobRunMetadata {
db_write_time: db_delete_time,
scan_read_time,
total_paths: *total_new_paths,
@ -334,10 +334,10 @@ impl StatefulJob for IndexerJobInit {
let init = self;
let mut new_metadata = Self::RunMetadata::default();
match step {
IndexerJobStepInput::Save(step) => {
OldIndexerJobStepInput::Save(step) => {
let start_time = Instant::now();
IndexerJobData::on_scan_progress(
OldIndexerJobData::on_scan_progress(
ctx,
vec![
ScanProgress::SavedChunks(step.chunk_idx + 1),
@ -355,9 +355,9 @@ impl StatefulJob for IndexerJobInit {
Ok(new_metadata.into())
}
IndexerJobStepInput::Update(to_update) => {
OldIndexerJobStepInput::Update(to_update) => {
let start_time = Instant::now();
IndexerJobData::on_scan_progress(
OldIndexerJobData::on_scan_progress(
ctx,
vec![
ScanProgress::UpdatedChunks(to_update.chunk_idx + 1),
@ -376,7 +376,7 @@ impl StatefulJob for IndexerJobInit {
Ok(new_metadata.into())
}
IndexerJobStepInput::Walk(to_walk_entry) => {
OldIndexerJobStepInput::Walk(to_walk_entry) => {
let location_id = init.location.id;
let location_path =
maybe_missing(&init.location.path, "location.path").map(Path::new)?;
@ -424,7 +424,7 @@ impl StatefulJob for IndexerJobInit {
new_metadata.total_paths += chunk_steps.len() as u64;
new_metadata.total_save_steps += 1;
IndexerJobStepInput::Save(IndexerJobSaveStep {
OldIndexerJobStepInput::Save(OldIndexerJobSaveStep {
chunk_idx: i,
walked: chunk_steps,
})
@ -435,16 +435,16 @@ impl StatefulJob for IndexerJobInit {
new_metadata.total_updated_paths += chunk_updates.len() as u64;
new_metadata.total_update_steps += 1;
IndexerJobStepInput::Update(IndexerJobUpdateStep {
OldIndexerJobStepInput::Update(OldIndexerJobUpdateStep {
chunk_idx: i,
to_update: chunk_updates,
})
},
))
.chain(to_walk.into_iter().map(IndexerJobStepInput::Walk))
.chain(to_walk.into_iter().map(OldIndexerJobStepInput::Walk))
.collect::<Vec<_>>();
IndexerJobData::on_scan_progress(
OldIndexerJobData::on_scan_progress(
ctx,
vec![
ScanProgress::ChunkCount(more_steps.len() - to_walk_count),
@ -539,7 +539,7 @@ impl StatefulJob for IndexerJobInit {
fn update_notifier_fn(ctx: &WorkerContext) -> impl FnMut(&Path, usize) + '_ {
move |path, total_entries| {
IndexerJobData::on_scan_progress(
OldIndexerJobData::on_scan_progress(
ctx,
vec![ScanProgress::Message(format!(
"Found: {total_entries} entries; Scanning: {:?}",

View file

@ -1,13 +1,13 @@
use crate::{
file_paths_db_fetcher_fn, invalidate_query,
job::JobError,
library::Library,
location::{
indexer::{
execute_indexer_update_step, reverse_update_directories_sizes, IndexerJobUpdateStep,
execute_indexer_update_step, reverse_update_directories_sizes, OldIndexerJobUpdateStep,
},
scan_location_sub_path, update_location_size,
},
old_job::JobError,
to_remove_db_fetcher_fn, Node,
};
@ -29,14 +29,14 @@ use tracing::{debug, error};
use super::{
execute_indexer_save_step, iso_file_path_factory, location_with_indexer_rules,
remove_non_existing_file_paths, rules::IndexerRule, walk::walk_single_dir, IndexerError,
IndexerJobSaveStep,
old_walk::walk_single_dir, remove_non_existing_file_paths, rules::IndexerRule, IndexerError,
OldIndexerJobSaveStep,
};
/// BATCH_SIZE is the number of files to index at each step, writing each chunk of file metadata to the database.
const BATCH_SIZE: usize = 1000;
pub async fn shallow(
pub async fn old_shallow(
location: &location_with_indexer_rules::Data,
sub_path: &PathBuf,
node: &Arc<Node>,
@ -127,7 +127,7 @@ pub async fn shallow(
new_directories_to_scan.insert(new_dir);
});
IndexerJobSaveStep {
OldIndexerJobSaveStep {
chunk_idx: i,
walked,
}
@ -160,7 +160,7 @@ pub async fn shallow(
let to_update = chunk.collect::<Vec<_>>();
to_update_count += to_update.len();
IndexerJobUpdateStep {
OldIndexerJobUpdateStep {
chunk_idx: i,
to_update,
}

View file

@ -1,6 +1,6 @@
use crate::{
job::JobManagerError,
library::{Library, LibraryManagerEvent},
old_job::JobManagerError,
Node,
};

View file

@ -75,7 +75,7 @@ impl<'lib> EventHandler<'lib> for LinuxEventHandler<'lib> {
match kind {
EventKind::Create(CreateKind::File)
| EventKind::Modify(ModifyKind::Data(DataChange::Any)) => {
// When we receive a create, modify data or metadata events of the abore kinds
// When we receive a create, modify data or metadata events of the above kinds
// we just mark the file to be updated in the near future;
// for each consecutive event of these kinds that we receive for the same file,
// we just store the path again in the map below, with a new instant
@ -112,7 +112,7 @@ impl<'lib> EventHandler<'lib> for LinuxEventHandler<'lib> {
.await?;
}
EventKind::Modify(ModifyKind::Name(RenameMode::From)) => {
// Just in case we can't garantee that we receive the Rename From event before the
// Just in case we can't guarantee that we receive the Rename From event before the
// Rename Both event. Just a safeguard
if self.recently_renamed_from.remove(&paths[0]).is_none() {
self.rename_from.insert(paths.remove(0), Instant::now());

View file

@ -1,5 +1,5 @@
//! On MacOS, we use the FSEvents backend of notify-rs and Rename events are pretty complicated;
//! There are just (ModifyKind::Name(RenameMode::Any) events and nothing else.
//! There are just ModifyKind::Name(RenameMode::Any) events and nothing else.
//! This means that we have to link the old path with the new path to know which file was renamed.
//! But don't forget that rename events don't always mean a file's name was modified;
//! its path may have been modified instead. So we have to check if the file was moved. When a file is moved
@ -94,7 +94,7 @@ impl<'lib> EventHandler<'lib> for MacOsEventHandler<'lib> {
// NOTE: This is a macOS-specific event that happens when a folder is created
// through Finder. It creates a folder, but 2 events are triggered in
// FSEvents. So we store and check the latest created folder to avoid
// hiting a unique constraint in the database
// hitting a unique constraint in the database
return Ok(());
}
}
@ -119,7 +119,7 @@ impl<'lib> EventHandler<'lib> for MacOsEventHandler<'lib> {
| EventKind::Modify(ModifyKind::Metadata(
MetadataKind::WriteTime | MetadataKind::Extended,
)) => {
// When we receive a create, modify data or metadata events of the abore kinds
// When we receive a create, modify data or metadata events of the above kinds
// we just mark the file to be updated in the near future;
// for each consecutive event of these kinds that we receive for the same file,
// we just store the path again in the map below, with a new instant
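Both platform handlers above park one half of a rename in a timestamped map until the matching half arrives. A compact sketch of that bookkeeping, assuming the notify crate's event types (the surrounding handler wiring is omitted):

use notify::event::{EventKind, ModifyKind, RenameMode};
use std::{collections::HashMap, path::PathBuf, time::Instant};

// Remember "rename from" paths until the matching "rename to/both" shows up.
fn track_rename_from(
    kind: &EventKind,
    paths: &mut Vec<PathBuf>,
    rename_from: &mut HashMap<PathBuf, Instant>,
) {
    if let EventKind::Modify(ModifyKind::Name(RenameMode::From)) = kind {
        rename_from.insert(paths.remove(0), Instant::now());
    }
}

fn main() {
    let mut rename_from = HashMap::new();
    let mut paths = vec![PathBuf::from("/tmp/old_name.txt")];
    let kind = EventKind::Modify(ModifyKind::Name(RenameMode::From));
    track_rename_from(&kind, &mut paths, &mut rename_from);
    assert_eq!(rename_from.len(), 1);
}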

View file

@ -7,12 +7,12 @@ use crate::{
manager::LocationManagerError, scan_location_sub_path, update_location_size,
},
object::{
file_identifier::FileMetadata,
media::{
media_data_extractor::{can_extract_media_data_for_image, extract_media_data},
media_data_image_to_query_params,
thumbnail::get_indexed_thumbnail_path,
old_thumbnail::get_indexed_thumbnail_path,
},
old_file_identifier::FileMetadata,
validation::hash::file_checksum,
},
Node,

View file

@ -1,11 +1,11 @@
use crate::{
invalidate_query,
job::{JobBuilder, JobError, JobManagerError},
library::Library,
object::{
file_identifier::{self, file_identifier_job::FileIdentifierJobInit},
media::{media_processor, MediaProcessorJobInit},
media::{old_media_processor, OldMediaProcessorJobInit},
old_file_identifier::{self, old_file_identifier_job::OldFileIdentifierJobInit},
},
old_job::{JobBuilder, JobError, JobManagerError},
Node,
};
@ -47,7 +47,7 @@ pub mod metadata;
pub mod non_indexed;
pub use error::LocationError;
use indexer::IndexerJobInit;
use indexer::OldIndexerJobInit;
pub use manager::{LocationManagerError, Locations};
use metadata::SpacedriveLocationMetadataFile;
@ -451,18 +451,18 @@ pub async fn scan_location(
let location_base_data = location::Data::from(&location);
JobBuilder::new(IndexerJobInit {
JobBuilder::new(OldIndexerJobInit {
location,
sub_path: None,
})
.with_action("scan_location")
.with_metadata(json!({"location": location_base_data.clone()}))
.build()
.queue_next(FileIdentifierJobInit {
.queue_next(OldFileIdentifierJobInit {
location: location_base_data.clone(),
sub_path: None,
})
.queue_next(MediaProcessorJobInit {
.queue_next(OldMediaProcessorJobInit {
location: location_base_data,
sub_path: None,
regenerate_thumbnails: false,
@ -488,7 +488,7 @@ pub async fn scan_location_sub_path(
let location_base_data = location::Data::from(&location);
JobBuilder::new(IndexerJobInit {
JobBuilder::new(OldIndexerJobInit {
location,
sub_path: Some(sub_path.clone()),
})
@ -498,11 +498,11 @@ pub async fn scan_location_sub_path(
"sub_path": sub_path.clone(),
}))
.build()
.queue_next(FileIdentifierJobInit {
.queue_next(OldFileIdentifierJobInit {
location: location_base_data.clone(),
sub_path: Some(sub_path.clone()),
})
.queue_next(MediaProcessorJobInit {
.queue_next(OldMediaProcessorJobInit {
location: location_base_data,
sub_path: Some(sub_path),
regenerate_thumbnails: false,
@ -528,9 +528,9 @@ pub async fn light_scan_location(
let location_base_data = location::Data::from(&location);
indexer::shallow(&location, &sub_path, &node, &library).await?;
file_identifier::shallow(&location_base_data, &sub_path, &library).await?;
media_processor::shallow(
indexer::old_shallow(&location, &sub_path, &node, &library).await?;
old_file_identifier::old_shallow(&location_base_data, &sub_path, &library).await?;
old_media_processor::old_shallow(
&location_base_data,
&sub_path,
&library,
@ -603,7 +603,7 @@ pub(crate) fn normalize_path(path: impl AsRef<Path>) -> io::Result<(String, Stri
.and_then(|normalized_path| {
if cfg!(windows) {
// Use normalized path as main path on Windows
// This ensures we always receive a valid windows formated path
// This ensures we always receive a valid windows formatted path
// ex: /Users/JohnDoe/Downloads will become C:\Users\JohnDoe\Downloads
// Internally `normalize` calls `GetFullPathNameW` on Windows
// https://learn.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-getfullpathnamew

View file

@ -3,7 +3,7 @@ use crate::{
library::Library,
object::{
cas::generate_cas_id,
media::thumbnail::{get_ephemeral_thumb_key, BatchToProcess, GenerateThumbnailArgs},
media::old_thumbnail::{get_ephemeral_thumb_key, BatchToProcess, GenerateThumbnailArgs},
},
Node,
};

View file

@ -1,6 +1,6 @@
use crate::{
api::{notifications::Notification, BackendFeature},
object::media::thumbnail::preferences::ThumbnailerPreferences,
object::media::old_thumbnail::preferences::ThumbnailerPreferences,
util::version_manager::{Kind, ManagedVersion, VersionManager, VersionManagerError},
};
@ -80,7 +80,7 @@ pub struct NodeConfig {
/// URL of the Spacedrive API
#[serde(default, skip_serializing_if = "Option::is_none")]
pub sd_api_origin: Option<String>,
/// The aggreagation of many different preferences for the node
/// The aggregation of many different preferences for the node
pub preferences: NodePreferences,
// Model version for the image labeler
pub image_labeler_version: Option<String>,
@ -145,7 +145,7 @@ impl ManagedVersion<NodeConfigVersion> for NodeConfig {
name.truncate(250);
#[cfg(feature = "ai")]
let image_labeler_version = Some(sd_ai::image_labeler::DEFAULT_MODEL_VERSION.to_string());
let image_labeler_version = Some(sd_ai::old_image_labeler::DEFAULT_MODEL_VERSION.to_string());
#[cfg(not(feature = "ai"))]
let image_labeler_version = None;
@ -301,7 +301,7 @@ impl Manager {
#[cfg(feature = "ai")]
if config.image_labeler_version.is_none() {
config.image_labeler_version =
Some(sd_ai::image_labeler::DEFAULT_MODEL_VERSION.to_string());
Some(sd_ai::old_image_labeler::DEFAULT_MODEL_VERSION.to_string());
}
#[cfg(not(feature = "ai"))]

View file

@ -24,8 +24,8 @@ impl Notifications {
}
/// DO NOT USE THIS. Use `Node::emit_notification` or `Library::emit_notification` instead.
pub fn _internal_send(&self, notif: Notification) {
self.0.send(notif).ok();
pub fn _internal_send(&self, notification: Notification) {
self.0.send(notification).ok();
}
pub fn _internal_next_id(&self) -> u32 {
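The send above deliberately discards its error with .ok(): a broadcast send only fails when nobody is subscribed, which is harmless for notifications. A minimal sketch of that fire-and-forget pattern, assuming the inner field is a tokio broadcast sender (the diff doesn't show the field's actual type):

use tokio::sync::broadcast;

fn main() {
    let (tx, mut rx) = broadcast::channel::<String>(16);
    // send() errors only when there are no receivers; ignoring that is fine here.
    tx.send("thumbnail generated".to_string()).ok();
    assert_eq!(rx.try_recv().unwrap(), "thumbnail generated");
}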

View file

@ -16,11 +16,11 @@ use once_cell::sync::Lazy;
use regex::Regex;
use serde::{Deserialize, Serialize};
pub mod delete;
pub mod erase;
pub mod old_delete;
pub mod old_erase;
pub mod copy;
pub mod cut;
pub mod old_copy;
pub mod old_cut;
// pub mod decrypt;
// pub mod encrypt;

View file

@ -1,10 +1,10 @@
use crate::{
invalidate_query,
job::{
library::Library,
old_job::{
CurrentStep, JobError, JobInitOutput, JobResult, JobRunErrors, JobStepOutput, StatefulJob,
WorkerContext,
},
library::Library,
};
use sd_file_path_helper::{join_location_relative_path, IsolatedFilePathData};
@ -27,12 +27,12 @@ use super::{
};
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct FileCopierJobData {
pub struct OldFileCopierJobData {
sources_location_path: PathBuf,
}
#[derive(Serialize, Deserialize, Hash, Type, Debug)]
pub struct FileCopierJobInit {
pub struct OldFileCopierJobInit {
pub source_location_id: location::id::Type,
pub target_location_id: location::id::Type,
pub sources_file_path_ids: Vec<file_path::id::Type>,
@ -40,15 +40,15 @@ pub struct FileCopierJobInit {
}
#[derive(Serialize, Deserialize, Debug)]
pub struct FileCopierJobStep {
pub struct OldFileCopierJobStep {
pub source_file_data: FileData,
pub target_full_path: PathBuf,
}
#[async_trait::async_trait]
impl StatefulJob for FileCopierJobInit {
type Data = FileCopierJobData;
type Step = FileCopierJobStep;
impl StatefulJob for OldFileCopierJobInit {
type Data = OldFileCopierJobData;
type Step = OldFileCopierJobStep;
type RunMetadata = ();
const NAME: &'static str = "file_copier";
@ -90,7 +90,7 @@ impl StatefulJob for FileCopierJobInit {
find_available_filename_for_duplicate(full_target_path).await?;
}
Ok::<_, FileSystemJobsError>(FileCopierJobStep {
Ok::<_, FileSystemJobsError>(OldFileCopierJobStep {
source_file_data: file_data,
target_full_path: full_target_path,
})
@ -99,7 +99,7 @@ impl StatefulJob for FileCopierJobInit {
.try_join()
.await?;
*data = Some(FileCopierJobData {
*data = Some(OldFileCopierJobData {
sources_location_path,
});
@ -110,7 +110,7 @@ impl StatefulJob for FileCopierJobInit {
&self,
ctx: &WorkerContext,
CurrentStep {
step: FileCopierJobStep {
step: OldFileCopierJobStep {
source_file_data,
target_full_path,
},
@ -163,7 +163,7 @@ impl StatefulJob for FileCopierJobInit {
{
Ok(source_file_data) => {
// Currently not supporting file_name suffixes for child files in a directory being copied
more_steps.push(FileCopierJobStep {
more_steps.push(OldFileCopierJobStep {
target_full_path: target_children_full_path,
source_file_data,
});

View file

@ -1,11 +1,11 @@
use crate::{
invalidate_query,
job::{
library::Library,
object::fs::{construct_target_filename, error::FileSystemJobsError},
old_job::{
CurrentStep, JobError, JobInitOutput, JobResult, JobRunErrors, JobStepOutput, StatefulJob,
WorkerContext,
},
library::Library,
object::fs::{construct_target_filename, error::FileSystemJobsError},
};
use sd_file_path_helper::push_location_relative_path;
@ -23,7 +23,7 @@ use tracing::{trace, warn};
use super::{fetch_source_and_target_location_paths, get_many_files_datas, FileData};
#[derive(Serialize, Deserialize, Hash, Type, Debug)]
pub struct FileCutterJobInit {
pub struct OldFileCutterJobInit {
pub source_location_id: location::id::Type,
pub target_location_id: location::id::Type,
pub sources_file_path_ids: Vec<file_path::id::Type>,
@ -31,13 +31,13 @@ pub struct FileCutterJobInit {
}
#[derive(Serialize, Deserialize, Debug)]
pub struct FileCutterJobData {
pub struct OldFileCutterJobData {
full_target_directory_path: PathBuf,
}
#[async_trait::async_trait]
impl StatefulJob for FileCutterJobInit {
type Data = FileCutterJobData;
impl StatefulJob for OldFileCutterJobInit {
type Data = OldFileCutterJobData;
type Step = FileData;
type RunMetadata = ();
@ -68,7 +68,7 @@ impl StatefulJob for FileCutterJobInit {
&init.target_location_relative_directory_path,
);
*data = Some(FileCutterJobData {
*data = Some(OldFileCutterJobData {
full_target_directory_path,
});

View file

@ -1,10 +1,10 @@
use crate::{
invalidate_query,
job::{
CurrentStep, JobError, JobInitOutput, JobResult, JobStepOutput, StatefulJob, WorkerContext,
},
library::Library,
location::get_location_path_from_location_id,
old_job::{
CurrentStep, JobError, JobInitOutput, JobResult, JobStepOutput, StatefulJob, WorkerContext,
},
};
use sd_prisma::{
@ -25,13 +25,13 @@ use tracing::warn;
use super::{error::FileSystemJobsError, get_many_files_datas, FileData};
#[derive(Serialize, Deserialize, Hash, Type, Debug)]
pub struct FileDeleterJobInit {
pub struct OldFileDeleterJobInit {
pub location_id: location::id::Type,
pub file_path_ids: Vec<file_path::id::Type>,
}
#[async_trait::async_trait]
impl StatefulJob for FileDeleterJobInit {
impl StatefulJob for OldFileDeleterJobInit {
type Data = ();
type Step = FileData;
type RunMetadata = ();
@ -72,7 +72,7 @@ impl StatefulJob for FileDeleterJobInit {
_: &Self::RunMetadata,
) -> Result<JobStepOutput<Self::Step, Self::RunMetadata>, JobError> {
// need to handle stuff such as querying prisma for all paths of a file, and deleting all of those if requested (with a checkbox in the ui)
// maybe a files.countOccurances/and or files.getPath(location_id, path_id) to show how many of these files would be deleted (and where?)
// maybe a files.countOccurrences/and or files.getPath(location_id, path_id) to show how many of these files would be deleted (and where?)
let Library { db, sync, .. } = ctx.library.as_ref();

View file

@ -1,11 +1,11 @@
use crate::{
invalidate_query,
job::{
library::Library,
location::get_location_path_from_location_id,
old_job::{
CurrentStep, JobError, JobInitOutput, JobResult, JobRunMetadata, JobStepOutput,
StatefulJob, WorkerContext,
},
library::Library,
location::get_location_path_from_location_id,
};
use sd_file_path_helper::IsolatedFilePathData;
@ -31,7 +31,7 @@ use super::{
#[serde_as]
#[derive(Serialize, Deserialize, Hash, Type, Debug)]
pub struct FileEraserJobInit {
pub struct OldFileEraserJobInit {
pub location_id: location::id::Type,
pub file_path_ids: Vec<file_path::id::Type>,
#[specta(type = String)]
@ -40,25 +40,25 @@ pub struct FileEraserJobInit {
}
#[derive(Serialize, Deserialize, Debug)]
pub struct FileEraserJobData {
pub struct OldFileEraserJobData {
location_path: PathBuf,
}
#[derive(Serialize, Deserialize, Default, Debug)]
pub struct FileEraserJobRunMetadata {
diretories_to_remove: Vec<PathBuf>,
directories_to_remove: Vec<PathBuf>,
}
impl JobRunMetadata for FileEraserJobRunMetadata {
fn update(&mut self, new_data: Self) {
self.diretories_to_remove
.extend(new_data.diretories_to_remove);
self.directories_to_remove
.extend(new_data.directories_to_remove);
}
}
#[async_trait::async_trait]
impl StatefulJob for FileEraserJobInit {
type Data = FileEraserJobData;
impl StatefulJob for OldFileEraserJobInit {
type Data = OldFileEraserJobData;
type Step = FileData;
type RunMetadata = FileEraserJobRunMetadata;
@ -82,7 +82,7 @@ impl StatefulJob for FileEraserJobInit {
let steps = get_many_files_datas(db, &location_path, &init.file_path_ids).await?;
*data = Some(FileEraserJobData { location_path });
*data = Some(OldFileEraserJobData { location_path });
Ok((Default::default(), steps).into())
}
@ -97,7 +97,7 @@ impl StatefulJob for FileEraserJobInit {
let init = self;
// need to handle stuff such as querying prisma for all paths of a file, and deleting all of those if requested (with a checkbox in the ui)
// maybe a files.countOccurances/and or files.getPath(location_id, path_id) to show how many of these files would be erased (and where?)
// maybe a files.countOccurrences/and or files.getPath(location_id, path_id) to show how many of these files would be erased (and where?)
let mut new_metadata = Self::RunMetadata::default();
@ -135,7 +135,7 @@ impl StatefulJob for FileEraserJobInit {
);
}
new_metadata
.diretories_to_remove
.directories_to_remove
.push(step.full_path.clone());
Ok((more_steps, new_metadata).into())
@ -186,7 +186,7 @@ impl StatefulJob for FileEraserJobInit {
let init = self;
try_join_all(
run_metadata
.diretories_to_remove
.directories_to_remove
.iter()
.cloned()
.map(|data| async {

View file

@ -1,4 +1,4 @@
use crate::job::JobRunErrors;
use crate::old_job::JobRunErrors;
use sd_file_ext::extensions::{Extension, ImageExtension, ALL_IMAGE_EXTENSIONS};
use sd_file_path_helper::{file_path_for_media_processor, IsolatedFilePathData};
@ -31,7 +31,7 @@ pub enum MediaDataError {
}
#[derive(Serialize, Deserialize, Default, Debug)]
pub struct MediaDataExtractorMetadata {
pub struct OldMediaDataExtractorMetadata {
pub extracted: u32,
pub skipped: u32,
}
@ -68,8 +68,8 @@ pub async fn process(
location_path: impl AsRef<Path>,
db: &PrismaClient,
ctx_update_fn: &impl Fn(usize),
) -> Result<(MediaDataExtractorMetadata, JobRunErrors), MediaDataError> {
let mut run_metadata = MediaDataExtractorMetadata::default();
) -> Result<(OldMediaDataExtractorMetadata, JobRunErrors), MediaDataError> {
let mut run_metadata = OldMediaDataExtractorMetadata::default();
if files_paths.is_empty() {
return Ok((run_metadata, JobRunErrors::default()));
}

View file

@ -1,8 +1,8 @@
pub mod media_data_extractor;
pub mod media_processor;
pub mod thumbnail;
pub mod old_media_processor;
pub mod old_thumbnail;
pub use media_processor::MediaProcessorJobInit;
pub use old_media_processor::OldMediaProcessorJobInit;
use sd_media_metadata::ImageMetadata;
use sd_prisma::prisma::media_data::*;

View file

@ -1,15 +1,15 @@
use crate::{
invalidate_query,
job::{
library::Library,
old_job::{
CurrentStep, JobError, JobInitOutput, JobReportUpdate, JobResult, JobStepOutput,
StatefulJob, WorkerContext,
},
library::Library,
Node,
};
#[cfg(feature = "ai")]
use crate::job::JobRunErrors;
use crate::old_job::JobRunErrors;
use sd_file_ext::extensions::Extension;
use sd_file_path_helper::{
@ -20,7 +20,7 @@ use sd_prisma::prisma::{location, PrismaClient};
use sd_utils::db::maybe_missing;
#[cfg(feature = "ai")]
use sd_ai::image_labeler::{BatchToken as ImageLabelerBatchToken, LabelerOutput};
use sd_ai::old_image_labeler::{BatchToken as ImageLabelerBatchToken, LabelerOutput};
#[cfg(feature = "ai")]
use std::sync::Arc;
@ -42,22 +42,22 @@ use tokio::time::sleep;
use tracing::{debug, error, info, trace, warn};
use super::{
media_data_extractor, process,
thumbnail::{self, GenerateThumbnailArgs},
BatchToProcess, MediaProcessorError, MediaProcessorMetadata,
media_data_extractor,
old_thumbnail::{self, GenerateThumbnailArgs},
process, BatchToProcess, MediaProcessorError, OldMediaProcessorMetadata,
};
const BATCH_SIZE: usize = 10;
#[derive(Serialize, Deserialize, Debug)]
pub struct MediaProcessorJobInit {
pub struct OldMediaProcessorJobInit {
pub location: location::Data,
pub sub_path: Option<PathBuf>,
pub regenerate_thumbnails: bool,
pub regenerate_labels: bool,
}
impl Hash for MediaProcessorJobInit {
impl Hash for OldMediaProcessorJobInit {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.location.id.hash(state);
if let Some(ref sub_path) = self.sub_path {
@ -67,7 +67,7 @@ impl Hash for MediaProcessorJobInit {
}
#[derive(Debug, Serialize, Deserialize)]
pub struct MediaProcessorJobData {
pub struct OldMediaProcessorJobData {
location_path: PathBuf,
to_process_path: PathBuf,
#[serde(skip, default)]
@ -80,7 +80,7 @@ pub struct MediaProcessorJobData {
}
#[derive(Debug, Serialize, Deserialize)]
pub enum MediaProcessorJobStep {
pub enum OldMediaProcessorJobStep {
ExtractMediaData(Vec<file_path_for_media_processor::Data>),
WaitThumbnails(usize),
#[cfg(feature = "ai")]
@ -88,10 +88,10 @@ pub enum MediaProcessorJobStep {
}
#[async_trait::async_trait]
impl StatefulJob for MediaProcessorJobInit {
type Data = MediaProcessorJobData;
type Step = MediaProcessorJobStep;
type RunMetadata = MediaProcessorMetadata;
impl StatefulJob for OldMediaProcessorJobInit {
type Data = OldMediaProcessorJobData;
type Step = OldMediaProcessorJobStep;
type RunMetadata = OldMediaProcessorMetadata;
const NAME: &'static str = "media_processor";
const IS_BATCHED: bool = true;
@ -179,7 +179,7 @@ impl StatefulJob for MediaProcessorJobInit {
#[cfg(feature = "ai")]
let (labeler_batch_token, labels_rx) =
if let Some(image_labeller) = ctx.node.image_labeller.as_ref() {
if let Some(image_labeller) = ctx.node.old_image_labeller.as_ref() {
let (labeler_batch_token, labels_rx) = image_labeller
.new_resumable_batch(
location_id,
@ -201,13 +201,11 @@ impl StatefulJob for MediaProcessorJobInit {
.chunks(BATCH_SIZE)
.into_iter()
.map(|chunk| chunk.collect::<Vec<_>>())
.map(MediaProcessorJobStep::ExtractMediaData)
.map(OldMediaProcessorJobStep::ExtractMediaData)
.chain(
[
(thumbs_to_process_count > 0).then_some(MediaProcessorJobStep::WaitThumbnails(
thumbs_to_process_count as usize,
)),
]
[(thumbs_to_process_count > 0).then_some(
OldMediaProcessorJobStep::WaitThumbnails(thumbs_to_process_count as usize),
)]
.into_iter()
.flatten(),
)
@ -215,8 +213,9 @@ impl StatefulJob for MediaProcessorJobInit {
[
#[cfg(feature = "ai")]
{
(total_files_for_labeling > 0)
.then_some(MediaProcessorJobStep::WaitLabels(total_files_for_labeling))
(total_files_for_labeling > 0).then_some(
OldMediaProcessorJobStep::WaitLabels(total_files_for_labeling),
)
},
#[cfg(not(feature = "ai"))]
{
@ -237,7 +236,7 @@ impl StatefulJob for MediaProcessorJobInit {
)),
]);
*data = Some(MediaProcessorJobData {
*data = Some(OldMediaProcessorJobData {
location_path,
to_process_path,
maybe_thumbnailer_progress_rx,
@ -265,7 +264,7 @@ impl StatefulJob for MediaProcessorJobInit {
_: &Self::RunMetadata,
) -> Result<JobStepOutput<Self::Step, Self::RunMetadata>, JobError> {
match step {
MediaProcessorJobStep::ExtractMediaData(file_paths) => process(
OldMediaProcessorJobStep::ExtractMediaData(file_paths) => process(
file_paths,
self.location.id,
&data.location_path,
@ -280,7 +279,7 @@ impl StatefulJob for MediaProcessorJobInit {
.map(Into::into)
.map_err(Into::into),
MediaProcessorJobStep::WaitThumbnails(total_thumbs) => {
OldMediaProcessorJobStep::WaitThumbnails(total_thumbs) => {
ctx.progress(vec![
JobReportUpdate::TaskCount(*total_thumbs),
JobReportUpdate::Phase("thumbnails".to_string()),
@ -326,8 +325,8 @@ impl StatefulJob for MediaProcessorJobInit {
}
#[cfg(feature = "ai")]
MediaProcessorJobStep::WaitLabels(total_labels) => {
let Some(image_labeller) = ctx.node.image_labeller.as_ref() else {
OldMediaProcessorJobStep::WaitLabels(total_labels) => {
let Some(image_labeller) = ctx.node.old_image_labeller.as_ref() else {
let err = "AI system is disabled due to a previous error, skipping labels job";
error!(err);
return Ok(JobRunErrors(vec![err.to_string()]).into());
@ -433,7 +432,7 @@ async fn dispatch_thumbnails_for_processing(
let mut file_paths = get_all_children_files_by_extensions(
db,
parent_iso_file_path,
&thumbnail::ALL_THUMBNAILABLE_EXTENSIONS,
&old_thumbnail::ALL_THUMBNAILABLE_EXTENSIONS,
)
.await?;
@ -526,7 +525,7 @@ async fn get_files_for_labeling(
AND materialized_path LIKE {{}}
{}
ORDER BY materialized_path ASC",
// Orderind by materialized_path so we can prioritize processing the first files
// Ordering by materialized_path so we can prioritize processing the first files
// in the upper part of the directory tree
&media_data_extractor::FILTERED_IMAGE_EXTENSIONS
.iter()
@ -569,7 +568,7 @@ async fn get_all_children_files_by_extensions(
AND LOWER(extension) IN ({})
AND materialized_path LIKE {{}}
ORDER BY materialized_path ASC",
// Orderind by materialized_path so we can prioritize processing the first files
// Ordering by materialized_path so we can prioritize processing the first files
// in the upper part of the directory tree
extensions
.iter()

View file

@ -1,4 +1,4 @@
use crate::job::{JobRunErrors, JobRunMetadata};
use crate::old_job::{JobRunErrors, JobRunMetadata};
use sd_file_path_helper::{file_path_for_media_processor, FilePathError};
use sd_prisma::prisma::{location, PrismaClient};
@ -10,15 +10,15 @@ use thiserror::Error;
use tracing::error;
use super::{
media_data_extractor::{self, MediaDataError, MediaDataExtractorMetadata},
thumbnail::{self, BatchToProcess, ThumbnailerError},
media_data_extractor::{self, MediaDataError, OldMediaDataExtractorMetadata},
old_thumbnail::{self, BatchToProcess, ThumbnailerError},
};
mod job;
mod shallow;
pub use job::MediaProcessorJobInit;
pub use shallow::shallow;
pub use job::OldMediaProcessorJobInit;
pub use shallow::old_shallow;
#[derive(Error, Debug)]
pub enum MediaProcessorError {
@ -37,14 +37,14 @@ pub enum MediaProcessorError {
}
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct MediaProcessorMetadata {
media_data: MediaDataExtractorMetadata,
pub struct OldMediaProcessorMetadata {
media_data: OldMediaDataExtractorMetadata,
thumbs_processed: u32,
labels_extracted: u32,
}
impl From<MediaDataExtractorMetadata> for MediaProcessorMetadata {
fn from(media_data: MediaDataExtractorMetadata) -> Self {
impl From<OldMediaDataExtractorMetadata> for OldMediaProcessorMetadata {
fn from(media_data: OldMediaDataExtractorMetadata) -> Self {
Self {
media_data,
thumbs_processed: 0,
@ -53,7 +53,7 @@ impl From<MediaDataExtractorMetadata> for MediaProcessorMetadata {
}
}
impl JobRunMetadata for MediaProcessorMetadata {
impl JobRunMetadata for OldMediaProcessorMetadata {
fn update(&mut self, new_data: Self) {
self.media_data.extracted += new_data.media_data.extracted;
self.media_data.skipped += new_data.media_data.skipped;
@ -68,7 +68,7 @@ pub async fn process(
location_path: impl AsRef<Path>,
db: &PrismaClient,
ctx_update_fn: &impl Fn(usize),
) -> Result<(MediaProcessorMetadata, JobRunErrors), MediaProcessorError> {
) -> Result<(OldMediaProcessorMetadata, JobRunErrors), MediaProcessorError> {
// Add here new kinds of media processing if necessary in the future
media_data_extractor::process(files_paths, location_id, location_path, db, ctx_update_fn)

View file

@ -1,8 +1,8 @@
use crate::{
invalidate_query,
job::{JobError, JobRunMetadata},
library::Library,
object::media::thumbnail::GenerateThumbnailArgs,
object::media::old_thumbnail::GenerateThumbnailArgs,
old_job::{JobError, JobRunMetadata},
Node,
};
@ -15,7 +15,7 @@ use sd_prisma::prisma::{location, PrismaClient};
use sd_utils::db::maybe_missing;
#[cfg(feature = "ai")]
use sd_ai::image_labeler::LabelerOutput;
use sd_ai::old_image_labeler::LabelerOutput;
use std::path::{Path, PathBuf};
@ -31,13 +31,13 @@ use futures::StreamExt;
use super::{
media_data_extractor::{self, process},
thumbnail::{self, BatchToProcess},
MediaProcessorError, MediaProcessorMetadata,
old_thumbnail::{self, BatchToProcess},
MediaProcessorError, OldMediaProcessorMetadata,
};
const BATCH_SIZE: usize = 10;
pub async fn shallow(
pub async fn old_shallow(
location: &location::Data,
sub_path: &PathBuf,
library @ Library { db, sync, .. }: &Library,
@ -110,7 +110,7 @@ pub async fn shallow(
#[cfg(feature = "ai")]
// Check if we have an image labeller and `has_labels` is set, then enqueue a new batch
let labels_rx = node.image_labeller.as_ref().and_then(|image_labeller| {
let labels_rx = node.old_image_labeller.as_ref().and_then(|image_labeller| {
has_labels.then(|| {
image_labeller.new_batch(
location_id,
@ -122,7 +122,7 @@ pub async fn shallow(
})
});
let mut run_metadata = MediaProcessorMetadata::default();
let mut run_metadata = OldMediaProcessorMetadata::default();
for files in chunked_files {
let (more_run_metadata, errors) = process(&files, location.id, &location_path, db, &|_| {})
@ -245,7 +245,7 @@ async fn dispatch_thumbnails_for_processing(
let file_paths = get_files_by_extensions(
db,
parent_iso_file_path,
&thumbnail::ALL_THUMBNAILABLE_EXTENSIONS,
&old_thumbnail::ALL_THUMBNAILABLE_EXTENSIONS,
)
.await?;

View file

@ -1,6 +1,6 @@
use crate::{
library::{Libraries, LibraryId},
object::media::thumbnail::ONE_SEC,
object::media::old_thumbnail::ONE_SEC,
util::version_manager::{Kind, ManagedVersion, VersionManager, VersionManagerError},
};

View file

@ -19,9 +19,9 @@ use thiserror::Error;
use tokio::task;
use tracing::error;
pub mod actor;
mod clean_up;
mod directory;
pub mod old_actor;
pub mod preferences;
mod process;
mod shard;

View file

@ -27,7 +27,7 @@ use super::{
directory::init_thumbnail_dir,
process::{generate_thumbnail, ThumbData},
state::RegisterReporter,
worker::{worker, WorkerChannels},
worker::{old_worker, WorkerChannels},
BatchToProcess, ThumbnailKind, ThumbnailerError, ONE_SEC, THUMBNAIL_CACHE_DIR_NAME,
};
@ -60,7 +60,7 @@ pub(super) enum DatabaseMessage {
// └── <library_id>/ # we segregate thumbnails by library
// └── <cas_id>[0..3]/ # sharding
// └── <cas_id>.webp
pub struct Thumbnailer {
pub struct OldThumbnailer {
thumbnails_directory: Arc<PathBuf>,
cas_ids_to_delete_tx: chan::Sender<(Vec<String>, ThumbnailKind)>,
thumbnails_to_generate_tx: chan::Sender<(BatchToProcess, ThumbnailKind)>,
@ -70,7 +70,7 @@ pub struct Thumbnailer {
cancel_tx: chan::Sender<oneshot::Sender<()>>,
}
impl Thumbnailer {
impl OldThumbnailer {
pub async fn new(
data_dir: impl AsRef<Path>,
libraries_manager: Arc<Libraries>,
@ -112,7 +112,7 @@ impl Thumbnailer {
let node_preferences = node_preferences_rx.clone();
async move {
while let Err(e) = spawn(worker(
while let Err(e) = spawn(old_worker(
*AVAILABLE_PARALLELISM
.get()
.expect("BATCH_SIZE is set at thumbnailer new method"),

View file

@ -1,7 +1,7 @@
use crate::api::CoreEvent;
use sd_file_ext::extensions::{DocumentExtension, ImageExtension};
use sd_images::{format_image, scale_dimensions, ConvertableExtension};
use sd_images::{format_image, scale_dimensions, ConvertibleExtension};
use sd_media_metadata::image::Orientation;
use sd_prisma::prisma::location;
use sd_utils::error::FileIOError;
@ -129,7 +129,7 @@ pub(super) async fn batch_processor(
let batch_size = batch.len();
// Tranforming to `VecDeque` so we don't need to move anything as we consume from the beginning
// Transforming to `VecDeque` so we don't need to move anything as we consume from the beginning
// This from is guaranteed to be O(1)
let mut queue = VecDeque::from(batch);
@ -364,7 +364,7 @@ pub(super) async fn generate_thumbnail(
#[cfg(feature = "ffmpeg")]
{
use crate::object::media::thumbnail::can_generate_thumbnail_for_video;
use crate::object::media::old_thumbnail::can_generate_thumbnail_for_video;
use sd_file_ext::extensions::VideoExtension;
if let Ok(extension) = VideoExtension::from_str(extension) {
@ -419,8 +419,8 @@ async fn generate_image_thumbnail(
// this corrects the rotation/flip of the image based on the *available* exif data
// not all images have exif data, so we don't error. we also don't rotate HEIF as that's against the spec
if let Some(orientation) = Orientation::from_path(&file_path) {
if ConvertableExtension::try_from(file_path.as_ref())
.expect("we already checked if the image was convertable")
if ConvertibleExtension::try_from(file_path.as_ref())
.expect("we already checked if the image was convertible")
.should_rotate()
{
img = orientation.correct_thumbnail(img);
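The `ConvertibleExtension` rename also touches the rotation guard shown above: EXIF orientation is applied only when the format's `should_rotate()` allows it, since rotating HEIF-family images is against the spec. A hedged sketch of that guard, with stand-in types for `sd_images::ConvertibleExtension` and the EXIF orientation:

```rust
// Sketch of the rotation guard; `Extension` stands in for `ConvertibleExtension`.
#[derive(Debug, Clone, Copy)]
enum Extension {
    Jpeg,
    Heic,
}

impl Extension {
    // HEIF-family images must not be rotated, per the spec.
    const fn should_rotate(self) -> bool {
        !matches!(self, Self::Heic)
    }
}

fn should_correct(orientation_present: bool, ext: Extension) -> bool {
    // Mirrors the hunk: rotate only when exif data exists *and* the format allows it.
    orientation_present && ext.should_rotate()
}

fn main() {
    assert!(should_correct(true, Extension::Jpeg));
    assert!(!should_correct(true, Extension::Heic));
    assert!(!should_correct(false, Extension::Jpeg));
}
```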

View file

@ -16,11 +16,12 @@ use tokio::{fs, io};
use tracing::{error, info, trace};
use super::{
actor::ActorError, get_shard_hex, BatchToProcess, ThumbnailKind, EPHEMERAL_DIR, SAVE_STATE_FILE,
get_shard_hex, old_actor::ActorError, BatchToProcess, ThumbnailKind, EPHEMERAL_DIR,
SAVE_STATE_FILE,
};
#[derive(Debug, Serialize, Deserialize)]
pub(super) struct ThumbsProcessingSaveState {
pub(super) struct OldThumbsProcessingSaveState {
pub(super) bookkeeper: BookKeeper,
pub(super) ephemeral_file_names: HashSet<OsString>,
// This queues doubles as LIFO and FIFO, assuming LIFO in case of users asking for a new batch
@ -31,7 +32,7 @@ pub(super) struct ThumbsProcessingSaveState {
pub(super) ephemeral_leftovers_queue: VecDeque<BatchToProcess>,
}
impl Default for ThumbsProcessingSaveState {
impl Default for OldThumbsProcessingSaveState {
fn default() -> Self {
Self {
bookkeeper: BookKeeper::default(),
@ -43,7 +44,7 @@ impl Default for ThumbsProcessingSaveState {
}
}
impl ThumbsProcessingSaveState {
impl OldThumbsProcessingSaveState {
pub(super) async fn load(thumbnails_directory: impl AsRef<Path>) -> Self {
let resume_file = thumbnails_directory.as_ref().join(SAVE_STATE_FILE);

View file

@ -18,11 +18,11 @@ use tokio_stream::{
use tracing::{debug, error, trace};
use super::{
actor::DatabaseMessage,
clean_up::{process_ephemeral_clean_up, process_indexed_clean_up},
old_actor::DatabaseMessage,
preferences::ThumbnailerPreferences,
process::{batch_processor, ProcessorControlChannels},
state::{remove_by_cas_ids, RegisterReporter, ThumbsProcessingSaveState},
state::{remove_by_cas_ids, OldThumbsProcessingSaveState, RegisterReporter},
BatchToProcess, ThumbnailKind, HALF_HOUR, ONE_SEC, THIRTY_SECS,
};
@ -35,7 +35,7 @@ pub(super) struct WorkerChannels {
pub(super) cancel_rx: chan::Receiver<oneshot::Sender<()>>,
}
pub(super) async fn worker(
pub(super) async fn old_worker(
available_parallelism: usize,
node_preferences_rx: watch::Receiver<NodePreferences>,
reporter: broadcast::Sender<CoreEvent>,
@ -71,13 +71,13 @@ pub(super) async fn worker(
IdleTick,
}
let ThumbsProcessingSaveState {
let OldThumbsProcessingSaveState {
mut bookkeeper,
mut ephemeral_file_names,
mut queue,
mut indexed_leftovers_queue,
mut ephemeral_leftovers_queue,
} = ThumbsProcessingSaveState::load(thumbnails_directory.as_ref()).await;
} = OldThumbsProcessingSaveState::load(thumbnails_directory.as_ref()).await;
let (generated_ephemeral_thumbnails_tx, ephemeral_thumbnails_cas_ids_rx) = chan::bounded(32);
let (leftovers_tx, leftovers_rx) = chan::bounded(8);
@ -85,7 +85,7 @@ pub(super) async fn worker(
let (stop_older_processing_tx, stop_older_processing_rx) = chan::bounded(1);
let mut shutdown_leftovers_rx = pin!(leftovers_rx.clone());
let mut shutdowm_batch_report_progress_rx = pin!(batch_report_progress_rx.clone());
let mut shutdown_batch_report_progress_rx = pin!(batch_report_progress_rx.clone());
let mut current_batch_processing_rx: Option<oneshot::Receiver<()>> = None;
@ -275,15 +275,15 @@ pub(super) async fn worker(
}
// Consuming the last progress reports to keep everything up to date
shutdowm_batch_report_progress_rx.close();
shutdown_batch_report_progress_rx.close();
while let Some((location_id, progressed)) =
shutdowm_batch_report_progress_rx.next().await
shutdown_batch_report_progress_rx.next().await
{
bookkeeper.add_progress(location_id, progressed).await;
}
// Saving state
ThumbsProcessingSaveState {
OldThumbsProcessingSaveState {
bookkeeper,
ephemeral_file_names,
queue,

View file

@ -4,10 +4,10 @@ use serde::{Deserialize, Serialize};
use specta::Type;
pub mod cas;
pub mod file_identifier;
pub mod fs;
pub mod media;
pub mod orphan_remover;
pub mod old_file_identifier;
pub mod old_orphan_remover;
pub mod tag;
pub mod validation;

View file

@ -1,7 +1,7 @@
use crate::{
job::JobError,
library::Library,
object::{cas::generate_cas_id, object_for_file_identifier},
old_job::JobError,
};
use sd_file_ext::{extensions::Extension, kind::ObjectKind};
@ -25,7 +25,7 @@ use tokio::fs;
use tracing::{error, trace};
use uuid::Uuid;
pub mod file_identifier_job;
pub mod old_file_identifier_job;
mod shallow;
pub use shallow::*;

View file

@ -1,9 +1,9 @@
use crate::{
job::{
library::Library,
old_job::{
CurrentStep, JobError, JobInitOutput, JobReportUpdate, JobResult, JobRunMetadata,
JobStepOutput, StatefulJob, WorkerContext,
},
library::Library,
};
use sd_file_path_helper::{
@ -26,17 +26,17 @@ use tracing::{debug, info, trace};
use super::{process_identifier_file_paths, FileIdentifierJobError, CHUNK_SIZE};
/// `FileIdentifierJobInit` takes file_paths without an object_id from a location
/// or starting from a `sub_path` (getting every descendent from this `sub_path`
/// or starting from a `sub_path` getting every descendent from this `sub_path`
/// and uniquely identifies them:
/// - first: generating the cas_id and extracting metadata
/// - finally: creating unique object records, and linking them to their file_paths
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct FileIdentifierJobInit {
pub struct OldFileIdentifierJobInit {
pub location: location::Data,
pub sub_path: Option<PathBuf>, // subpath to start from
}
impl Hash for FileIdentifierJobInit {
impl Hash for OldFileIdentifierJobInit {
fn hash<H: Hasher>(&self, state: &mut H) {
self.location.id.hash(state);
if let Some(ref sub_path) = self.sub_path {
@ -46,13 +46,13 @@ impl Hash for FileIdentifierJobInit {
}
#[derive(Serialize, Deserialize, Debug)]
pub struct FileIdentifierJobData {
pub struct OldFileIdentifierJobData {
location_path: PathBuf,
maybe_sub_iso_file_path: Option<IsolatedFilePathData<'static>>,
}
#[derive(Serialize, Deserialize, Default, Debug)]
pub struct FileIdentifierJobRunMetadata {
pub struct OldFileIdentifierJobRunMetadata {
cursor: file_path::id::Type,
total_orphan_paths: usize,
total_objects_created: usize,
@ -60,7 +60,7 @@ pub struct FileIdentifierJobRunMetadata {
total_objects_ignored: usize,
}
impl JobRunMetadata for FileIdentifierJobRunMetadata {
impl JobRunMetadata for OldFileIdentifierJobRunMetadata {
fn update(&mut self, new_data: Self) {
self.total_orphan_paths += new_data.total_orphan_paths;
self.total_objects_created += new_data.total_objects_created;
@ -71,10 +71,10 @@ impl JobRunMetadata for FileIdentifierJobRunMetadata {
}
#[async_trait::async_trait]
impl StatefulJob for FileIdentifierJobInit {
type Data = FileIdentifierJobData;
impl StatefulJob for OldFileIdentifierJobInit {
type Data = OldFileIdentifierJobData;
type Step = ();
type RunMetadata = FileIdentifierJobRunMetadata;
type RunMetadata = OldFileIdentifierJobRunMetadata;
const NAME: &'static str = "file_identifier";
const IS_BATCHED: bool = true;
@ -127,7 +127,7 @@ impl StatefulJob for FileIdentifierJobInit {
count_orphan_file_paths(db, location_id, &maybe_sub_iso_file_path).await?;
// Initializing `state.data` here because we need a complete state in case of early finish
*data = Some(FileIdentifierJobData {
*data = Some(OldFileIdentifierJobData {
location_path: location_path.to_path_buf(),
maybe_sub_iso_file_path,
});
@ -167,7 +167,7 @@ impl StatefulJob for FileIdentifierJobInit {
]);
Ok((
FileIdentifierJobRunMetadata {
OldFileIdentifierJobRunMetadata {
total_orphan_paths: orphan_count,
cursor: first_path.id,
..Default::default()

View file

@ -1,4 +1,4 @@
use crate::{invalidate_query, job::JobError, library::Library};
use crate::{invalidate_query, library::Library, old_job::JobError};
use sd_file_path_helper::{
ensure_file_path_exists, ensure_sub_path_is_directory, ensure_sub_path_is_in_location,
@ -21,7 +21,7 @@ pub struct ShallowFileIdentifierJobState {
sub_iso_file_path: IsolatedFilePathData<'static>,
}
pub async fn shallow(
pub async fn old_shallow(
location: &location::Data,
sub_path: &PathBuf,
library: &Library,

View file

@ -6,7 +6,7 @@ use std::path::Path;
use thiserror::Error;
pub mod hash;
pub mod validator_job;
pub mod old_validator_job;
#[derive(Error, Debug)]
pub enum ValidatorError {

View file

@ -1,8 +1,8 @@
use crate::{
job::{
library::Library,
old_job::{
CurrentStep, JobError, JobInitOutput, JobResult, JobStepOutput, StatefulJob, WorkerContext,
},
library::Library,
};
use sd_file_path_helper::{
@ -28,19 +28,19 @@ use tracing::info;
use super::{hash::file_checksum, ValidatorError};
#[derive(Serialize, Deserialize, Debug)]
pub struct ObjectValidatorJobData {
pub struct OldObjectValidatorJobData {
pub location_path: PathBuf,
pub task_count: usize,
}
// The validator can
#[derive(Serialize, Deserialize, Debug)]
pub struct ObjectValidatorJobInit {
pub struct OldObjectValidatorJobInit {
pub location: location::Data,
pub sub_path: Option<PathBuf>,
}
impl Hash for ObjectValidatorJobInit {
impl Hash for OldObjectValidatorJobInit {
fn hash<H: Hasher>(&self, state: &mut H) {
self.location.id.hash(state);
if let Some(ref sub_path) = self.sub_path {
@ -54,8 +54,8 @@ impl Hash for ObjectValidatorJobInit {
// - generate checksums for all Objects missing one
// - compare two objects and return true if they are the same
#[async_trait::async_trait]
impl StatefulJob for ObjectValidatorJobInit {
type Data = ObjectValidatorJobData;
impl StatefulJob for OldObjectValidatorJobInit {
type Data = OldObjectValidatorJobData;
type Step = file_path_for_object_validator::Data;
type RunMetadata = ();
@ -122,7 +122,7 @@ impl StatefulJob for ObjectValidatorJobInit {
.exec()
.await?;
*data = Some(ObjectValidatorJobData {
*data = Some(OldObjectValidatorJobData {
location_path,
task_count: steps.len(),
});
@ -143,7 +143,7 @@ impl StatefulJob for ObjectValidatorJobInit {
let Library { db, sync, .. } = &*ctx.library;
// this is to skip files that already have checksums
// i'm unsure what the desired behaviour is in this case
// i'm unsure what the desired behavior is in this case
// we can also compare old and new checksums here
// This if is just to make sure, we already queried objects where integrity_checksum is null
if file_path.integrity_checksum.is_none() {

View file

@ -1,8 +1,8 @@
use crate::{
location::{indexer::IndexerError, LocationError},
object::{
file_identifier::FileIdentifierJobError, fs::error::FileSystemJobsError,
media::media_processor::MediaProcessorError, validation::ValidatorError,
fs::error::FileSystemJobsError, media::old_media_processor::MediaProcessorError,
old_file_identifier::FileIdentifierJobError, validation::ValidatorError,
},
};

View file

@ -1,16 +1,16 @@
use crate::{
job::{worker::Worker, DynJob, Job, JobError},
library::Library,
location::indexer::indexer_job::IndexerJobInit,
location::indexer::old_indexer_job::OldIndexerJobInit,
object::{
file_identifier::file_identifier_job::FileIdentifierJobInit,
fs::{
copy::FileCopierJobInit, cut::FileCutterJobInit, delete::FileDeleterJobInit,
erase::FileEraserJobInit,
old_copy::OldFileCopierJobInit, old_cut::OldFileCutterJobInit,
old_delete::OldFileDeleterJobInit, old_erase::OldFileEraserJobInit,
},
media::media_processor::MediaProcessorJobInit,
validation::validator_job::ObjectValidatorJobInit,
media::old_media_processor::OldMediaProcessorJobInit,
old_file_identifier::old_file_identifier_job::OldFileIdentifierJobInit,
validation::old_validator_job::OldObjectValidatorJobInit,
},
old_job::{worker::Worker, DynJob, Job, JobError},
Node,
};
@ -33,12 +33,12 @@ const MAX_WORKERS: usize = 5;
pub enum JobManagerEvent {
IngestJob(Arc<Library>, Box<dyn DynJob>),
Shutdown(oneshot::Sender<()>, Arc<Jobs>),
Shutdown(oneshot::Sender<()>, Arc<OldJobs>),
}
#[must_use = "'job::manager::Actor::start' must be called to start the actor"]
pub struct Actor {
jobs: Arc<Jobs>,
jobs: Arc<OldJobs>,
internal_receiver: mpsc::UnboundedReceiver<JobManagerEvent>,
}
@ -69,14 +69,14 @@ impl Actor {
/// JobManager handles queueing and executing jobs using the `DynJob` trait,
/// as well as persisting JobReports to the database and pausing/resuming jobs.
///
pub struct Jobs {
pub struct OldJobs {
current_jobs_hashes: RwLock<HashSet<u64>>,
job_queue: RwLock<VecDeque<Box<dyn DynJob>>>,
running_workers: RwLock<HashMap<Uuid, Worker>>,
internal_sender: mpsc::UnboundedSender<JobManagerEvent>,
}
impl Jobs {
impl OldJobs {
/// Initializes the JobManager and spawns the internal event loop to listen for ingest.
pub fn new() -> (Arc<Self>, Actor) {
// allow the job manager to control its workers
@ -371,7 +371,7 @@ mod macros {
macro_rules! dispatch_call_to_job_by_name {
($job_name:expr, T -> $call:expr, default = $default:block, jobs = [ $($job:ty),+ $(,)?]) => {{
match $job_name {
$(<$job as $crate::job::StatefulJob>::NAME => {
$(<$job as $crate::old_job::StatefulJob>::NAME => {
type T = $job;
$call
},)+
@ -396,14 +396,14 @@ fn initialize_resumable_job(
Err(JobError::UnknownJobName(job_report.id, job_report.name))
},
jobs = [
MediaProcessorJobInit,
IndexerJobInit,
FileIdentifierJobInit,
ObjectValidatorJobInit,
FileCutterJobInit,
FileCopierJobInit,
FileDeleterJobInit,
FileEraserJobInit,
OldMediaProcessorJobInit,
OldIndexerJobInit,
OldFileIdentifierJobInit,
OldObjectValidatorJobInit,
OldFileCutterJobInit,
OldFileCopierJobInit,
OldFileDeleterJobInit,
OldFileEraserJobInit,
]
)
}
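`dispatch_call_to_job_by_name!` maps a persisted job name back to its concrete `StatefulJob` type so a saved report can be resumed; after this commit every entry in that list is an `Old*` init type. A toy version of the name-to-type dispatch the macro expands to (job types here are stand-ins, not the real ones):

```rust
// Toy version of what `dispatch_call_to_job_by_name!` expands to.
trait StatefulJob {
    const NAME: &'static str;
    fn describe() -> String;
}

struct OldIndexerJobInit;
impl StatefulJob for OldIndexerJobInit {
    const NAME: &'static str = "indexer";
    fn describe() -> String {
        "resuming indexer".into()
    }
}

struct OldFileIdentifierJobInit;
impl StatefulJob for OldFileIdentifierJobInit {
    const NAME: &'static str = "file_identifier";
    fn describe() -> String {
        "resuming file identifier".into()
    }
}

fn resume(job_name: &str) -> Option<String> {
    // The macro generates one arm per job type, matching on `<T as StatefulJob>::NAME`.
    match job_name {
        OldIndexerJobInit::NAME => Some(OldIndexerJobInit::describe()),
        OldFileIdentifierJobInit::NAME => Some(OldFileIdentifierJobInit::describe()),
        _ => None, // unknown name -> JobError::UnknownJobName in the real code
    }
}

fn main() {
    assert_eq!(resume("indexer").as_deref(), Some("resuming indexer"));
    assert!(resume("bogus").is_none());
}
```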

View file

@ -256,7 +256,7 @@ impl<SJob: StatefulJob> Job<SJob> {
node: &Arc<Node>,
library: &Arc<Library>,
) -> Result<(), JobManagerError> {
node.jobs
node.old_jobs
.clone()
.ingest(node, library, Box::new(self))
.await
@ -986,7 +986,7 @@ async fn handle_init_phase<SJob: StatefulJob>(
}
}
Err(JobError::Critical("unexpect job init end without result"))
Err(JobError::Critical("unexpected job init end without result"))
}
type StepTaskOutput<SJob> = Result<
@ -1255,5 +1255,5 @@ async fn handle_single_step<SJob: StatefulJob>(
}
}
Err(JobError::Critical("unexpect job step end without result"))
Err(JobError::Critical("unexpected job step end without result"))
}

View file

@ -29,7 +29,7 @@ use uuid::Uuid;
use super::{
DynJob, JobError, JobIdentity, JobReport, JobReportUpdate, JobRunErrors, JobRunOutput,
JobStatus, Jobs,
JobStatus, OldJobs,
};
const FIVE_SECS: Duration = Duration::from_secs(5);
@ -126,7 +126,7 @@ impl Worker {
mut report: JobReport,
library: Arc<Library>,
node: Arc<Node>,
job_manager: Arc<Jobs>,
job_manager: Arc<OldJobs>,
) -> Result<Self, JobError> {
let (commands_tx, commands_rx) = chan::bounded(8);
@ -647,7 +647,7 @@ impl Worker {
struct JobWorkTable {
job: Box<dyn DynJob>,
manager: Arc<Jobs>,
manager: Arc<OldJobs>,
hash: u64,
report: JobReport,
}

View file

@ -35,7 +35,7 @@ pub enum P2PEvent {
id: Uuid,
percent: u8,
},
SpacedropTimedout {
SpacedropTimedOut {
id: Uuid,
},
SpacedropRejected {

View file

@ -41,7 +41,7 @@ pub struct P2PManager {
pub(crate) events: P2PEvents,
// connect_hook: ConnectHook,
pub(super) spacedrop_pairing_reqs: Arc<Mutex<HashMap<Uuid, oneshot::Sender<Option<String>>>>>,
pub(super) spacedrop_cancelations: Arc<Mutex<HashMap<Uuid, Arc<AtomicBool>>>>,
pub(super) spacedrop_cancellations: Arc<Mutex<HashMap<Uuid, Arc<AtomicBool>>>>,
pub(crate) node_config: Arc<config::Manager>,
}
@ -67,7 +67,7 @@ impl P2PManager {
events: P2PEvents::spawn(p2p.clone()),
// connect_hook: ConnectHook::spawn(p2p),
spacedrop_pairing_reqs: Default::default(),
spacedrop_cancelations: Default::default(),
spacedrop_cancellations: Default::default(),
node_config,
});
this.on_node_config_change().await;
@ -248,9 +248,9 @@ async fn start(
};
match header {
Header::Ping => operations::ping::reciever(stream).await,
Header::Ping => operations::ping::receiver(stream).await,
Header::Spacedrop(req) => {
let Err(()) = operations::spacedrop::reciever(&this, req, stream).await else {
let Err(()) = operations::spacedrop::receiver(&this, req, stream).await else {
return;
};

View file

@ -7,6 +7,6 @@ pub async fn ping() {
todo!();
}
pub(crate) async fn reciever(stream: UnicastStream) {
pub(crate) async fn receiver(stream: UnicastStream) {
debug!("Received ping from peer '{}'", stream.remote_identity());
}

View file

@ -102,7 +102,7 @@ pub async fn spacedrop(
// Add 5 seconds in case the user responded right at the deadline over a slow network
_ = sleep(SPACEDROP_TIMEOUT + Duration::from_secs(5)) => {
debug!("({id}): timed out, cancelling");
p2p.events.send(P2PEvent::SpacedropTimedout { id }).ok();
p2p.events.send(P2PEvent::SpacedropTimedOut { id }).ok();
return;
},
};
@ -119,7 +119,7 @@ pub async fn spacedrop(
}
let cancelled = Arc::new(AtomicBool::new(false));
p2p.spacedrop_cancelations
p2p.spacedrop_cancellations
.lock()
.unwrap_or_else(PoisonError::into_inner)
.insert(id, cancelled.clone());
@ -190,7 +190,7 @@ impl P2PManager {
pub async fn cancel_spacedrop(&self, id: Uuid) {
if let Some(cancelled) = self
.spacedrop_cancelations
.spacedrop_cancellations
.lock()
.unwrap_or_else(PoisonError::into_inner)
.remove(&id)
@ -200,7 +200,7 @@ impl P2PManager {
}
}
pub(crate) async fn reciever(
pub(crate) async fn receiver(
this: &Arc<P2PManager>,
req: SpaceblockRequests,
mut stream: UnicastStream,
@ -264,7 +264,7 @@ pub(crate) async fn reciever(
info!("({id}): accepted saving to '{:?}'", file_path);
let cancelled = Arc::new(AtomicBool::new(false));
this.spacedrop_cancelations
this.spacedrop_cancellations
.lock()
.unwrap_or_else(PoisonError::into_inner)
.insert(id, cancelled.clone());
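Both the sender and receiver paths register an `Arc<AtomicBool>` in the now correctly spelled `spacedrop_cancellations` map, so `cancel_spacedrop` can flip the flag for an in-flight transfer. A condensed sketch of that cooperative-cancellation shape (ids and locking simplified):

```rust
use std::{
    collections::HashMap,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc, Mutex,
    },
};

// Maps an in-flight transfer id to its cancellation flag.
type Cancellations = Mutex<HashMap<u64, Arc<AtomicBool>>>;

fn start_transfer(map: &Cancellations, id: u64) -> Arc<AtomicBool> {
    let cancelled = Arc::new(AtomicBool::new(false));
    map.lock().unwrap().insert(id, cancelled.clone());
    cancelled
}

fn cancel(map: &Cancellations, id: u64) {
    // Removing the entry and storing `true` lets the transfer loop observe the
    // cancellation at its next check, mirroring `cancel_spacedrop`.
    if let Some(cancelled) = map.lock().unwrap().remove(&id) {
        cancelled.store(true, Ordering::Relaxed);
    }
}

fn main() {
    let map: Cancellations = Mutex::new(HashMap::new());
    let flag = start_transfer(&map, 42);
    assert!(!flag.load(Ordering::Relaxed));
    cancel(&map, 42);
    assert!(flag.load(Ordering::Relaxed));
}
```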

View file

@ -48,7 +48,7 @@ where
// Preferences are a set of types that are serialized as a list of key-value pairs,
// where nested type keys are serialized as a dot-separated path.
// They are serailized as a list because this allows preferences to be a synchronisation boundary,
// They are serialized as a list because this allows preferences to be a synchronization boundary,
// whereas their values (referred to as settings) will be overwritten.
pub trait Preferences {
fn to_kvs(self) -> PreferenceKVs;
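The corrected comment describes why preferences flatten to key-value pairs: dot-separated keys let each leaf setting be synchronized and overwritten independently. A tiny sketch of that flattening, with illustrative types rather than the real `PreferenceKVs`:

```rust
use std::collections::BTreeMap;

// Nested preferences flatten to dot-separated keys, one entry per leaf setting.
fn flatten(prefix: &str, nested: &BTreeMap<&str, &str>, out: &mut BTreeMap<String, String>) {
    for (key, value) in nested {
        out.insert(format!("{prefix}.{key}"), (*value).to_string());
    }
}

fn main() {
    let thumbnailer = BTreeMap::from([("quality", "80"), ("parallelism", "4")]);
    let mut kvs = BTreeMap::new();
    flatten("thumbnailer", &thumbnailer, &mut kvs);

    // Each leaf can now be synced/overwritten on its own.
    assert_eq!(kvs.get("thumbnailer.quality").map(String::as_str), Some("80"));
}
```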

View file

@ -1,12 +1,12 @@
//! A system for loading a default set of data on startup. This is ONLY enabled in development builds.
use crate::{
job::JobManagerError,
library::Libraries,
library::{LibraryManagerError, LibraryName},
location::{
delete_location, scan_location, LocationCreateArgs, LocationError, LocationManagerError,
},
old_job::JobManagerError,
util::AbortOnDrop,
Node,
};

View file

@ -3,7 +3,7 @@ use thiserror::Error;
use ort::EnvironmentBuilder;
use tracing::{debug, error};
pub mod image_labeler;
pub mod old_image_labeler;
mod utils;
// This path must be relative to the running binary
@ -82,5 +82,5 @@ pub enum Error {
#[error("failed to initialize AI environment: {0}")]
Init(#[from] ort::Error),
#[error(transparent)]
ImageLabeler(#[from] image_labeler::ImageLabelerError),
ImageLabeler(#[from] old_image_labeler::ImageLabelerError),
}

View file

@ -7,12 +7,12 @@ use thiserror::Error;
use tracing::error;
use uuid::Uuid;
mod actor;
mod model;
mod old_actor;
mod process;
pub use actor::ImageLabeler;
pub use model::{DownloadModelError, Model, YoloV8, DEFAULT_MODEL_VERSION};
pub use old_actor::OldImageLabeler;
pub type BatchToken = Uuid;

View file

@ -203,7 +203,7 @@ async fn download_model(
))))
}
_ => {
info!("Dowloading model from: {} to {}", url, file_path.display());
info!("Downloading model from: {} to {}", url, file_path.display());
let response = reqwest::get(url.as_str()).await?;
// Ensure the request was successful (status code 2xx)
if !response.status().is_success() {

View file

@ -64,7 +64,7 @@ struct ResumableBatch {
file_paths: Vec<file_path_for_media_processor::Data>,
}
pub struct ImageLabeler {
pub struct OldImageLabeler {
to_resume_batches_file_path: PathBuf,
new_batches_tx: chan::Sender<Batch>,
resume_batch_tx: chan::Sender<ResumeBatchRequest>,
@ -74,7 +74,7 @@ pub struct ImageLabeler {
handle: RefCell<Option<JoinHandle<()>>>,
}
impl ImageLabeler {
impl OldImageLabeler {
pub async fn new(
model: Box<dyn Model>,
data_directory: impl AsRef<Path>,
@ -306,7 +306,7 @@ impl ImageLabeler {
/// SAFETY: Due to the usage of RefCell we lost the `Sync` impl, but we only use it to have a shutdown method
/// receiving `&self`, which is called once; we also use `try_borrow_mut`, so we never panic
unsafe impl Sync for ImageLabeler {}
unsafe impl Sync for OldImageLabeler {}
async fn actor_loop(
model_and_session: Arc<RwLock<ModelAndSession>>,

View file

@ -23,7 +23,9 @@ use tokio::{
};
use tracing::{error, warn};
use super::{actor::Batch, model::ModelAndSession, BatchToken, ImageLabelerError, LabelerOutput};
use super::{
model::ModelAndSession, old_actor::Batch, BatchToken, ImageLabelerError, LabelerOutput,
};
const MAX_FILE_SIZE: u64 = 100 * 1024 * 1024; // 100 MB
@ -156,7 +158,7 @@ pub(super) async fn spawned_processing(
let semaphore = Arc::new(Semaphore::new(available_parallelism));
// From this point ownwards, we lock the model in read mode
// From this point onwards, we lock the model in read mode
let model_and_session = Arc::new(model_and_session.read_owned().await);
if !model_and_session.can_process() {
@ -198,7 +200,7 @@ pub(super) async fn spawned_processing(
let ids = (
file_path.id,
file_path.object_id.expect("alredy checked above"),
file_path.object_id.expect("already checked above"),
);
if output_tx.is_closed() {

View file

@ -19,7 +19,7 @@ pub(crate) fn get_path_relative_to_exe(path: impl AsRef<Path>) -> PathBuf {
|parent| {
let path = parent.join(path.as_ref());
path.canonicalize().unwrap_or_else(|e| {
error!("Failed to canonilize relative path to exe, return raw path and hope: {e:#?}");
error!("Failed to canonicalize relative path to exe, return raw path and hope: {e:#?}");
path
})
},

View file

@ -9,7 +9,7 @@ use specta::{Any, DataType, NamedType, Type, TypeMap};
/// A type that can be used to return a group of `Reference<T>` and `CacheNode`'s
///
/// You don't need to use this, it's just a shortcut to avoid having to write out the full type everytime.
/// You don't need to use this, it's just a shortcut to avoid having to write out the full type every time.
#[derive(Serialize, Type, Debug)]
pub struct NormalisedResults<T: Model + Type> {
pub items: Vec<Reference<T>>,
@ -18,7 +18,7 @@ pub struct NormalisedResults<T: Model + Type> {
/// A type that can be used to return a group of `Reference<T>` and `CacheNode`'s
///
/// You don't need to use this, it's just a shortcut to avoid having to write out the full type everytime.
/// You don't need to use this, it's just a shortcut to avoid having to write out the full type every time.
#[derive(Serialize, Type, Debug)]
pub struct NormalisedResult<T: Model + Type> {
pub item: Reference<T>,
@ -150,14 +150,14 @@ impl Serialize for CacheNode {
__type: self.0,
__id: &self.1,
v: self.2.as_ref().map_err(|err| {
serde::ser::Error::custom(format!("Failed to serialise node: {}", err))
serde::ser::Error::custom(format!("Failed to serialize node: {}", err))
})?,
}
.serialize(serializer)
}
}
/// A helper for easily normalising data.
/// A helper for easily normalizing data.
pub trait Normalise {
type Item: Model + Type;

View file

@ -569,8 +569,8 @@ pub mod locations {
pub type Response = CloudLocation;
}
pub use authorise::exec as authorise;
pub mod authorise {
pub use authorize::exec as authorize;
pub mod authorize {
use super::*;
pub async fn exec(config: RequestConfig, id: String) -> Result<Response, Error> {
@ -580,7 +580,7 @@ pub mod locations {
config
.client
.post(&format!("{}/api/v1/locations/authorise", config.api_url))
.post(&format!("{}/api/v1/locations/authorize", config.api_url))
.json(&json!({ "id": id }))
.with_auth(auth_token)
.send()
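The cloud wrapper renames `authorise` to `authorize` while keeping the `pub use authorize::exec as authorize` re-export, so call sites read as `locations::authorize(...)`. The same re-export pattern in miniature (request body replaced by a stub):

```rust
// The `pub use module::exec as name` trick gives callers `locations::authorize(...)`
// while the request body lives in its own module.
mod locations {
    pub use authorize::exec as authorize;

    pub mod authorize {
        pub fn exec(id: &str) -> String {
            // Stand-in for the POST to `/api/v1/locations/authorize`.
            format!("authorized location {id}")
        }
    }
}

fn main() {
    assert_eq!(locations::authorize("abc"), "authorized location abc");
}
```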

View file

@ -478,7 +478,7 @@ impl MovieDecoder {
"thumb_deint",
"deint=1",
self.filter_graph,
"Failed to create deinterlace filter",
"Failed to create de-interlace filter",
)?;
}

View file

@ -34,7 +34,7 @@ pub struct IsolatedFilePathDataParts<'a> {
#[non_exhaustive]
pub struct IsolatedFilePathData<'a> {
// WARN! These fields MUST NOT be changed outside the location module, that's why they have this visibility
// and are not public. They have some specific logic on them and should not be writen to directly.
// and are not public. They have some specific logic on them and should not be written to directly.
// If you wanna access one of them outside from location module, write yourself an accessor method
// to have read only access to them.
pub(super) location_id: location::id::Type,

View file

@ -43,7 +43,7 @@ pub const PDF_LANDSCAPE_RENDER_WIDTH: pdfium_render::prelude::Pixels = 1123;
#[cfg_attr(feature = "bincode", derive(bincode::Encode, bincode::Decode))]
#[cfg_attr(feature = "serde", serde(rename_all = "lowercase"))]
#[derive(Debug, Clone, Copy)]
pub enum ConvertableExtension {
pub enum ConvertibleExtension {
Bmp,
Dib,
Ff,
@ -74,7 +74,7 @@ pub enum ConvertableExtension {
Webp,
}
impl ConvertableExtension {
impl ConvertibleExtension {
#[must_use]
pub const fn should_rotate(self) -> bool {
!matches!(
@ -88,13 +88,13 @@ impl ConvertableExtension {
}
}
impl Display for ConvertableExtension {
impl Display for ConvertibleExtension {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{self:?}")
}
}
impl TryFrom<String> for ConvertableExtension {
impl TryFrom<String> for ConvertibleExtension {
type Error = crate::Error;
fn try_from(value: String) -> Result<Self, Self::Error> {
@ -133,7 +133,7 @@ impl TryFrom<String> for ConvertableExtension {
}
}
impl TryFrom<&Path> for ConvertableExtension {
impl TryFrom<&Path> for ConvertibleExtension {
type Error = crate::Error;
fn try_from(value: &Path) -> Result<Self, Self::Error> {
@ -146,7 +146,7 @@ impl TryFrom<&Path> for ConvertableExtension {
}
#[cfg(feature = "serde")]
impl serde::Serialize for ConvertableExtension {
impl serde::Serialize for ConvertibleExtension {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
@ -160,7 +160,7 @@ struct ExtensionVisitor;
#[cfg(feature = "serde")]
impl<'de> serde::de::Visitor<'de> for ExtensionVisitor {
type Value = ConvertableExtension;
type Value = ConvertibleExtension;
fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
formatter.write_str("A valid extension string")
@ -175,7 +175,7 @@ impl<'de> serde::de::Visitor<'de> for ExtensionVisitor {
}
#[cfg(feature = "serde")]
impl<'de> serde::Deserialize<'de> for ConvertableExtension {
impl<'de> serde::Deserialize<'de> for ConvertibleExtension {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,

View file

@ -34,7 +34,7 @@ mod svg;
use consts::MAXIMUM_FILE_SIZE;
// Re-exports
pub use consts::{all_compatible_extensions, ConvertableExtension};
pub use consts::{all_compatible_extensions, ConvertibleExtension};
pub use error::{Error, Result};
pub use handler::{convert_image, format_image};
pub use image::DynamicImage;

View file

@ -14,7 +14,7 @@ pub const DMS_DIVISION: [f64; 3] = [1_f64, 60_f64, 3600_f64];
/// The amount of significant figures we wish to retain after the decimal point.
///
/// This is currrently 8 digits (after the integer) as that is precise enough for most
/// This is currently 8 digits (after the integer) as that is precise enough for most
/// applications.
///
/// This is calculated with `10^n`, where `n` is the desired amount of SFs.

View file

@ -50,7 +50,7 @@ impl MediaDate {
.map(Clone::clone)
}
/// Returns the amount of non-leap secods since the Unix Epoch (1970-01-01T00:00:00+00:00)
/// Returns the amount of non-leap seconds since the Unix Epoch (1970-01-01T00:00:00+00:00)
///
/// This is for search ordering/sorting
#[must_use]

View file

@ -57,7 +57,7 @@ impl From<u32> for FlashMode {
}
impl From<FlashValue> for Option<Flash> {
// TODO(brxken128): This can be heavily optimised with bitwise AND
// TODO(brxken128): This can be heavily optimized with bitwise AND
// e.g. to see if flash was fired, `(value & 1) != 0`
// or to see if red eye reduction was enabled, `(value & 64) != 0`
// May not be worth it as some states may be invalid according to `https://www.awaresystems.be/imaging/tiff/tifftags/privateifd/exif/flash.html`

View file

@ -123,7 +123,7 @@ impl ImageMetadata {
}
}
// TODO(brxken128): more exif spec reading so we can source colour spaces correctly too
// TODO(brxken128): more exif spec reading so we can source color spaces correctly too
// pub enum ImageColorSpace {
// Rgb,
// RgbP,

View file

@ -327,7 +327,7 @@ mod tests {
// This is sent out of band of Spaceblock
let block_size = 25u32;
let data = vec![0u8; block_size as usize];
let block_size = BlockSize::dangerously_new(block_size); // TODO: Determine it using proper algo instead of harcoding it
let block_size = BlockSize::dangerously_new(block_size); // TODO: Determine it using proper algo instead of hardcoding it
let req = SpaceblockRequests {
id: Uuid::new_v4(),
@ -369,7 +369,7 @@ mod tests {
// This is sent out of band of Spaceblock
let block_size = 25u32;
let data = vec![0u8; block_size as usize];
let block_size = BlockSize::dangerously_new(block_size); // TODO: Determine it using proper algo instead of harcoding it
let block_size = BlockSize::dangerously_new(block_size); // TODO: Determine it using proper algo instead of hardcoding it
let req = SpaceblockRequests {
id: Uuid::new_v4(),
@ -412,7 +412,7 @@ mod tests {
// This is sent out of band of Spaceblock
let block_size = 25u32;
let data = vec![0u8; 0]; // Zero sized file
let block_size = BlockSize::dangerously_new(block_size); // TODO: Determine it using proper algo instead of harcoding it
let block_size = BlockSize::dangerously_new(block_size); // TODO: Determine it using proper algo instead of hardcoding it
let req = SpaceblockRequests {
id: Uuid::new_v4(),

View file

@ -1,6 +1,6 @@
//! Temporary library for easier binary encoding/decoding.
//!
//! Eventually these will be deprecated by macros but I can't find one which supports large payloads (basically it needs to write to async stream not in-memory bufffer) -> Binario is my own prototype of a Rust library to do this but it's not prod ready yet.
//! Eventually these will be deprecated by macros but I can't find one which supports large payloads (basically it needs to write to async stream not in-memory buffer) -> Binario is my own prototype of a Rust library to do this but it's not prod ready yet.
//!
use thiserror::Error;
@ -11,7 +11,7 @@ use uuid::Uuid;
#[error(transparent)]
pub enum SpaceTunnelIdentityErr {
#[error("{0}")]
Darlek(#[from] ed25519_dalek::ed25519::Error),
Dalek(#[from] ed25519_dalek::ed25519::Error),
#[error("Invalid key length")]
InvalidKeyLength,
}

View file

@ -19,7 +19,7 @@ pub const REMOTE_IDENTITY_LEN: usize = 32;
#[error(transparent)]
pub enum IdentityErr {
#[error("{0}")]
Darlek(#[from] ed25519_dalek::ed25519::Error),
Dalek(#[from] ed25519_dalek::ed25519::Error),
#[error("Invalid key length")]
InvalidKeyLength,
}

View file

@ -57,6 +57,36 @@
//! system.shutdown().await;
//! }
//! ```
#![warn(
clippy::all,
clippy::pedantic,
clippy::correctness,
clippy::perf,
clippy::style,
clippy::suspicious,
clippy::complexity,
clippy::nursery,
clippy::unwrap_used,
unused_qualifications,
rust_2018_idioms,
trivial_casts,
trivial_numeric_casts,
unused_allocation,
clippy::unnecessary_cast,
clippy::cast_lossless,
clippy::cast_possible_truncation,
clippy::cast_possible_wrap,
clippy::cast_precision_loss,
clippy::cast_sign_loss,
clippy::dbg_macro,
clippy::deprecated_cfg_attr,
clippy::separated_literal_suffix,
deprecated
)]
#![forbid(deprecated_in_future)]
#![allow(clippy::missing_errors_doc, clippy::module_name_repetitions)]
mod error;
mod message;
mod system;
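This lint block is the "stronger clippy config" mentioned in the commit message, and several task-system hunks below are mechanical answers to it, e.g. `usize::from(flag)` replacing `if flag { 1 } else { 0 }` (the shape `clippy::bool_to_int_with_if`-style lints flag). A small before/after in that spirit:

```rust
// Two shapes the stricter lints push you toward.

fn count_flag_before(flag: bool) -> usize {
    // clippy's pedantic lints flag this form:
    if flag {
        1
    } else {
        0
    }
}

fn count_flag_after(flag: bool) -> usize {
    // The lossless, lint-clean version used in the task-system hunks below.
    usize::from(flag)
}

fn main() {
    assert_eq!(count_flag_before(true), count_flag_after(true));
    assert_eq!(count_flag_before(false), count_flag_after(false));
}
```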

View file

@ -7,7 +7,7 @@ use super::{
};
#[derive(Debug)]
pub(crate) enum SystemMessage {
pub enum SystemMessage {
IdleReport(WorkerId),
WorkingReport(WorkerId),
ResumeTask {
@ -38,7 +38,7 @@ pub(crate) enum SystemMessage {
}
#[derive(Debug)]
pub(crate) enum WorkerMessage<E: RunError> {
pub enum WorkerMessage<E: RunError> {
NewTask(TaskWorkState<E>),
TaskCountRequest(oneshot::Sender<usize>),
ResumeTask {

View file

@ -1,6 +1,7 @@
use std::{
cell::RefCell,
collections::HashSet,
num::NonZeroUsize,
pin::pin,
sync::{
atomic::{AtomicBool, Ordering},
@ -41,7 +42,7 @@ impl<E: RunError> System<E> {
error!("Failed to get available parallelism in the job system: {e:#?}");
1
},
|non_zero| non_zero.get(),
NonZeroUsize::get,
);
let (msgs_tx, msgs_rx) = chan::bounded(8);
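Replacing the closure `|non_zero| non_zero.get()` with the method reference `NonZeroUsize::get` is another lint-driven cleanup (presumably `clippy::redundant_closure_for_method_calls`). The surrounding fallback shape, sketched:

```rust
use std::{num::NonZeroUsize, thread};

fn worker_count() -> usize {
    // Fall back to a single worker when parallelism can't be queried,
    // otherwise unwrap the NonZeroUsize via a method reference.
    thread::available_parallelism().map_or_else(
        |e| {
            eprintln!("Failed to get available parallelism: {e:#?}");
            1
        },
        NonZeroUsize::get,
    )
}

fn main() {
    assert!(worker_count() >= 1);
}
```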
@ -64,7 +65,6 @@ impl<E: RunError> System<E> {
let handle = spawn({
let workers = Arc::clone(&workers);
let msgs_rx = msgs_rx.clone();
let idle_workers = Arc::clone(&idle_workers);
async move {
@ -82,7 +82,7 @@ impl<E: RunError> System<E> {
trace!("Task system received shutdown signal and will exit...");
break;
}
trace!("Restarting task system message processing task...")
trace!("Restarting task system message processing task...");
}
info!("Task system gracefully shutdown");
@ -221,6 +221,11 @@ impl<E: RunError> System<E> {
}
/// Shuts down the system, returning all pending and running tasks to their respective handles.
///
/// # Panics
///
/// If the system message channel is closed for some unknown reason or if we fail to respond to
the oneshot channel with the shutdown response.
pub async fn shutdown(&self) {
if let Some(handle) = self
.handle
@ -272,7 +277,7 @@ unsafe impl<E: RunError> Sync for System<E> {}
#[derive(Clone, Debug)]
#[repr(transparent)]
pub(crate) struct SystemComm(chan::Sender<SystemMessage>);
pub struct SystemComm(chan::Sender<SystemMessage>);
impl SystemComm {
pub async fn idle_report(&self, worker_id: usize) {
@ -404,8 +409,6 @@ impl<E: RunError> Clone for Dispatcher<E> {
impl<E: RunError> Dispatcher<E> {
/// Dispatches a task to the system, the task will be assigned to a worker and executed as soon as possible.
pub async fn dispatch(&self, into_task: impl IntoTask<E>) -> TaskHandle<E> {
let task = into_task.into_task();
async fn inner<E: RunError>(this: &Dispatcher<E>, task: Box<dyn Task<E>>) -> TaskHandle<E> {
let worker_id = this
.last_worker_id
@ -425,7 +428,7 @@ impl<E: RunError> Dispatcher<E> {
handle
}
inner(self, task).await
inner(self, into_task.into_task()).await
}
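The `dispatch` change keeps the nested `inner` function, a standard monomorphization trick: the generic outer function is a thin shim that converts `impl IntoTask<E>` into a `Box<dyn Task<E>>`, and the real body is compiled once per error type instead of once per task type. The pattern in miniature, with toy traits:

```rust
// Generic-outer / concrete-inner: only the thin shim is monomorphized per
// caller type; the real work is compiled once for the boxed trait object.
trait IntoTask {
    fn into_task(self) -> Box<dyn Fn() -> u32>;
}

impl<F: Fn() -> u32 + 'static> IntoTask for F {
    fn into_task(self) -> Box<dyn Fn() -> u32> {
        Box::new(self)
    }
}

fn dispatch(into_task: impl IntoTask) -> u32 {
    fn inner(task: Box<dyn Fn() -> u32>) -> u32 {
        // ...queueing, worker selection, etc. would live here...
        task()
    }
    inner(into_task.into_task())
}

fn main() {
    assert_eq!(dispatch(|| 7), 7);
}
```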
/// Dispatches many tasks to the system, the tasks will be assigned to workers and executed as soon as possible.
@ -453,14 +456,15 @@ impl<E: RunError> Dispatcher<E> {
.into_iter()
.unzip::<_, _, Vec<_>, HashSet<_>>();
workers_ids_set.into_iter().for_each(|worker_id| {
for worker_id in workers_ids_set {
self.idle_workers[worker_id].store(false, Ordering::Relaxed);
});
}
handles
}
/// Returns the number of workers in the system.
#[must_use]
pub fn workers_count(&self) -> usize {
self.workers.len()
}

View file

@ -43,7 +43,7 @@ pub trait IntoAnyTaskOutput {
fn into_output(self) -> TaskOutput;
}
/// Blanket implementation for all types that implements AnyTaskOutput
/// Blanket implementation for all types that implement [`AnyTaskOutput`]
impl<T: AnyTaskOutput + 'static> IntoAnyTaskOutput for T {
fn into_output(self) -> TaskOutput {
TaskOutput::Out(Box::new(self))
@ -85,7 +85,7 @@ pub enum ExecStatus {
}
#[derive(Debug)]
pub(crate) enum InternalTaskExecStatus<E: RunError> {
pub enum InternalTaskExecStatus<E: RunError> {
Done(TaskOutput),
Paused,
Canceled,
@ -95,18 +95,16 @@ pub(crate) enum InternalTaskExecStatus<E: RunError> {
impl<E: RunError> From<Result<ExecStatus, E>> for InternalTaskExecStatus<E> {
fn from(result: Result<ExecStatus, E>) -> Self {
result
.map(|status| match status {
ExecStatus::Done(out) => Self::Done(out),
ExecStatus::Paused => Self::Paused,
ExecStatus::Canceled => Self::Canceled,
})
.unwrap_or_else(|e| Self::Error(e))
result.map_or_else(Self::Error, |status| match status {
ExecStatus::Done(out) => Self::Done(out),
ExecStatus::Paused => Self::Paused,
ExecStatus::Canceled => Self::Canceled,
})
}
}
/// A helper trait to convert any type that implements [`Task<E>`] into a [`Box<dyn Task<E>>`], boxing it.
pub trait IntoTask<E> {
pub trait IntoTask<E>: Send {
fn into_task(self) -> Box<dyn Task<E>>;
}
@ -217,19 +215,22 @@ impl Interrupter {
/// Check if the user requested a pause or a cancel, returning the kind of interruption that was requested
/// in a non-blocking manner.
pub fn try_check_interrupt(&self) -> Option<InterruptionKind> {
if let Some(kind) = InterruptionKind::load(&self.has_interrupted) {
Some(kind)
} else if let Ok(InterruptionRequest { kind, ack }) = self.interrupt_rx.try_recv() {
if ack.send(Ok(())).is_err() {
warn!("TaskInterrupter ack channel closed");
}
InterruptionKind::load(&self.has_interrupted).map_or_else(
|| {
if let Ok(InterruptionRequest { kind, ack }) = self.interrupt_rx.try_recv() {
if ack.send(Ok(())).is_err() {
warn!("TaskInterrupter ack channel closed");
}
self.has_interrupted.store(kind as u8, Ordering::Relaxed);
self.has_interrupted.store(kind as u8, Ordering::Relaxed);
Some(kind)
} else {
None
}
Some(kind)
} else {
None
}
},
Some,
)
}
pub(super) fn reset(&self) {
@ -263,7 +264,7 @@ impl InterruptionKind {
}
#[derive(Debug)]
pub(crate) struct InterruptionRequest {
pub struct InterruptionRequest {
kind: InterruptionKind,
ack: oneshot::Sender<Result<(), SystemError>>,
}
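The `try_check_interrupt` rewrite a few hunks above reads as a single `map_or_else`: consult the latched atomic first, and only if it is empty poll the request channel, storing the kind so later checks short-circuit. A compact sketch of that two-stage non-blocking check, with `std::sync::mpsc` standing in for the async channel:

```rust
use std::sync::{
    atomic::{AtomicU8, Ordering},
    mpsc,
};

#[derive(Clone, Copy, Debug, PartialEq)]
enum Kind {
    Pause = 1,
    Cancel = 2,
}

fn load(flag: &AtomicU8) -> Option<Kind> {
    match flag.load(Ordering::Relaxed) {
        1 => Some(Kind::Pause),
        2 => Some(Kind::Cancel),
        _ => None,
    }
}

// Two-stage non-blocking check: latched flag first, then the request channel.
fn try_check(flag: &AtomicU8, rx: &mpsc::Receiver<Kind>) -> Option<Kind> {
    load(flag).map_or_else(
        || {
            rx.try_recv().ok().map(|kind| {
                // Latch it so later checks short-circuit without touching the channel.
                flag.store(kind as u8, Ordering::Relaxed);
                kind
            })
        },
        Some,
    )
}

fn main() {
    let flag = AtomicU8::new(0);
    let (tx, rx) = mpsc::channel();
    assert_eq!(try_check(&flag, &rx), None);
    tx.send(Kind::Cancel).unwrap();
    assert_eq!(try_check(&flag, &rx), Some(Kind::Cancel));
    // Latched: the kind persists even though the channel is now empty.
    assert_eq!(try_check(&flag, &rx), Some(Kind::Cancel));
}
```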
@ -290,11 +291,16 @@ impl<E: RunError> Future for TaskHandle<E> {
impl<E: RunError> TaskHandle<E> {
/// Get the unique identifier of the task
pub fn task_id(&self) -> TaskId {
#[must_use]
pub const fn task_id(&self) -> TaskId {
self.task_id
}
/// Gracefully pause the task at a safe point defined by the user using the [`Interrupter`]
///
/// # Panics
///
/// Will panic if the worker failed to ack the pause request
pub async fn pause(&self) -> Result<(), SystemError> {
let is_paused = self.worktable.is_paused.load(Ordering::Relaxed);
let is_canceled = self.worktable.is_canceled.load(Ordering::Relaxed);
@ -328,6 +334,10 @@ impl<E: RunError> TaskHandle<E> {
}
/// Gracefully cancel the task at a safe point defined by the user using the [`Interrupter`]
///
/// # Panics
///
/// Will panic if the worker failed to ack the cancel request
pub async fn cancel(&self) -> Result<(), SystemError> {
let is_canceled = self.worktable.is_canceled.load(Ordering::Relaxed);
let is_done = self.worktable.is_done.load(Ordering::Relaxed);
@ -383,7 +393,7 @@ impl<E: RunError> TaskHandle<E> {
}
#[derive(Debug)]
pub(crate) struct TaskWorktable {
pub struct TaskWorktable {
started: AtomicBool,
is_running: AtomicBool,
is_done: AtomicBool,
@ -468,7 +478,7 @@ impl TaskWorktable {
}
#[derive(Debug)]
pub(crate) struct TaskWorkState<E: RunError> {
pub struct TaskWorkState<E: RunError> {
pub(crate) task: Box<dyn Task<E>>,
pub(crate) worktable: Arc<TaskWorktable>,
pub(crate) done_tx: oneshot::Sender<Result<TaskStatus<E>, SystemError>>,

View file

@ -24,10 +24,10 @@ use run::run;
const ONE_SECOND: Duration = Duration::from_secs(1);
pub(crate) type WorkerId = usize;
pub(crate) type AtomicWorkerId = AtomicUsize;
pub type WorkerId = usize;
pub type AtomicWorkerId = AtomicUsize;
pub(crate) struct WorkerBuilder<E: RunError> {
pub struct WorkerBuilder<E: RunError> {
id: usize,
msgs_tx: chan::Sender<WorkerMessage<E>>,
msgs_rx: chan::Receiver<WorkerMessage<E>>,
@ -60,9 +60,7 @@ impl<E: RunError> WorkerBuilder<E> {
} = self;
let handle = spawn({
let msgs_rx = msgs_rx.clone();
let system_comm = system_comm.clone();
let task_stealer = task_stealer.clone();
async move {
trace!("Worker <worker_id='{id}'> message processing task starting...");
@ -101,7 +99,7 @@ impl<E: RunError> WorkerBuilder<E> {
}
#[derive(Debug)]
pub(crate) struct Worker<E: RunError> {
pub struct Worker<E: RunError> {
pub id: usize,
system_comm: SystemComm,
msgs_tx: chan::Sender<WorkerMessage<E>>,
@ -231,7 +229,7 @@ impl<E: RunError> Worker<E> {
unsafe impl<E: RunError> Sync for Worker<E> {}
#[derive(Clone)]
pub(crate) struct WorkerComm<E: RunError> {
pub struct WorkerComm<E: RunError> {
worker_id: WorkerId,
msgs_tx: chan::Sender<WorkerMessage<E>>,
}
@ -260,7 +258,7 @@ impl<E: RunError> WorkerComm<E> {
}
}
pub(crate) struct WorkStealer<E: RunError> {
pub struct WorkStealer<E: RunError> {
worker_comms: Arc<Vec<WorkerComm<E>>>,
}
@ -301,12 +299,12 @@ impl<E: RunError> WorkStealer<E> {
if let Some(task) = worker_comm.steal_task(worker_id).await {
return Some(task);
} else {
trace!(
"Worker <worker_id='{}', stealer_id='{worker_id}'> has no tasks to steal",
worker_comm.worker_id
);
}
trace!(
"Worker <worker_id='{}', stealer_id='{worker_id}'> has no tasks to steal",
worker_comm.worker_id
);
}
None
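The flattened loop above is first-hit work stealing: ask each sibling worker's `WorkerComm` in turn and return the first task offered; the `else` branch could go because the success arm returns. The same loop shape over plain queues:

```rust
// First-hit work stealing: ask each sibling queue in turn, return on success.
fn steal_task(queues: &mut [Vec<u32>], thief_id: usize) -> Option<u32> {
    for (victim_id, queue) in queues.iter_mut().enumerate() {
        if victim_id == thief_id {
            continue; // don't steal from ourselves
        }
        if let Some(task) = queue.pop() {
            return Some(task);
        }
        // else-branch flattened: nothing to steal here, just move on.
    }
    None
}

fn main() {
    let mut queues = vec![vec![], vec![1, 2], vec![]];
    assert_eq!(steal_task(&mut queues, 0), Some(2));
    assert_eq!(steal_task(&mut queues, 2), Some(1));
}
```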

View file

@ -19,17 +19,17 @@ pub(super) async fn run<E: RunError>(
work_stealer: WorkStealer<E>,
msgs_rx: chan::Receiver<WorkerMessage<E>>,
) {
let (mut runner, runner_rx) = Runner::new(id, work_stealer, system_comm);
let mut idle_checker_interval = interval_at(Instant::now(), ONE_SECOND);
idle_checker_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
enum StreamMessage<E: RunError> {
Commands(WorkerMessage<E>),
RunnerMsg(RunnerMessage<E>),
IdleCheck,
}
let (mut runner, runner_rx) = Runner::new(id, work_stealer, system_comm);
let mut idle_checker_interval = interval_at(Instant::now(), ONE_SECOND);
idle_checker_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let mut msg_stream = pin!((
msgs_rx.map(StreamMessage::Commands),
runner_rx.map(StreamMessage::RunnerMsg),
@ -58,19 +58,14 @@ pub(super) async fn run<E: RunError>(
}
StreamMessage::Commands(WorkerMessage::PauseNotRunningTask { task_id, ack }) => {
if ack
.send(runner.pause_not_running_task(task_id).await)
.is_err()
{
if ack.send(runner.pause_not_running_task(task_id)).is_err() {
warn!("Resume task channel closed before sending ack");
}
}
StreamMessage::Commands(WorkerMessage::CancelNotRunningTask { task_id, ack }) => {
if ack
.send(runner.cancel_not_running_task(task_id).await)
.is_err()
{
runner.cancel_not_running_task(task_id);
if ack.send(Ok(())).is_err() {
warn!("Resume task channel closed before sending ack");
}
}
@ -87,11 +82,11 @@ pub(super) async fn run<E: RunError>(
StreamMessage::Commands(WorkerMessage::StealRequest(tx)) => runner.steal_request(tx),
StreamMessage::Commands(WorkerMessage::WakeUp) => runner.wake_up().await,
StreamMessage::Commands(WorkerMessage::WakeUp) => runner.wake_up(),
// Runner messages
StreamMessage::RunnerMsg(RunnerMessage::TaskOutput(task_id, Ok(output))) => {
runner.process_task_output(task_id, output).await
runner.process_task_output(task_id, output).await;
}
StreamMessage::RunnerMsg(RunnerMessage::TaskOutput(task_id, Err(()))) => {
@ -107,7 +102,7 @@ pub(super) async fn run<E: RunError>(
}
// Idle checking to steal some work
StreamMessage::IdleCheck => runner.idle_check().await,
StreamMessage::IdleCheck => runner.idle_check(),
}
}
}

View file

@ -25,7 +25,8 @@ use super::{
error::{RunError, SystemError},
system::SystemComm,
task::{
ExecStatus, InternalTaskExecStatus, Task, TaskId, TaskOutput, TaskStatus, TaskWorkState,
ExecStatus, InternalTaskExecStatus, Interrupter, Task, TaskId, TaskOutput, TaskStatus,
TaskWorkState, TaskWorktable,
},
},
RunnerMessage, TaskRunnerOutput, WorkStealer, WorkerId, ONE_SECOND,
@ -56,7 +57,7 @@ pub(super) enum PendingTaskKind {
}
impl PendingTaskKind {
fn with_priority(has_priority: bool) -> Self {
const fn with_priority(has_priority: bool) -> Self {
if has_priority {
Self::Priority
} else {
@ -92,7 +93,7 @@ enum WaitingSuspendedTask {
}
impl WaitingSuspendedTask {
fn is_waiting(&self) -> bool {
const fn is_waiting(&self) -> bool {
matches!(self, Self::Task(_))
}
}
@ -110,7 +111,7 @@ pub(super) struct Runner<E: RunError> {
is_idle: bool,
waiting_suspension: WaitingSuspendedTask,
abort_and_suspend_map: HashMap<TaskId, AbortAndSuspendSignalers>,
runner_tx: chan::Sender<RunnerMessage<E>>,
msgs_tx: chan::Sender<RunnerMessage<E>>,
current_task_handle: Option<RunningTask>,
suspend_on_shutdown_rx: chan::Receiver<RunnerMessage<E>>,
current_steal_task_handle: Option<JoinHandle<()>>,
@ -140,7 +141,7 @@ impl<E: RunError> Runner<E> {
is_idle: true,
waiting_suspension: WaitingSuspendedTask::None,
abort_and_suspend_map: HashMap::with_capacity(ABORT_AND_SUSPEND_MAP_INITIAL_SIZE),
runner_tx,
msgs_tx: runner_tx,
current_task_handle: None,
suspend_on_shutdown_rx: runner_rx.clone(),
current_steal_task_handle: None,
@ -153,12 +154,8 @@ impl<E: RunError> Runner<E> {
pub(super) fn total_tasks(&self) -> usize {
let priority_tasks_count = self.priority_tasks.len();
let current_task_count = if self.current_task_handle.is_some() {
1
} else {
0
};
let suspended_task_count = if self.suspended_task.is_some() { 1 } else { 0 };
let current_task_count = usize::from(self.current_task_handle.is_some());
let suspended_task_count = usize::from(self.suspended_task.is_some());
let tasks_count = self.tasks.len();
trace!(
@ -193,7 +190,7 @@ impl<E: RunError> Runner<E> {
let handle = spawn(run_single_task(
self.worker_id,
task_work_state,
self.runner_tx.clone(),
self.msgs_tx.clone(),
suspend_rx,
abort_rx,
));
@ -271,10 +268,7 @@ impl<E: RunError> Runner<E> {
}
}
pub(super) async fn pause_not_running_task(
&mut self,
task_id: TaskId,
) -> Result<(), SystemError> {
pub(super) fn pause_not_running_task(&mut self, task_id: TaskId) -> Result<(), SystemError> {
trace!(
"Pause not running task request: <worker_id='{}', task_id='{task_id}'>",
self.worker_id
@ -299,6 +293,14 @@ impl<E: RunError> Runner<E> {
}
}
if self.pause_suspended_task(task_id) || self.pause_task_from_queues(task_id) {
return Ok(());
}
Err(SystemError::TaskNotFound(task_id))
}
fn pause_suspended_task(&mut self, task_id: TaskId) -> bool {
if let Some(suspended_task) = &self.suspended_task {
if suspended_task.task.id() == task_id {
trace!(
@ -311,10 +313,14 @@ impl<E: RunError> Runner<E> {
self.suspended_task.take().expect("we just checked it"),
);
return Ok(());
return true;
}
}
false
}
fn pause_task_from_queues(&mut self, task_id: TaskId) -> bool {
if let Some(index) = self
.priority_tasks
.iter()
@ -327,7 +333,7 @@ impl<E: RunError> Runner<E> {
.expect("we just checked it"),
);
return Ok(());
return true;
}
if let Some(index) = self
@ -340,16 +346,13 @@ impl<E: RunError> Runner<E> {
self.tasks.remove(index).expect("we just checked it"),
);
return Ok(());
return true;
}
Err(SystemError::TaskNotFound(task_id))
false
}
pub(super) async fn cancel_not_running_task(
&mut self,
task_id: TaskId,
) -> Result<(), SystemError> {
pub(super) fn cancel_not_running_task(&mut self, task_id: TaskId) {
trace!(
"Cancel not running task request: <worker_id='{}', task_id='{task_id}'>",
self.worker_id
@ -362,7 +365,7 @@ impl<E: RunError> Runner<E> {
<worker_id='{}', task_id='{task_id}'>",
self.worker_id
);
return Ok(()); // The task will cancel itself
return; // The task will cancel itself
}
}
@ -379,10 +382,16 @@ impl<E: RunError> Runner<E> {
self.suspended_task.take().expect("we just checked it"),
);
return Ok(());
return;
}
}
self.cancel_task_from_queues(task_id);
// If the task is not found, then it's possible that the user already canceled it but still have the handle
}
fn cancel_task_from_queues(&mut self, task_id: TaskId) {
if let Some(index) = self
.priority_tasks
.iter()
@ -396,7 +405,7 @@ impl<E: RunError> Runner<E> {
.expect("we just checked it"),
);
return Ok(());
return;
}
if let Some(index) = self
@ -409,15 +418,34 @@ impl<E: RunError> Runner<E> {
task_id,
self.tasks.remove(index).expect("we just checked it"),
);
return Ok(());
}
// If the task is not found, then it's possible that the user already canceled it but still have the handle
Ok(())
}
#[inline(always)]
#[inline]
fn add_task_when_idle(
&mut self,
task_id: TaskId,
task_kind: PendingTaskKind,
task_work_state: TaskWorkState<E>,
) {
trace!(
"Idle worker will process the new task: <worker_id='{}', task_id='{task_id}'>",
self.worker_id
);
let handle = self.spawn_task_runner(task_id, task_work_state);
self.current_task_handle = Some(RunningTask {
task_id,
task_kind,
handle,
});
// Doesn't need to report working back to system as it already registered
// that we're not idle anymore when it dispatched the task to this worker
self.is_idle = false;
}
#[inline]
pub(super) async fn inner_add_task(
&mut self,
task_id: TaskId,
@ -425,22 +453,7 @@ impl<E: RunError> Runner<E> {
task_work_state: TaskWorkState<E>,
) -> TaskAddStatus {
if self.is_idle {
trace!(
"Idle worker will process the new task: <worker_id='{}', task_id='{task_id}'>",
self.worker_id
);
let handle = self.spawn_task_runner(task_id, task_work_state);
self.current_task_handle = Some(RunningTask {
task_id,
task_kind,
handle,
});
// Doesn't need to report working back to system as it already registered
// that we're not idle anymore when it dispatched the task to this worker
self.is_idle = false;
self.add_task_when_idle(task_id, task_kind, task_work_state);
TaskAddStatus::Running
} else {
let RunningTask {
@ -470,7 +483,15 @@ impl<E: RunError> Runner<E> {
TaskAddStatus::Enqueued
}
(PendingTaskKind::Priority, PendingTaskKind::Normal) => {
if !self.waiting_suspension.is_waiting() {
if self.waiting_suspension.is_waiting() {
trace!(
"Worker is already waiting for a task to be suspended, will enqueue new task: \
<worker_id='{}', task_id='{task_id}'>",
self.worker_id
);
self.priority_tasks.push_front(task_work_state);
} else {
trace!(
"Old task will be suspended: \
<worker_id='{}', new_task_id='{task_id}', old_task_id='{old_task_id}'>",
@ -496,14 +517,6 @@ impl<E: RunError> Runner<E> {
}
self.waiting_suspension = WaitingSuspendedTask::Task(*old_task_id);
} else {
trace!(
"Worker is already waiting for a task to be suspended, will enqueue new task: \
<worker_id='{}', task_id='{task_id}'>",
self.worker_id
);
self.priority_tasks.push_front(task_work_state);
}
TaskAddStatus::Running
@ -656,43 +669,42 @@ impl<E: RunError> Runner<E> {
self.abort_steal_task();
let Runner {
let Self {
worker_id,
tasks,
paused_tasks,
priority_tasks,
is_idle,
abort_and_suspend_map,
runner_tx,
msgs_tx: runner_tx,
mut current_task_handle,
suspend_on_shutdown_rx,
..
} = self;
let mut suspend_on_shutdown_rx = pin!(suspend_on_shutdown_rx);
if !is_idle {
if is_idle {
trace!("Worker is idle, no tasks to shutdown: <worker_id='{worker_id}'>");
} else {
trace!("Worker is busy, will shutdown tasks: <worker_id='{worker_id}'>");
if let Some(RunningTask {
task_id, handle, ..
}) = current_task_handle.take()
{
abort_and_suspend_map.into_iter().for_each(
|(task_id, AbortAndSuspendSignalers { suspend_tx, .. })| {
if suspend_tx.send(()).is_err() {
warn!(
"Shutdown request channel closed before sending abort signal: \
for (task_id, AbortAndSuspendSignalers { suspend_tx, .. }) in abort_and_suspend_map
{
if suspend_tx.send(()).is_err() {
warn!(
"Shutdown request channel closed before sending abort signal: \
<worker_id='{worker_id}', task_id='{task_id}'>"
);
} else {
trace!(
"Sent suspend signal for task on shutdown: \
);
} else {
trace!(
"Sent suspend signal for task on shutdown: \
<worker_id='{worker_id}', task_id='{task_id}'>"
);
}
},
);
);
}
}
if let Err(e) = handle.await {
error!("Task <worker_id='{worker_id}', task_id='{task_id}'> failed to join: {e:#?}");
@ -700,56 +712,8 @@ impl<E: RunError> Runner<E> {
runner_tx.close();
while let Some(runner_msg) = suspend_on_shutdown_rx.next().await {
match runner_msg {
RunnerMessage::TaskOutput(task_id, res) => match res {
Ok(TaskRunnerOutput {
task_work_state,
status,
}) => match status {
InternalTaskExecStatus::Done(out) => send_complete_task_response(
self.worker_id,
task_id,
task_work_state,
out,
),
InternalTaskExecStatus::Canceled => {
send_cancel_task_response(worker_id, task_id, task_work_state)
}
InternalTaskExecStatus::Suspend
| InternalTaskExecStatus::Paused => {
send_shutdown_task_response(
worker_id,
task_id,
task_work_state,
);
}
InternalTaskExecStatus::Error(e) => {
send_error_task_response(
worker_id,
task_id,
task_work_state,
e,
);
}
},
Err(()) => {
error!(
"Task <worker_id='{worker_id}', task_id='{task_id}'> failed to suspend on shutdown"
);
}
},
RunnerMessage::StoleTask(Some(task_work_state)) => {
send_shutdown_task_response(worker_id, task_id, task_work_state);
}
RunnerMessage::StoleTask(None) => {}
}
}
Self::process_tasks_being_suspended_on_shutdown(worker_id, suspend_on_shutdown_rx)
.await;
}
priority_tasks
@ -762,9 +726,7 @@ impl<E: RunError> Runner<E> {
task_work_state.task.id(),
task_work_state,
);
})
} else {
trace!("Worker is idle, no tasks to shutdown: <worker_id='{worker_id}'>");
});
}
trace!("Worker shutdown process completed: <worker_id='{worker_id}'>");
@ -774,6 +736,55 @@ impl<E: RunError> Runner<E> {
}
}
async fn process_tasks_being_suspended_on_shutdown(
worker_id: WorkerId,
suspend_on_shutdown_rx: chan::Receiver<RunnerMessage<E>>,
) {
let mut suspend_on_shutdown_rx = pin!(suspend_on_shutdown_rx);
while let Some(runner_msg) = suspend_on_shutdown_rx.next().await {
match runner_msg {
RunnerMessage::TaskOutput(task_id, res) => match res {
Ok(TaskRunnerOutput {
task_work_state,
status,
}) => match status {
InternalTaskExecStatus::Done(out) => {
send_complete_task_response(worker_id, task_id, task_work_state, out);
}
InternalTaskExecStatus::Canceled => {
send_cancel_task_response(worker_id, task_id, task_work_state);
}
InternalTaskExecStatus::Suspend | InternalTaskExecStatus::Paused => {
send_shutdown_task_response(worker_id, task_id, task_work_state);
}
InternalTaskExecStatus::Error(e) => {
send_error_task_response(worker_id, task_id, task_work_state, e);
}
},
Err(()) => {
error!(
"Task <worker_id='{worker_id}', task_id='{task_id}'> failed to suspend on shutdown"
);
}
},
RunnerMessage::StoleTask(Some(task_work_state)) => {
send_shutdown_task_response(
worker_id,
task_work_state.task.id(),
task_work_state,
);
}
RunnerMessage::StoleTask(None) => {}
}
}
}
pub(super) fn get_next_task(&mut self) -> Option<(PendingTaskKind, TaskWorkState<E>)> {
if let Some(task) = self.priority_tasks.pop_front() {
return Some((PendingTaskKind::Priority, task));
@ -792,28 +803,8 @@ impl<E: RunError> Runner<E> {
pub(super) fn steal_request(&mut self, tx: oneshot::Sender<Option<TaskWorkState<E>>>) {
trace!("Steal request: <worker_id='{}'>", self.worker_id);
if let Some((kind, task)) = self.get_next_task() {
let task_id = task.task.id();
self.task_kinds.remove(&task_id);
trace!(
"Stealing task: <worker_id='{}', task_id='{task_id}', kind='{kind:#?}'>",
self.worker_id
);
if let Err(Some(task)) = tx.send(Some(task)) {
warn!(
"Steal request channel closed before sending task: <worker_id='{}'>",
self.worker_id
);
match kind {
PendingTaskKind::Normal => self.tasks.push_front(task),
PendingTaskKind::Priority => self.priority_tasks.push_front(task),
PendingTaskKind::Suspended => self.suspended_task = Some(task),
}
self.task_kinds.insert(task_id, kind);
}
if let Some((kind, task_work_state)) = self.get_next_task() {
self.proceed_with_task_to_be_stolen(kind, task_work_state, tx);
} else {
trace!("No task to steal: <worker_id='{}'>", self.worker_id);
if tx.send(None).is_err() {
@ -826,7 +817,45 @@ impl<E: RunError> Runner<E> {
}
}
pub(super) async fn wake_up(&mut self) {
fn proceed_with_task_to_be_stolen(
&mut self,
kind: PendingTaskKind,
task_work_state: TaskWorkState<E>,
tx: oneshot::Sender<Option<TaskWorkState<E>>>,
) {
let task_id = task_work_state.task.id();
self.task_kinds.remove(&task_id);
trace!(
"Stealing task: <worker_id='{}', task_id='{task_id}', kind='{kind:#?}'>",
self.worker_id
);
if let Err(Some(task_work_state)) = tx.send(Some(task_work_state)) {
self.put_back_failed_to_stole_task(task_id, kind, task_work_state);
}
}
fn put_back_failed_to_stole_task(
&mut self,
id: TaskId,
kind: PendingTaskKind,
task_work_state: TaskWorkState<E>,
) {
warn!(
"Steal request channel closed before sending task: <worker_id='{}'>",
self.worker_id
);
match kind {
PendingTaskKind::Normal => self.tasks.push_front(task_work_state),
PendingTaskKind::Priority => self.priority_tasks.push_front(task_work_state),
PendingTaskKind::Suspended => self.suspended_task = Some(task_work_state),
}
self.task_kinds.insert(id, kind);
}
pub(super) fn wake_up(&mut self) {
if self.is_idle {
trace!(
"Worker is idle, waking up: <worker_id='{}'>",
@ -837,7 +866,7 @@ impl<E: RunError> Runner<E> {
self.current_steal_task_handle = Some(dispatch_steal_request(
self.worker_id,
self.work_stealer.clone(),
self.runner_tx.clone(),
self.msgs_tx.clone(),
));
} else {
trace!(
@ -853,7 +882,7 @@ impl<E: RunError> Runner<E> {
}
}
#[inline(always)]
#[inline]
pub(super) async fn dispatch_next_task(&mut self, finished_task_id: TaskId) {
trace!(
"Task finished and will try to process a new task: \
@ -915,7 +944,7 @@ impl<E: RunError> Runner<E> {
self.current_steal_task_handle = Some(dispatch_steal_request(
self.worker_id,
self.work_stealer.clone(),
self.runner_tx.clone(),
self.msgs_tx.clone(),
));
} else {
trace!(
@ -936,7 +965,7 @@ impl<E: RunError> Runner<E> {
) {
match status {
InternalTaskExecStatus::Done(out) => {
send_complete_task_response(self.worker_id, task_id, task_work_state, out)
send_complete_task_response(self.worker_id, task_id, task_work_state, out);
}
InternalTaskExecStatus::Paused => {
@ -948,11 +977,11 @@ impl<E: RunError> Runner<E> {
}
InternalTaskExecStatus::Canceled => {
send_cancel_task_response(self.worker_id, task_id, task_work_state)
send_cancel_task_response(self.worker_id, task_id, task_work_state);
}
InternalTaskExecStatus::Error(e) => {
send_error_task_response(self.worker_id, task_id, task_work_state, e)
send_error_task_response(self.worker_id, task_id, task_work_state, e);
}
InternalTaskExecStatus::Suspend => {
@ -975,7 +1004,7 @@ impl<E: RunError> Runner<E> {
self.dispatch_next_task(task_id).await;
}
pub(super) async fn idle_check(&mut self) {
pub(super) fn idle_check(&mut self) {
if self.is_idle {
trace!(
"Worker is idle for some time and will try to steal a task: <worker_id='{}'>",
@ -983,25 +1012,7 @@ impl<E: RunError> Runner<E> {
);
if self.current_steal_task_handle.is_none() {
let elapsed = self.last_steal_attempt_at.elapsed();
let required = (TEN_SECONDS * self.steal_attempts_count).min(ONE_MINUTE);
trace!(
"Steal attempt required cool down: <worker_id='{}', elapsed='{elapsed:?}', required='{required:?}', steal_attempts_count={}>",
self.worker_id, self.steal_attempts_count);
if elapsed > required {
self.current_steal_task_handle = Some(dispatch_steal_request(
self.worker_id,
self.work_stealer.clone(),
self.runner_tx.clone(),
));
self.last_steal_attempt_at = Instant::now();
} else {
trace!(
"Steal attempt still cooling down: <worker_id='{}', steal_attempts_count={}>",
self.worker_id,
self.steal_attempts_count
);
}
self.steal_attempt();
} else {
trace!(
"Steal task already running, ignoring on this idle check: <worker_id='{}'>",
@ -1009,32 +1020,61 @@ impl<E: RunError> Runner<E> {
);
}
// As we're idle, let's check if we need to do some memory cleanup
if self.tasks.capacity() > TASK_QUEUE_INITIAL_SIZE {
assert_eq!(self.tasks.len(), 0);
self.tasks.shrink_to(TASK_QUEUE_INITIAL_SIZE);
}
self.idle_memory_cleanup();
}
}
if self.task_kinds.capacity() > TASK_QUEUE_INITIAL_SIZE {
assert_eq!(self.task_kinds.len(), 0);
self.task_kinds.shrink_to(TASK_QUEUE_INITIAL_SIZE);
}
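// Steal attempts back off linearly: the required cooldown is
// `TEN_SECONDS * steal_attempts_count`, capped at `ONE_MINUTE`. Assuming the
// constants match their names, one failed attempt means a 10s wait, three
// mean 30s, and from six onward the wait saturates at the 60s ceiling.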
fn steal_attempt(&mut self) {
let elapsed = self.last_steal_attempt_at.elapsed();
let required = (TEN_SECONDS * self.steal_attempts_count).min(ONE_MINUTE);
trace!(
"Steal attempt required cool down: \
<worker_id='{}', elapsed='{elapsed:?}', required='{required:?}', steal_attempts_count={}>",
self.worker_id,
self.steal_attempts_count
);
if elapsed > required {
self.current_steal_task_handle = Some(dispatch_steal_request(
self.worker_id,
self.work_stealer.clone(),
self.msgs_tx.clone(),
));
self.last_steal_attempt_at = Instant::now();
} else {
trace!(
"Steal attempt still cooling down: <worker_id='{}', steal_attempts_count={}>",
self.worker_id,
self.steal_attempts_count
);
}
}
if self.priority_tasks.capacity() > PRIORITY_TASK_QUEUE_INITIAL_SIZE {
assert_eq!(self.priority_tasks.len(), 0);
self.priority_tasks
.shrink_to(PRIORITY_TASK_QUEUE_INITIAL_SIZE);
}
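// Releases excess capacity accumulated during bursts: queues that are empty
// while idle are shrunk back to their initial sizes, and the paused/abort
// maps are trimmed to fit.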
fn idle_memory_cleanup(&mut self) {
// As we're idle, let's check if we need to do some memory cleanup
if self.tasks.capacity() > TASK_QUEUE_INITIAL_SIZE {
assert_eq!(self.tasks.len(), 0);
self.tasks.shrink_to(TASK_QUEUE_INITIAL_SIZE);
}
if self.paused_tasks.capacity() != self.paused_tasks.len() {
self.paused_tasks.shrink_to_fit();
}
if self.task_kinds.capacity() > TASK_QUEUE_INITIAL_SIZE {
assert_eq!(self.task_kinds.len(), 0);
self.task_kinds.shrink_to(TASK_QUEUE_INITIAL_SIZE);
}
if self.abort_and_suspend_map.capacity() > ABORT_AND_SUSPEND_MAP_INITIAL_SIZE {
assert!(self.abort_and_suspend_map.len() < ABORT_AND_SUSPEND_MAP_INITIAL_SIZE);
self.abort_and_suspend_map
.shrink_to(ABORT_AND_SUSPEND_MAP_INITIAL_SIZE);
}
if self.priority_tasks.capacity() > PRIORITY_TASK_QUEUE_INITIAL_SIZE {
assert_eq!(self.priority_tasks.len(), 0);
self.priority_tasks
.shrink_to(PRIORITY_TASK_QUEUE_INITIAL_SIZE);
}
if self.paused_tasks.capacity() != self.paused_tasks.len() {
self.paused_tasks.shrink_to_fit();
}
if self.abort_and_suspend_map.capacity() > ABORT_AND_SUSPEND_MAP_INITIAL_SIZE {
assert!(self.abort_and_suspend_map.len() < ABORT_AND_SUSPEND_MAP_INITIAL_SIZE);
self.abort_and_suspend_map
.shrink_to(ABORT_AND_SUSPEND_MAP_INITIAL_SIZE);
}
}
@ -1088,27 +1128,16 @@ impl<E: RunError> Runner<E> {
}
}
async fn run_single_task<E: RunError>(
type RunTaskOutput<E> = (Box<dyn Task<E>>, Result<Result<ExecStatus, E>, SystemError>);
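// Spawns the actual execution attempt. The returned join handle yields the
// boxed task back alongside its result (`RunTaskOutput`), so ownership of the
// task round-trips through the spawned future and the runner can requeue or
// report it afterwards.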
fn handle_run_task_attempt<E: RunError>(
worker_id: WorkerId,
TaskWorkState {
mut task,
worktable,
interrupter,
done_tx,
}: TaskWorkState<E>,
runner_tx: chan::Sender<RunnerMessage<E>>,
suspend_rx: oneshot::Receiver<()>,
abort_rx: oneshot::Receiver<oneshot::Sender<Result<(), SystemError>>>,
) {
let task_id = task.id();
worktable.set_started();
trace!("Running task: <worker_id='{worker_id}', task_id='{task_id}'>");
let handle = spawn({
let interrupter = Arc::clone(&interrupter);
task_id: TaskId,
mut task: Box<dyn Task<E>>,
worktable: &TaskWorktable,
interrupter: Arc<Interrupter>,
) -> JoinHandle<RunTaskOutput<E>> {
spawn({
let already_paused = worktable.is_paused();
let already_canceled = worktable.is_canceled();
let already_aborted = worktable.is_aborted();
@ -1140,73 +1169,68 @@ async fn run_single_task<E: RunError>(
(task, Ok(res))
}
}
});
})
}
let task_abort_handle = handle.abort_handle();
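// Waits for a suspend signal on the oneshot channel. Suspension reuses the
// pause machinery on the worktable and flips `has_suspended` on success, so
// the caller can later distinguish a worker-driven suspend from a
// user-driven pause.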
fn handle_task_suspension(
worker_id: WorkerId,
task_id: TaskId,
has_suspended: Arc<AtomicBool>,
worktable: Arc<TaskWorktable>,
suspend_rx: oneshot::Receiver<()>,
) -> JoinHandle<()> {
spawn(async move {
if suspend_rx.await.is_ok() {
let (tx, rx) = oneshot::channel();
let has_suspended = Arc::new(AtomicBool::new(false));
trace!("Suspend signal received: <worker_id='{worker_id}', task_id='{task_id}'>");
let suspender_handle = spawn({
let has_suspended = Arc::clone(&has_suspended);
let worktable = Arc::clone(&worktable);
async move {
if suspend_rx.await.is_ok() {
let (tx, rx) = oneshot::channel();
// The interrupter only knows about Pause and Cancel commands; we use pause
// here because the suspend feature should be invisible to the user
worktable.pause(tx).await;
trace!("Suspend signal received: <worker_id='{worker_id}', task_id='{task_id}'>");
// The interrupter only knows about Pause and Cancel commands; we use pause
// here because the suspend feature should be invisible to the user
worktable.pause(tx).await;
match rx.await {
Ok(Ok(())) => {
trace!("Suspending: <worker_id='{worker_id}', task_id='{task_id}'>");
has_suspended.store(true, Ordering::Relaxed);
}
Ok(Err(e)) => {
error!(
match rx.await {
Ok(Ok(())) => {
trace!("Suspending: <worker_id='{worker_id}', task_id='{task_id}'>");
has_suspended.store(true, Ordering::Relaxed);
}
Ok(Err(e)) => {
error!(
"Task <worker_id='{worker_id}', task_id='{task_id}'> failed to suspend: {e:#?}",
);
}
Err(_) => {
// The task probably finished before we could suspend it, so the channel was dropped
trace!("Suspend channel closed: <worker_id='{worker_id}', task_id='{task_id}'>");
}
}
} else {
trace!(
"Suspend channel closed, task probably finished before we could suspend it: \
<worker_id='{worker_id}', task_id='{task_id}'>"
);
Err(_) => {
// The task probably finished before we could suspend it, so the channel was dropped
trace!(
"Suspend channel closed: <worker_id='{worker_id}', task_id='{task_id}'>"
);
}
}
}
});
type SpawnedTaskRunOutput<E> = (Box<dyn Task<E>>, Result<Result<ExecStatus, E>, SystemError>);
enum RaceOutput<E: RunError> {
Completed(Result<SpawnedTaskRunOutput<E>, JoinError>),
Abort(oneshot::Sender<Result<(), SystemError>>),
}
match (async { RaceOutput::Completed(handle.await) }, async move {
if let Ok(tx) = abort_rx.await {
trace!("Aborting task: <worker_id='{worker_id}', task_id='{task_id}'>");
RaceOutput::Abort(tx)
} else {
// If the abort channel is closed, we should just ignore it and keep waiting
// for the task to finish, as we're being suspended by the worker
trace!(
"Abort channel closed, will wait for task to finish: <worker_id='{worker_id}', task_id='{task_id}'>"
"Suspend channel closed, task probably finished before we could suspend it: \
<worker_id='{worker_id}', task_id='{task_id}'>"
);
pending().await
}
})
.race()
.await
{
RaceOutput::Completed(Ok((task, Ok(res)))) => {
}
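// The boxed task itself travels through the spawned future (see
// `RunTaskOutput` above), so only this bookkeeping subset of `TaskWorkState`
// needs to be passed along separately.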
type PartialTaskWorkState<E> = (
TaskId,
Arc<TaskWorktable>,
oneshot::Sender<Result<TaskStatus<E>, SystemError>>,
Arc<Interrupter>,
);
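// Reassembles the full `TaskWorkState` from the returned task plus the
// partial state above, then forwards the final status to the runner channel.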
async fn emit_task_completed_message<E: RunError>(
worker_id: WorkerId,
run_task_output: RunTaskOutput<E>,
has_suspended: Arc<AtomicBool>,
(task_id, worktable, done_tx, interrupter): PartialTaskWorkState<E>,
runner_tx: chan::Sender<RunnerMessage<E>>,
) {
match run_task_output {
(task, Ok(res)) => {
trace!(
"Task completed ok: <worker_id='{worker_id}', task_id='{task_id}', result={res:?}>"
);
@ -1224,8 +1248,8 @@ async fn run_single_task<E: RunError>(
task_work_state: TaskWorkState {
task,
worktable,
interrupter,
done_tx,
interrupter,
},
status: internal_status,
})
@ -1234,7 +1258,7 @@ async fn run_single_task<E: RunError>(
.expect("Task runner channel closed while sending task output");
}
RaceOutput::Completed(Ok((_, Err(e)))) => {
(_, Err(e)) => {
trace!("Task had an error: <worker_id='{worker_id}', task_id='{task_id}'>");
if done_tx
@ -1253,6 +1277,78 @@ async fn run_single_task<E: RunError>(
.await
.expect("Task runner channel closed while sending task output");
}
}
}
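// Orchestrates one task execution end to end: spawn the execution attempt,
// spawn the suspension listener, then race task completion against an abort
// request. A closed abort channel is not an error; it just means the worker
// is suspending us, so we keep waiting for the task to finish.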
async fn run_single_task<E: RunError>(
worker_id: WorkerId,
TaskWorkState {
task,
worktable,
interrupter,
done_tx,
}: TaskWorkState<E>,
runner_tx: chan::Sender<RunnerMessage<E>>,
suspend_rx: oneshot::Receiver<()>,
abort_rx: oneshot::Receiver<oneshot::Sender<Result<(), SystemError>>>,
) {
enum RaceOutput<E: RunError> {
Completed(Result<RunTaskOutput<E>, JoinError>),
Abort(oneshot::Sender<Result<(), SystemError>>),
}
let task_id = task.id();
worktable.set_started();
trace!("Running task: <worker_id='{worker_id}', task_id='{task_id}'>");
let handle = handle_run_task_attempt(
worker_id,
task_id,
task,
&worktable,
Arc::clone(&interrupter),
);
let task_abort_handle = handle.abort_handle();
let has_suspended = Arc::new(AtomicBool::new(false));
let suspender_handle = handle_task_suspension(
worker_id,
task_id,
Arc::clone(&has_suspended),
Arc::clone(&worktable),
suspend_rx,
);
match (async { RaceOutput::Completed(handle.await) }, async move {
if let Ok(tx) = abort_rx.await {
trace!("Aborting task: <worker_id='{worker_id}', task_id='{task_id}'>");
RaceOutput::Abort(tx)
} else {
// If the abort channel is closed, we should just ignore it and keep waiting
// for the task to finish, as we're being suspended by the worker
trace!(
"Abort channel closed, will wait for task to finish: <worker_id='{worker_id}', task_id='{task_id}'>"
);
pending().await
}
})
.race()
.await
{
RaceOutput::Completed(Ok(run_task_output)) => {
emit_task_completed_message(
worker_id,
run_task_output,
has_suspended,
(task_id, worktable, done_tx, interrupter),
runner_tx,
)
.await;
}
RaceOutput::Completed(Err(join_error)) => {
error!("Task <id='{task_id}'> failed to join: {join_error:#?}",);

View file

@ -74,11 +74,11 @@ export type Procedures = {
{ key: "ephemeralFiles.deleteFiles", input: LibraryArgs<string[]>, result: null } |
{ key: "ephemeralFiles.renameFile", input: LibraryArgs<EphemeralRenameFileArgs>, result: null } |
{ key: "files.convertImage", input: LibraryArgs<ConvertImageArgs>, result: null } |
{ key: "files.copyFiles", input: LibraryArgs<FileCopierJobInit>, result: null } |
{ key: "files.copyFiles", input: LibraryArgs<OldFileCopierJobInit>, result: null } |
{ key: "files.createFolder", input: LibraryArgs<CreateFolderArgs>, result: string } |
{ key: "files.cutFiles", input: LibraryArgs<FileCutterJobInit>, result: null } |
{ key: "files.deleteFiles", input: LibraryArgs<FileDeleterJobInit>, result: null } |
{ key: "files.eraseFiles", input: LibraryArgs<FileEraserJobInit>, result: null } |
{ key: "files.cutFiles", input: LibraryArgs<OldFileCutterJobInit>, result: null } |
{ key: "files.deleteFiles", input: LibraryArgs<OldFileDeleterJobInit>, result: null } |
{ key: "files.eraseFiles", input: LibraryArgs<OldFileEraserJobInit>, result: null } |
{ key: "files.removeAccessTime", input: LibraryArgs<number[]>, result: null } |
{ key: "files.renameFile", input: LibraryArgs<RenameFileArgs>, result: null } |
{ key: "files.setFavorite", input: LibraryArgs<SetFavoriteArgs>, result: null } |
@ -190,9 +190,9 @@ export type Composite =
*/
"Live"
export type ConvertImageArgs = { location_id: number; file_path_id: number; delete_src: boolean; desired_extension: ConvertableExtension; quality_percentage: number | null }
export type ConvertImageArgs = { location_id: number; file_path_id: number; delete_src: boolean; desired_extension: ConvertibleExtension; quality_percentage: number | null }
export type ConvertableExtension = "bmp" | "dib" | "ff" | "gif" | "ico" | "jpg" | "jpeg" | "png" | "pnm" | "qoi" | "tga" | "icb" | "vda" | "vst" | "tiff" | "tif" | "hif" | "heif" | "heifs" | "heic" | "heics" | "avif" | "avci" | "avcs" | "svg" | "svgz" | "pdf" | "webp"
export type ConvertibleExtension = "bmp" | "dib" | "ff" | "gif" | "ico" | "jpg" | "jpeg" | "png" | "pnm" | "qoi" | "tga" | "icb" | "vda" | "vst" | "tiff" | "tif" | "hif" | "heif" | "heifs" | "heic" | "heics" | "avif" | "avci" | "avcs" | "svg" | "svgz" | "pdf" | "webp"
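// Illustrative only: a `files.convertImage` call made through these generated
// bindings would carry a `ConvertImageArgs` payload shaped like
// { location_id: 1, file_path_id: 42, delete_src: false, desired_extension: "webp", quality_percentage: 80 };
// the exact client helper used to invoke it depends on the embedding app.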
export type CreateEphemeralFolderArgs = { path: string; name: string | null }
@ -241,14 +241,6 @@ export type ExplorerSettings<TOrder> = { layoutMode: ExplorerLayout | null; grid
export type Feedback = { message: string; emoji: number }
export type FileCopierJobInit = { source_location_id: number; target_location_id: number; sources_file_path_ids: number[]; target_location_relative_directory_path: string }
export type FileCutterJobInit = { source_location_id: number; target_location_id: number; sources_file_path_ids: number[]; target_location_relative_directory_path: string }
export type FileDeleterJobInit = { location_id: number; file_path_ids: number[] }
export type FileEraserJobInit = { location_id: number; file_path_ids: number[]; passes: string }
export type FilePath = { id: number; pub_id: number[]; is_dir: boolean | null; cas_id: string | null; integrity_checksum: string | null; location_id: number | null; materialized_path: string | null; name: string | null; extension: string | null; hidden: boolean | null; size_in_bytes: string | null; size_in_bytes_bytes: number[] | null; inode: number[] | null; object_id: number | null; key_id: number | null; date_created: string | null; date_modified: string | null; date_indexed: string | null }
export type FilePathCursor = { isDir: boolean; variant: FilePathCursorVariant }
@ -455,14 +447,14 @@ export type NonIndexedPathItem = { path: string; name: string; extension: string
/**
* A type that can be used to return a group of `Reference<T>` and `CacheNode`'s
*
* You don't need to use this, it's just a shortcut to avoid having to write out the full type everytime.
* You don't need to use this, it's just a shortcut to avoid having to write out the full type every time.
*/
export type NormalisedResult<T> = { item: Reference<T>; nodes: CacheNode[] }
/**
* A type that can be used to return a group of `Reference<T>` and `CacheNode`'s
*
* You don't need to use this, it's just a shortcut to avoid having to write out the full type everytime.
* You don't need to use this, it's just a shortcut to avoid having to write out the full type every time.
*/
export type NormalisedResults<T> = { items: Reference<T>[]; nodes: CacheNode[] }
@ -499,6 +491,14 @@ export type ObjectWithFilePaths = { id: number; pub_id: number[]; kind: number |
export type ObjectWithFilePaths2 = { id: number; pub_id: number[]; kind: number | null; key_id: number | null; hidden: boolean | null; favorite: boolean | null; important: boolean | null; note: string | null; date_created: string | null; date_accessed: string | null; file_paths: Reference<FilePath>[] }
export type OldFileCopierJobInit = { source_location_id: number; target_location_id: number; sources_file_path_ids: number[]; target_location_relative_directory_path: string }
export type OldFileCutterJobInit = { source_location_id: number; target_location_id: number; sources_file_path_ids: number[]; target_location_relative_directory_path: string }
export type OldFileDeleterJobInit = { location_id: number; file_path_ids: number[] }
export type OldFileEraserJobInit = { location_id: number; file_path_ids: number[]; passes: string }
/**
* Represents the operating system which the remote peer is running.
* This is not used internally and predominantly is designed to be used for display purposes by the embedding application.
@ -514,7 +514,7 @@ export type P2PDiscoveryState = "Everyone" | "ContactsOnly" | "Disabled"
/**
* TODO: P2P event for the frontend
*/
export type P2PEvent = { type: "DiscoveredPeer"; identity: RemoteIdentity; metadata: PeerMetadata } | { type: "ExpiredPeer"; identity: RemoteIdentity } | { type: "ConnectedPeer"; identity: RemoteIdentity } | { type: "DisconnectedPeer"; identity: RemoteIdentity } | { type: "SpacedropRequest"; id: string; identity: RemoteIdentity; peer_name: string; files: string[] } | { type: "SpacedropProgress"; id: string; percent: number } | { type: "SpacedropTimedout"; id: string } | { type: "SpacedropRejected"; id: string }
export type P2PEvent = { type: "DiscoveredPeer"; identity: RemoteIdentity; metadata: PeerMetadata } | { type: "ExpiredPeer"; identity: RemoteIdentity } | { type: "ConnectedPeer"; identity: RemoteIdentity } | { type: "DisconnectedPeer"; identity: RemoteIdentity } | { type: "SpacedropRequest"; id: string; identity: RemoteIdentity; peer_name: string; files: string[] } | { type: "SpacedropProgress"; id: string; percent: number } | { type: "SpacedropTimedOut"; id: string } | { type: "SpacedropRejected"; id: string }
export type PeerMetadata = { name: string; operating_system: OperatingSystem | null; device_model: HardwareModel | null; version: string | null }