From 85e5eec993d3e805e432e4dd9013fbf015807cb0 Mon Sep 17 00:00:00 2001
From: "Ericson \"Fogo\" Soares"
Date: Wed, 27 Jul 2022 04:06:34 -0300
Subject: [PATCH] Resumable Jobs + Lefthook Integration (#344)

* Introducing Lefthook for git hooks automation
* TypeScript typechecking and linting
* Rust formatting and linting
* Spellchecking (also corrected spelling errors in some files)
* Check links in md files
* Introducing resumable jobs
* Abstractions to pause and resume jobs automatically when the application exits and starts
* Changing the database to use Bytes for UUID fields
* Changing uuid fields on core to use uuid::Uuid instead of String
* Updating some dependencies and introducing msg_pack serialization to save job state in the database
* Fixing some clippy warnings
* Fixing a regression introduced in the identifier job, which was doing too many db accesses concurrently
---
 .cspell/backend_custom_words.txt              |   36 +
 .cspell/frontend_custom_words.txt             |   82 ++
 .cspell/project_words.txt                     |   38 +
 .eslintrc.js                                  |   21 +
 Cargo.lock                                    |   55 +-
 README.md                                     |    4 +-
 apps/desktop/src-tauri/Cargo.toml             |    2 +
 apps/desktop/src-tauri/src/main.rs            |   67 +-
 apps/desktop/src-tauri/src/menu.rs            |   36 +-
 apps/server/src/main.rs                       |   25 +-
 core/Cargo.toml                               |    8 +-
 core/bindings/JobReport.ts                    |    2 +-
 core/bindings/JobStatus.ts                    |    2 +-
 .../migration.sql                             |  145 ++
 core/prisma/schema.prisma                     |   22 +-
 core/src/encode/thumb.rs                      |  226 ++--
 core/src/file/cas/identifier.rs               |  363 +++--
 core/src/file/indexer/mod.rs                  |  384 +++++-
 core/src/file/indexer/scan.rs                 |  280 ----
 core/src/file/mod.rs                          |   39 +-
 core/src/job/{jobs.rs => job_manager.rs}      |  148 ++-
 core/src/job/mod.rs                           |  185 ++-
 core/src/job/worker.rs                        |  207 +--
 core/src/lib.rs                               |  133 +-
 core/src/library/library_config.rs            |    3 +-
 core/src/library/library_ctx.rs               |    8 +-
 core/src/library/library_manager.rs           |   37 +-
 core/src/library/mod.rs                       |    7 +-
 core/src/node/config.rs                       |   10 +-
 core/src/node/mod.rs                          |    6 +-
 core/src/sys/locations.rs                     |   62 +-
 core/src/sys/mod.rs                           |    4 +-
 core/src/tag/mod.rs                           |   13 +-
 cspell.config.yaml                            |   20 +
 docs/architecture/database.md                 |    4 +-
 docs/architecture/distributed-data-sync.md    |    4 +-
 docs/product/roadmap.md                       |    6 +-
 lefthook.yml                                  |   33 +
 package.json                                  |   13 +
 packages/interface/src/AppRouter.tsx          |    4 +-
 .../src/components/primitive/Checkbox.tsx     |    4 +-
 .../src/screens/settings/Settings.tsx         |    4 +-
 ...indSettings.tsx => KeybindingSettings.tsx} |    4 +-
 pnpm-lock.yaml                                | 1177 ++++++++++++++++-
 44 files changed, 3065 insertions(+), 868 deletions(-)
 create mode 100644 .cspell/backend_custom_words.txt
 create mode 100644 .cspell/frontend_custom_words.txt
 create mode 100644 .cspell/project_words.txt
 create mode 100644 .eslintrc.js
 create mode 100644 core/prisma/migrations/20220722181530_alter_uuids_to_bytes/migration.sql
 delete mode 100644 core/src/file/indexer/scan.rs
 rename core/src/job/{jobs.rs => job_manager.rs} (62%)
 create mode 100644 cspell.config.yaml
 create mode 100644 lefthook.yml
 rename packages/interface/src/screens/settings/client/{KeybindSettings.tsx => KeybindingSettings.tsx} (72%)

diff --git a/.cspell/backend_custom_words.txt b/.cspell/backend_custom_words.txt
new file mode 100644
index 000000000..4df68be4e
--- /dev/null
+++ b/.cspell/backend_custom_words.txt
@@ -0,0 +1,36 @@
+tauri
+rustup
+aarch
+sdcore
+dotenv
+dotenvy
+prismjs
+actix
+rtype
+healthcheck
+sdserver
+ipfs
+impls
+crdt
+quicktime
+creationdate
+imageops
+thumbnailer
+HEXLOWER
+chrono
+walkdir
+thiserror
+thumbstrip
+repr
+Deque
+oneshot
+sdlibrary
+sdconfig
+DOTFILE
+sysinfo
+initialising
+struct
+UHLC
+CRDTs
+PRRTT
+filesystems
\ No newline at end
of file
diff --git a/.cspell/frontend_custom_words.txt b/.cspell/frontend_custom_words.txt
new file mode 100644
index 000000000..34fcd1c69
--- /dev/null
+++ b/.cspell/frontend_custom_words.txt
@@ -0,0 +1,82 @@
+pnpm
+titlebar
+consts
+pallete
+unlisten
+svgr
+middlewares
+clsx
+SDWEB
+tryghost
+tsparticles
+Opencollective
+Waitlist
+heroicons
+roadmap
+semibold
+noreferer
+Rescan
+subpackage
+photoslibrary
+fontsource
+audiomp
+audioogg
+audiowav
+browserslist
+bsconfig
+cheader
+compodoc
+cssmap
+dartlang
+dockerdebug
+folderlight
+folderopen
+fontotf
+fontttf
+fontwoff
+gopackage
+haml
+imagegif
+imageico
+imagejpg
+imagepng
+ipynb
+jsmap
+lighteditorconfig
+nestjscontroller
+nestjs
+nestjsdecorator
+nestjsfilter
+nestjsguard
+nestjsmodule
+nestjsservice
+npmlock
+nuxt
+opengl
+photoshop
+postcssconfig
+powershelldata
+reactjs
+rjson
+symfony
+testjs
+tmpl
+typescriptdef
+windi
+yarnerror
+unlisten
+imagewebp
+powershellmodule
+reactts
+testts
+zustand
+overscan
+webp
+headlessui
+falsey
+nums
+lacie
+classname
+wunsub
+immer
+tada
\ No newline at end of file
diff --git a/.cspell/project_words.txt b/.cspell/project_words.txt
new file mode 100644
index 000000000..63d75c0f9
--- /dev/null
+++ b/.cspell/project_words.txt
@@ -0,0 +1,38 @@
+spacedrive
+spacedriveapp
+vdfs
+haoyuan
+brendonovich
+codegen
+elon
+deel
+haden
+akar
+benja
+haris
+mehrzad
+OSSC
+josephjacks
+rauch
+ravikant
+neha
+narkhede
+allred
+lütke
+tobiaslutke
+justinhoffman
+rywalker
+zacharysmith
+sanjay
+poonen
+mytton
+davidmytton
+richelsen
+lesterlee
+alluxio
+augusto
+marietti
+vijay
+sharma
+naveen
+noco
\ No newline at end of file
diff --git a/.eslintrc.js b/.eslintrc.js
new file mode 100644
index 000000000..f7d773462
--- /dev/null
+++ b/.eslintrc.js
@@ -0,0 +1,21 @@
+module.exports = {
+	root: true,
+	parser: '@typescript-eslint/parser',
+	parserOptions: {
+		"project": [
+			"apps/desktop/tsconfig.json",
+			"apps/web/tsconfig.json",
+			"apps/landing/tsconfig.json",
+			"packages/client/tsconfig.json",
+			"packages/interface/tsconfig.json",
+			"packages/ui/tsconfig.json",
+		],
+	},
+	plugins: [
+		'@typescript-eslint',
+	],
+	extends: [
+		'standard-with-typescript',
+		'prettier',
+	],
+};
\ No newline at end of file
diff --git a/Cargo.lock b/Cargo.lock
index 2df75e10d..e83b469b1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -453,6 +453,7 @@ dependencies = [
  "num-bigint 0.3.3",
  "num-integer",
  "num-traits",
+ "serde",
 ]
 
 [[package]]
@@ -3659,6 +3660,15 @@ dependencies = [
  "url",
 ]
 
+[[package]]
+name = "ordered-float"
+version = "2.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7940cf2ca942593318d07fcf2596cdca60a85c9e7fab408a5e21a4f9dcd40d87"
+dependencies = [
+ "num-traits",
+]
+
 [[package]]
 name = "ordermap"
 version = "0.3.5"
@@ -4140,25 +4150,28 @@ dependencies = [
 
 [[package]]
 name = "prisma-client-rust"
-version = "0.4.1"
-source = "git+https://github.com/Brendonovich/prisma-client-rust.git?tag=0.5.0#31a864ae09e835ff046469345bf6e413ae54a30f"
+version = "0.5.2"
+source = "git+https://github.com/Brendonovich/prisma-client-rust.git?tag=0.5.2#88ba860aedace6ebf0707fa989138fea87cd0d5e"
 dependencies = [
  "bigdecimal 0.2.2",
  "chrono",
  "datamodel",
+ "indexmap",
  "prisma-models",
  "query-connector",
  "query-core",
  "serde",
+ "serde-value",
  "serde_json",
  "thiserror",
  "user-facing-errors",
+ "uuid 0.8.2",
 ]
 
 [[package]]
 name = "prisma-client-rust-cli"
 version = "0.4.1"
 source = 
"git+https://github.com/Brendonovich/prisma-client-rust?tag=0.5.0#31a864ae09e835ff046469345bf6e413ae54a30f" dependencies = [ "convert_case 0.5.0", "datamodel", @@ -4702,6 +4715,28 @@ dependencies = [ "winapi", ] +[[package]] +name = "rmp" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44519172358fd6d58656c86ab8e7fbc9e1490c3e8f14d35ed78ca0dd07403c9f" +dependencies = [ + "byteorder", + "num-traits", + "paste", +] + +[[package]] +name = "rmp-serde" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25786b0d276110195fa3d6f3f31299900cf71dfbd6c28450f3f58a0e7f7a347e" +dependencies = [ + "byteorder", + "rmp", + "serde", +] + [[package]] name = "rusqlite" version = "0.25.4" @@ -4900,6 +4935,8 @@ dependencies = [ "log", "prisma-client-rust", "ring 0.17.0-alpha.11", + "rmp", + "rmp-serde", "serde", "serde_json", "sysinfo", @@ -5005,6 +5042,16 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde-value" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3a1a3341211875ef120e117ea7fd5228530ae7e7036a779fdc9117be6b3282c" +dependencies = [ + "ordered-float", + "serde", +] + [[package]] name = "serde_bytes" version = "0.11.6" @@ -5286,6 +5333,8 @@ version = "0.1.0" dependencies = [ "dotenvy", "env_logger", + "futures", + "log", "sdcore", "swift-rs", "tauri", diff --git a/README.md b/README.md index ae9203fd8..39333c6e5 100644 --- a/README.md +++ b/README.md @@ -59,7 +59,7 @@ For independent creatives, hoarders and those that want to own their digital foo # What is a VDFS? -A VDFS (virtual distributed filesystem) is a filesystem designed to work across a variety of storage layers. With a uniform API to manipulate and access content across many devices, VSFS is not restricted to a single machine. It achieves this by maintaining a virtual index of all storage locations, synchronizing the database between clients in realtime. This implementation also uses [CAS](https://en.wikipedia.org/wiki/Content-addressable_storage) (Content-addressable storage) to uniquely identify files, while keeping record of logical file paths relative to the storage locations. +A VDFS (virtual distributed filesystem) is a filesystem designed to work across a variety of storage layers. With a uniform API to manipulate and access content across many devices, VDFS is not restricted to a single machine. It achieves this by maintaining a virtual index of all storage locations, synchronizing the database between clients in realtime. This implementation also uses [CAS](https://en.wikipedia.org/wiki/Content-addressable_storage) (Content-addressable storage) to uniquely identify files, while keeping record of logical file paths relative to the storage locations. The first implementation of a VDFS can be found in this UC Berkeley [paper](https://www2.eecs.berkeley.edu/Pubs/TechRpts/2018/EECS-2018-29.pdf) by Haoyuan Li. This paper describes its use for cloud computing, however the underlying concepts can be translated to open consumer software. @@ -85,7 +85,7 @@ _Note: Links are for highlight purposes only until feature specific documentatio **To be developed (MVP):** - **[Photos](#features)** - Photo and video albums similar to Apple/Google photos. -- **[Search](#features)** - Deep search into your filesystem with a keybind, including offline locations. +- **[Search](#features)** - Deep search into your filesystem with a keybinding, including offline locations. 
- **[Tags](#features)** - Define routines on custom tags to automate workflows, easily tag files individually, in bulk and automatically via rules. - **[Extensions](#features)** - Build tools on top of Spacedrive, extend functionality and integrate third party services. Extension directory on [spacedrive.com/extensions](#features). diff --git a/apps/desktop/src-tauri/Cargo.toml b/apps/desktop/src-tauri/Cargo.toml index ab91a3359..ff2a9b3c2 100644 --- a/apps/desktop/src-tauri/Cargo.toml +++ b/apps/desktop/src-tauri/Cargo.toml @@ -20,9 +20,11 @@ sdcore = { path = "../../../core" } # Universal Dependencies tokio = { version = "1.17.0", features = ["sync"] } +futures = "^0.3" window-shadows = "0.1.2" env_logger = "0.9.0" dotenvy = "0.15.1" +log = { version = "0.4.17", features = ["max_level_trace"] } # macOS system libs [target.'cfg(target_os = "macos")'.dependencies] diff --git a/apps/desktop/src-tauri/src/main.rs b/apps/desktop/src-tauri/src/main.rs index 3c6210e46..67cce98ad 100644 --- a/apps/desktop/src-tauri/src/main.rs +++ b/apps/desktop/src-tauri/src/main.rs @@ -1,8 +1,13 @@ +use std::path::PathBuf; use std::time::{Duration, Instant}; use dotenvy::dotenv; +use futures::executor::block_on; +use log::{debug, error, info}; use sdcore::{ClientCommand, ClientQuery, CoreEvent, CoreResponse, Node, NodeController}; -use tauri::{api::path, Manager}; +use tauri::{api::path, Manager, RunEvent}; +use tokio::sync::oneshot; + #[cfg(target_os = "macos")] mod macos; mod menu; @@ -15,7 +20,7 @@ async fn client_query_transport( match core.query(data).await { Ok(response) => Ok(response), Err(err) => { - println!("query error: {:?}", err); + error!("query error: {:?}", err); Err(err.to_string()) } } @@ -42,18 +47,42 @@ async fn app_ready(app_handle: tauri::AppHandle) { window.show().unwrap(); } +struct ShutdownManager { + shutdown_tx: Option>, + shutdown_completion_rx: Option>, +} + +impl ShutdownManager { + fn shutdown(&mut self) { + if let Some(sender) = self.shutdown_tx.take() { + sender.send(()).unwrap(); + if let Some(receiver) = self.shutdown_completion_rx.take() { + block_on(receiver).expect("failed to receive shutdown completion signal"); + } + } + } +} + #[tokio::main] async fn main() { dotenv().ok(); env_logger::init(); - let mut data_dir = path::data_dir().unwrap_or(std::path::PathBuf::from("./")); - data_dir = data_dir.join("spacedrive"); + let data_dir = path::data_dir() + .unwrap_or_else(|| PathBuf::from("./")) + .join("spacedrive"); + // create an instance of the core - let (controller, mut event_receiver, node) = Node::new(data_dir).await; - tokio::spawn(node.start()); + let (controller, mut event_receiver, node, shutdown_completion_rx) = Node::new(data_dir).await; + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let mut shutdown_manager = ShutdownManager { + shutdown_tx: Some(shutdown_tx), + shutdown_completion_rx: Some(shutdown_completion_rx), + }; + + tokio::spawn(node.start(shutdown_rx)); // create tauri app - tauri::Builder::default() + let app = tauri::Builder::default() // pass controller to the tauri state manager .manage(controller) .setup(|app| { @@ -104,13 +133,31 @@ async fn main() { Ok(()) }) - .on_menu_event(|event| menu::handle_menu_event(event)) + .on_menu_event(menu::handle_menu_event) .invoke_handler(tauri::generate_handler![ client_query_transport, client_command_transport, app_ready, ]) .menu(menu::get_menu()) - .run(tauri::generate_context!()) - .expect("error while running tauri application"); + .build(tauri::generate_context!()) + .expect("error while 
building tauri application"); + + app.run(move |app_handler, event| { + if let RunEvent::ExitRequested { .. } = event { + debug!("Closing all open windows..."); + app_handler + .windows() + .iter() + .for_each(|(window_name, window)| { + debug!("closing window: {window_name}"); + if let Err(e) = window.close() { + error!("failed to close window '{}': {:#?}", window_name, e); + } + }); + info!("Spacedrive shutting down..."); + shutdown_manager.shutdown(); + app_handler.exit(0); + } + }) } diff --git a/apps/desktop/src-tauri/src/menu.rs b/apps/desktop/src-tauri/src/menu.rs index 8e501161b..66e272f80 100644 --- a/apps/desktop/src-tauri/src/menu.rs +++ b/apps/desktop/src-tauri/src/menu.rs @@ -1,6 +1,8 @@ use std::env::consts; -use tauri::{AboutMetadata, CustomMenuItem, Menu, MenuItem, Submenu, WindowMenuEvent, Wry}; +use tauri::{ + AboutMetadata, CustomMenuItem, Manager, Menu, MenuItem, Submenu, WindowMenuEvent, Wry, +}; pub(crate) fn get_menu() -> Menu { match consts::OS { @@ -53,28 +55,25 @@ fn custom_menu_bar() -> Menu { CustomMenuItem::new("reload_app".to_string(), "Reload").accelerator("CmdOrCtrl+R"), ); - let view_menu = view_menu.add_item( + view_menu.add_item( CustomMenuItem::new("toggle_devtools".to_string(), "Toggle Developer Tools") .accelerator("CmdOrCtrl+Alt+I"), - ); - - view_menu + ) }; - let menu = Menu::new() + Menu::new() .add_submenu(Submenu::new("Spacedrive", app_menu)) .add_submenu(Submenu::new("File", file_menu)) .add_submenu(Submenu::new("Edit", edit_menu)) .add_submenu(Submenu::new("View", view_menu)) - .add_submenu(Submenu::new("Window", window_menu)); - - menu + .add_submenu(Submenu::new("Window", window_menu)) } pub(crate) fn handle_menu_event(event: WindowMenuEvent) { match event.menu_item_id() { "quit" => { - std::process::exit(0); + let app = event.window().app_handle(); + app.exit(0); } "close" => { let window = event.window(); @@ -82,7 +81,6 @@ pub(crate) fn handle_menu_event(event: WindowMenuEvent) { #[cfg(debug_assertions)] if window.is_devtools_open() { window.close_devtools(); - return; } else { window.close().unwrap(); } @@ -91,17 +89,17 @@ pub(crate) fn handle_menu_event(event: WindowMenuEvent) { window.close().unwrap(); } "reload_app" => { - event - .window() - .with_webview(|webview| { - #[cfg(target_os = "macos")] - { + #[cfg(target_os = "macos")] + { + event + .window() + .with_webview(|webview| { use crate::macos::reload_webview; reload_webview(webview.inner() as _); - } - }) - .unwrap(); + }) + .unwrap(); + } } #[cfg(debug_assertions)] "toggle_devtools" => { diff --git a/apps/server/src/main.rs b/apps/server/src/main.rs index 7f74760fb..1bdef0dd6 100644 --- a/apps/server/src/main.rs +++ b/apps/server/src/main.rs @@ -18,9 +18,9 @@ use actix_web::{ use actix_web_actors::ws; use serde::{Deserialize, Serialize}; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, oneshot}; -const DATA_DIR_ENV_VAR: &'static str = "DATA_DIR"; +const DATA_DIR_ENV_VAR: &str = "DATA_DIR"; #[derive(Serialize)] pub struct Event(CoreEvent); @@ -182,10 +182,8 @@ impl StreamHandler> for Socket { }; fut.into_actor(self).spawn(ctx); - - () }, - _ => (), + _ => {}, } } @@ -222,12 +220,12 @@ impl Handler for Socket { #[get("/")] async fn index() -> impl Responder { - format!("Spacedrive Server!") + "Spacedrive Server!" 
} #[get("/health")] async fn healthcheck() -> impl Responder { - format!("OK") + "OK" } #[get("/ws")] @@ -237,15 +235,14 @@ async fn ws_handler( controller: web::Data, server: web::Data>, ) -> Result { - let resp = ws::start( + ws::start( Socket { node_controller: controller, event_server: server, }, &req, stream, - ); - resp + ) } async fn not_found() -> impl Responder { @@ -284,14 +281,16 @@ async fn setup() -> (mpsc::Receiver, web::Data) { std::env::current_dir() .expect( - "Unable to get your currrent directory. Maybe try setting $DATA_DIR?", + "Unable to get your current directory. Maybe try setting $DATA_DIR?", ) .join("sdserver_data") }, }; - let (controller, event_receiver, node) = Node::new(data_dir_path).await; - tokio::spawn(node.start()); + let (controller, event_receiver, node, _shutdown_completion_rx) = + Node::new(data_dir_path).await; + let (_shutdown_tx, shutdown_rx) = oneshot::channel(); + tokio::spawn(node.start(shutdown_rx)); (event_receiver, web::Data::new(controller)) } diff --git a/core/Cargo.toml b/core/Cargo.toml index 6e6640a65..bc6968e0b 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -22,19 +22,21 @@ futures = "0.3" data-encoding = "2.3.2" ring = "0.17.0-alpha.10" int-enum = "0.4.0" +rmp = "^0.8.11" +rmp-serde = "^1.1.0" # Project dependencies ts-rs = { version = "6.2", features = ["chrono-impl", "uuid-impl", "serde-compat"] } -prisma-client-rust = { git = "https://github.com/Brendonovich/prisma-client-rust.git", tag = "0.5.0" } +prisma-client-rust = { git = "https://github.com/Brendonovich/prisma-client-rust.git", tag = "0.5.2" } walkdir = "^2.3.2" uuid = { version = "^0.8.2", features = ["v4", "serde"]} sysinfo = "0.23.9" thiserror = "1.0.30" core-derive = { path = "./derive" } -tokio = { version = "1.17.0", features = ["sync", "rt"] } +tokio = { version = "^1.17.0", features = ["sync", "rt"] } include_dir = { version = "0.7.2", features = ["glob"] } -async-trait = "0.1.52" +async-trait = "^0.1.52" image = "0.24.1" webp = "0.2.2" ffmpeg-next = "5.0.3" diff --git a/core/bindings/JobReport.ts b/core/bindings/JobReport.ts index bd25c8d21..d6f964bee 100644 --- a/core/bindings/JobReport.ts +++ b/core/bindings/JobReport.ts @@ -1,4 +1,4 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. import type { JobStatus } from "./JobStatus"; -export interface JobReport { id: string, name: string, data: string | null, date_created: string, date_modified: string, status: JobStatus, task_count: number, completed_task_count: number, message: string, seconds_elapsed: string, } \ No newline at end of file +export interface JobReport { id: string, name: string, data: Array | null, date_created: string, date_modified: string, status: JobStatus, task_count: number, completed_task_count: number, message: string, seconds_elapsed: string, } \ No newline at end of file diff --git a/core/bindings/JobStatus.ts b/core/bindings/JobStatus.ts index 58c3a06b3..88aa16ae5 100644 --- a/core/bindings/JobStatus.ts +++ b/core/bindings/JobStatus.ts @@ -1,3 +1,3 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. 
-export type JobStatus = "Queued" | "Running" | "Completed" | "Canceled" | "Failed";
\ No newline at end of file
+export type JobStatus = "Queued" | "Running" | "Completed" | "Canceled" | "Failed" | "Paused";
\ No newline at end of file
diff --git a/core/prisma/migrations/20220722181530_alter_uuids_to_bytes/migration.sql b/core/prisma/migrations/20220722181530_alter_uuids_to_bytes/migration.sql
new file mode 100644
index 000000000..fb53d5d65
--- /dev/null
+++ b/core/prisma/migrations/20220722181530_alter_uuids_to_bytes/migration.sql
@@ -0,0 +1,145 @@
+/*
+  Warnings:
+
+  - The primary key for the `jobs` table will be changed. If it partially fails, the table could be left without primary key constraint.
+  - You are about to alter the column `data` on the `jobs` table. The data in that column could be lost. The data in that column will be cast from `String` to `Binary`.
+  - You are about to alter the column `id` on the `jobs` table. The data in that column could be lost. The data in that column will be cast from `String` to `Binary`.
+  - You are about to alter the column `pub_id` on the `nodes` table. The data in that column could be lost. The data in that column will be cast from `String` to `Binary`.
+  - You are about to alter the column `pub_id` on the `tags` table. The data in that column could be lost. The data in that column will be cast from `String` to `Binary`.
+  - You are about to alter the column `pub_id` on the `labels` table. The data in that column could be lost. The data in that column will be cast from `String` to `Binary`.
+  - You are about to alter the column `pub_id` on the `spaces` table. The data in that column could be lost. The data in that column will be cast from `String` to `Binary`.
+  - You are about to alter the column `pub_id` on the `locations` table. The data in that column could be lost. The data in that column will be cast from `String` to `Binary`.
+  - You are about to alter the column `record_id` on the `sync_events` table. The data in that column could be lost. The data in that column will be cast from `String` to `Binary`.
+  - You are about to alter the column `pub_id` on the `albums` table. The data in that column could be lost. The data in that column will be cast from `String` to `Binary`.
+  - You are about to alter the column `pub_id` on the `comments` table. The data in that column could be lost. The data in that column will be cast from `String` to `Binary`.
+
+*/
+-- RedefineTables
+PRAGMA foreign_keys=OFF;
+CREATE TABLE "new_jobs" (
+    "id" BLOB NOT NULL PRIMARY KEY,
+    "name" TEXT NOT NULL,
+    "node_id" INTEGER NOT NULL,
+    "action" INTEGER NOT NULL,
+    "status" INTEGER NOT NULL DEFAULT 0,
+    "data" BLOB,
+    "task_count" INTEGER NOT NULL DEFAULT 1,
+    "completed_task_count" INTEGER NOT NULL DEFAULT 0,
+    "date_created" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    "date_modified" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    "seconds_elapsed" INTEGER NOT NULL DEFAULT 0,
+    CONSTRAINT "jobs_node_id_fkey" FOREIGN KEY ("node_id") REFERENCES "nodes" ("id") ON DELETE CASCADE ON UPDATE CASCADE
+);
+INSERT INTO "new_jobs" ("action", "completed_task_count", "data", "date_created", "date_modified", "id", "name", "node_id", "seconds_elapsed", "status", "task_count") SELECT "action", "completed_task_count", "data", "date_created", "date_modified", "id", "name", "node_id", "seconds_elapsed", "status", "task_count" FROM "jobs";
+DROP TABLE "jobs";
+ALTER TABLE "new_jobs" RENAME TO "jobs";
+CREATE TABLE "new_nodes" (
+    "id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+    "pub_id" BLOB NOT NULL,
+    "name" TEXT NOT NULL,
+    "platform" INTEGER NOT NULL DEFAULT 0,
+    "version" TEXT,
+    "last_seen" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    "timezone" TEXT,
+    "date_created" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
+);
+INSERT INTO "new_nodes" ("date_created", "id", "last_seen", "name", "platform", "pub_id", "timezone", "version") SELECT "date_created", "id", "last_seen", "name", "platform", "pub_id", "timezone", "version" FROM "nodes";
+DROP TABLE "nodes";
+ALTER TABLE "new_nodes" RENAME TO "nodes";
+CREATE UNIQUE INDEX "nodes_pub_id_key" ON "nodes"("pub_id");
+CREATE TABLE "new_tags" (
+    "id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+    "pub_id" BLOB NOT NULL,
+    "name" TEXT,
+    "color" TEXT,
+    "total_files" INTEGER DEFAULT 0,
+    "redundancy_goal" INTEGER DEFAULT 1,
+    "date_created" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    "date_modified" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
+);
+INSERT INTO "new_tags" ("color", "date_created", "date_modified", "id", "name", "pub_id", "redundancy_goal", "total_files") SELECT "color", "date_created", "date_modified", "id", "name", "pub_id", "redundancy_goal", "total_files" FROM "tags";
+DROP TABLE "tags";
+ALTER TABLE "new_tags" RENAME TO "tags";
+CREATE UNIQUE INDEX "tags_pub_id_key" ON "tags"("pub_id");
+CREATE TABLE "new_labels" (
+    "id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+    "pub_id" BLOB NOT NULL,
+    "name" TEXT,
+    "date_created" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    "date_modified" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
+);
+INSERT INTO "new_labels" ("date_created", "date_modified", "id", "name", "pub_id") SELECT "date_created", "date_modified", "id", "name", "pub_id" FROM "labels";
+DROP TABLE "labels";
+ALTER TABLE "new_labels" RENAME TO "labels";
+CREATE UNIQUE INDEX "labels_pub_id_key" ON "labels"("pub_id");
+CREATE TABLE "new_spaces" (
+    "id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+    "pub_id" BLOB NOT NULL,
+    "name" TEXT,
+    "description" TEXT,
+    "date_created" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    "date_modified" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
+);
+INSERT INTO "new_spaces" ("date_created", "date_modified", "description", "id", "name", "pub_id") SELECT "date_created", "date_modified", "description", "id", "name", "pub_id" FROM "spaces";
+DROP TABLE "spaces";
+ALTER TABLE "new_spaces" RENAME TO "spaces";
+CREATE UNIQUE INDEX "spaces_pub_id_key" ON "spaces"("pub_id");
+CREATE TABLE "new_locations" ( + "id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + "pub_id" BLOB NOT NULL, + "node_id" INTEGER, + "name" TEXT, + "local_path" TEXT, + "total_capacity" INTEGER, + "available_capacity" INTEGER, + "filesystem" TEXT, + "disk_type" INTEGER, + "is_removable" BOOLEAN, + "is_online" BOOLEAN NOT NULL DEFAULT true, + "date_created" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT "locations_node_id_fkey" FOREIGN KEY ("node_id") REFERENCES "nodes" ("id") ON DELETE SET NULL ON UPDATE CASCADE +); +INSERT INTO "new_locations" ("available_capacity", "date_created", "disk_type", "filesystem", "id", "is_online", "is_removable", "local_path", "name", "node_id", "pub_id", "total_capacity") SELECT "available_capacity", "date_created", "disk_type", "filesystem", "id", "is_online", "is_removable", "local_path", "name", "node_id", "pub_id", "total_capacity" FROM "locations"; +DROP TABLE "locations"; +ALTER TABLE "new_locations" RENAME TO "locations"; +CREATE UNIQUE INDEX "locations_pub_id_key" ON "locations"("pub_id"); +CREATE TABLE "new_sync_events" ( + "id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + "node_id" INTEGER NOT NULL, + "timestamp" TEXT NOT NULL, + "record_id" BLOB NOT NULL, + "kind" INTEGER NOT NULL, + "column" TEXT, + "value" TEXT NOT NULL, + CONSTRAINT "sync_events_node_id_fkey" FOREIGN KEY ("node_id") REFERENCES "nodes" ("id") ON DELETE RESTRICT ON UPDATE CASCADE +); +INSERT INTO "new_sync_events" ("column", "id", "kind", "node_id", "record_id", "timestamp", "value") SELECT "column", "id", "kind", "node_id", "record_id", "timestamp", "value" FROM "sync_events"; +DROP TABLE "sync_events"; +ALTER TABLE "new_sync_events" RENAME TO "sync_events"; +CREATE TABLE "new_albums" ( + "id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + "pub_id" BLOB NOT NULL, + "name" TEXT NOT NULL, + "is_hidden" BOOLEAN NOT NULL DEFAULT false, + "date_created" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + "date_modified" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP +); +INSERT INTO "new_albums" ("date_created", "date_modified", "id", "is_hidden", "name", "pub_id") SELECT "date_created", "date_modified", "id", "is_hidden", "name", "pub_id" FROM "albums"; +DROP TABLE "albums"; +ALTER TABLE "new_albums" RENAME TO "albums"; +CREATE UNIQUE INDEX "albums_pub_id_key" ON "albums"("pub_id"); +CREATE TABLE "new_comments" ( + "id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + "pub_id" BLOB NOT NULL, + "content" TEXT NOT NULL, + "date_created" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + "date_modified" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + "file_id" INTEGER, + CONSTRAINT "comments_file_id_fkey" FOREIGN KEY ("file_id") REFERENCES "files" ("id") ON DELETE SET NULL ON UPDATE CASCADE +); +INSERT INTO "new_comments" ("content", "date_created", "date_modified", "file_id", "id", "pub_id") SELECT "content", "date_created", "date_modified", "file_id", "id", "pub_id" FROM "comments"; +DROP TABLE "comments"; +ALTER TABLE "new_comments" RENAME TO "comments"; +CREATE UNIQUE INDEX "comments_pub_id_key" ON "comments"("pub_id"); +PRAGMA foreign_key_check; +PRAGMA foreign_keys=ON; diff --git a/core/prisma/schema.prisma b/core/prisma/schema.prisma index 47a0da0c1..5808ff3e6 100644 --- a/core/prisma/schema.prisma +++ b/core/prisma/schema.prisma @@ -23,7 +23,7 @@ model SyncEvent { node_id Int timestamp String // individual record pub id OR compound many-to-many pub ids - record_id String + record_id Bytes // the type of operation, I.E: CREATE, UPDATE, DELETE as an enum kind Int // the column name for 
atomic update operations @@ -51,7 +51,7 @@ model Statistics { model Node { id Int @id @default(autoincrement()) - pub_id String @unique + pub_id Bytes @unique name String platform Int @default(0) version String? @@ -67,7 +67,7 @@ model Node { } model Volume { - id Int @id() @default(autoincrement()) + id Int @id @default(autoincrement()) node_id Int name String mount_point String @@ -84,7 +84,7 @@ model Volume { model Location { id Int @id @default(autoincrement()) - pub_id String @unique + pub_id Bytes @unique node_id Int? name String? local_path String? @@ -226,7 +226,7 @@ model MediaData { model Tag { id Int @id @default(autoincrement()) - pub_id String @unique + pub_id Bytes @unique name String? color String? total_files Int? @default(0) @@ -253,7 +253,7 @@ model TagOnFile { model Label { id Int @id @default(autoincrement()) - pub_id String @unique + pub_id Bytes @unique name String? date_created DateTime @default(now()) date_modified DateTime @default(now()) @@ -277,7 +277,7 @@ model LabelOnFile { model Space { id Int @id @default(autoincrement()) - pub_id String @unique + pub_id Bytes @unique name String? description String? date_created DateTime @default(now()) @@ -301,12 +301,12 @@ model FileInSpace { } model Job { - id String @id + id Bytes @id name String node_id Int action Int status Int @default(0) - data String? + data Bytes? task_count Int @default(1) completed_task_count Int @default(0) @@ -320,7 +320,7 @@ model Job { model Album { id Int @id @default(autoincrement()) - pub_id String @unique + pub_id Bytes @unique name String is_hidden Boolean @default(false) @@ -347,7 +347,7 @@ model FileInAlbum { model Comment { id Int @id @default(autoincrement()) - pub_id String @unique + pub_id Bytes @unique content String date_created DateTime @default(now()) date_modified DateTime @default(now()) diff --git a/core/src/encode/thumb.rs b/core/src/encode/thumb.rs index 065ca20d5..bde57e343 100644 --- a/core/src/encode/thumb.rs +++ b/core/src/encode/thumb.rs @@ -1,46 +1,67 @@ use crate::{ - job::{Job, JobReportUpdate, JobResult, WorkerContext}, + job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext}, library::LibraryContext, prisma::file_path, sys, CoreEvent, }; use image::{self, imageops, DynamicImage, GenericImageView}; -use log::{debug, error, info}; -use std::error::Error; -use std::ops::Deref; -use std::path::{Path, PathBuf}; -use tokio::fs; +use log::{error, info, trace}; +use serde::{Deserialize, Serialize}; +use std::{ + error::Error, + ops::Deref, + path::{Path, PathBuf}, +}; +use tokio::{fs, task::block_in_place}; use webp::Encoder; -#[derive(Debug, Clone)] -pub struct ThumbnailJob { +static THUMBNAIL_SIZE_FACTOR: f32 = 0.2; +static THUMBNAIL_QUALITY: f32 = 30.0; +pub static THUMBNAIL_CACHE_DIR_NAME: &str = "thumbnails"; +pub const THUMBNAIL_JOB_NAME: &str = "thumbnailer"; + +pub struct ThumbnailJob {} + +#[derive(Serialize, Deserialize, Clone)] +pub struct ThumbnailJobInit { pub location_id: i32, pub path: PathBuf, pub background: bool, } -static THUMBNAIL_SIZE_FACTOR: f32 = 0.2; -static THUMBNAIL_QUALITY: f32 = 30.0; -pub static THUMBNAIL_CACHE_DIR_NAME: &str = "thumbnails"; +#[derive(Debug, Serialize, Deserialize)] +pub struct ThumbnailJobState { + thumbnail_dir: PathBuf, + root_path: PathBuf, +} #[async_trait::async_trait] -impl Job for ThumbnailJob { +impl StatefulJob for ThumbnailJob { + type Init = ThumbnailJobInit; + type Data = ThumbnailJobState; + type Step = file_path::Data; + fn name(&self) -> &'static str { - "thumbnailer" + 
THUMBNAIL_JOB_NAME } - async fn run(&self, ctx: WorkerContext) -> JobResult { + + async fn init( + &self, + ctx: WorkerContext, + state: &mut JobState, + ) -> JobResult { let library_ctx = ctx.library_ctx(); let thumbnail_dir = library_ctx .config() .data_directory() .join(THUMBNAIL_CACHE_DIR_NAME) - .join(self.location_id.to_string()); + .join(state.init.location_id.to_string()); - let location = sys::get_location(&library_ctx, self.location_id).await?; + let location = sys::get_location(&library_ctx, state.init.location_id).await?; info!( "Searching for images in location {} at path {:#?}", - location.id, self.path + location.id, state.init.path ); // create all necessary directories if they don't exist @@ -48,7 +69,8 @@ impl Job for ThumbnailJob { let root_path = location.path.unwrap(); // query database for all files in this location that need thumbnails - let image_files = get_images(&library_ctx, self.location_id, &self.path).await?; + let image_files = + get_images(&library_ctx, state.init.location_id, &state.init.path).await?; info!("Found {:?} files", image_files.len()); ctx.progress(vec![ @@ -56,59 +78,95 @@ impl Job for ThumbnailJob { JobReportUpdate::Message(format!("Preparing to process {} files", image_files.len())), ]); - for (i, image_file) in image_files.iter().enumerate() { - ctx.progress(vec![JobReportUpdate::Message(format!( - "Processing {}", - image_file.materialized_path - ))]); + state.data = Some(ThumbnailJobState { + thumbnail_dir, + root_path, + }); + state.steps = image_files.into_iter().collect(); - // assemble the file path - let path = Path::new(&root_path).join(&image_file.materialized_path); - debug!("image_file {:?}", image_file); + Ok(()) + } - // get cas_id, if none found skip - let cas_id = match image_file.file() { - Ok(file) => { - if let Some(f) = file { - f.cas_id.clone() - } else { - info!( - "skipping thumbnail generation for {}", - image_file.materialized_path - ); - continue; - } + async fn execute_step( + &self, + ctx: WorkerContext, + state: &mut JobState, + ) -> JobResult { + let step = &state.steps[0]; + ctx.progress(vec![JobReportUpdate::Message(format!( + "Processing {}", + step.materialized_path + ))]); + + let data = state + .data + .as_ref() + .expect("critical error: missing data on job state"); + + // assemble the file path + let path = data.root_path.join(&step.materialized_path); + trace!("image_file {:?}", step); + + // get cas_id, if none found skip + let cas_id = match step.file() { + Ok(file) => { + if let Some(f) = file { + f.cas_id.clone() + } else { + info!( + "skipping thumbnail generation for {}", + step.materialized_path + ); + return Ok(()); } - Err(_) => { - error!("Error getting cas_id {:?}", image_file.materialized_path); - continue; - } - }; - - // Define and write the WebP-encoded file to a given path - let output_path = thumbnail_dir.join(&cas_id).with_extension("webp"); - - // check if file exists at output path - if !output_path.exists() { - info!("Writing {:?} to {:?}", path, output_path); - tokio::spawn(async move { - if let Err(e) = generate_thumbnail(&path, &output_path).await { - error!("Error generating thumb {:?}", e); - } - }); - - ctx.progress(vec![JobReportUpdate::CompletedTaskCount(i + 1)]); - - if !self.background { - ctx.library_ctx() - .emit(CoreEvent::NewThumbnail { cas_id }) - .await; - }; - } else { - info!("Thumb exists, skipping... 
{}", output_path.display()); } + Err(_) => { + error!("Error getting cas_id {:?}", step.materialized_path); + return Ok(()); + } + }; + + // Define and write the WebP-encoded file to a given path + let output_path = data.thumbnail_dir.join(&cas_id).with_extension("webp"); + + // check if file exists at output path + if !output_path.exists() { + info!("Writing {:?} to {:?}", path, output_path); + + if let Err(e) = generate_thumbnail(&path, &output_path).await { + error!("Error generating thumb {:?}", e); + } + + if !state.init.background { + ctx.library_ctx() + .emit(CoreEvent::NewThumbnail { cas_id }) + .await; + }; + } else { + info!("Thumb exists, skipping... {}", output_path.display()); } + ctx.progress(vec![JobReportUpdate::CompletedTaskCount( + state.step_number + 1, + )]); + + Ok(()) + } + + async fn finalize( + &self, + _ctx: WorkerContext, + state: &mut JobState, + ) -> Result<(), JobError> { + let data = state + .data + .as_ref() + .expect("critical error: missing data on job state"); + info!( + "Finished thumbnail generation for location {} at {}", + state.init.location_id, + data.root_path.display() + ); Ok(()) } } @@ -117,25 +175,29 @@ pub async fn generate_thumbnail>( file_path: P, output_path: P, ) -> Result<(), Box> { - // Using `image` crate, open the included .jpg file - let img = image::open(file_path)?; - let (w, h) = img.dimensions(); - // Optionally, resize the existing photo and convert back into DynamicImage - let img = DynamicImage::ImageRgba8(imageops::resize( - &img, - (w as f32 * THUMBNAIL_SIZE_FACTOR) as u32, - (h as f32 * THUMBNAIL_SIZE_FACTOR) as u32, - imageops::FilterType::Triangle, - )); - // Create the WebP encoder for the above image - let encoder = Encoder::from_image(&img)?; + // Webp creation has blocking code + let webp = block_in_place(|| -> Result, Box> { + // Using `image` crate, open the included .jpg file + let img = image::open(file_path)?; + let (w, h) = img.dimensions(); + // Optionally, resize the existing photo and convert back into DynamicImage + let img = DynamicImage::ImageRgba8(imageops::resize( + &img, + (w as f32 * THUMBNAIL_SIZE_FACTOR) as u32, + (h as f32 * THUMBNAIL_SIZE_FACTOR) as u32, + imageops::FilterType::Triangle, + )); + // Create the WebP encoder for the above image + let encoder = Encoder::from_image(&img)?; - // Encode the image at a specified quality 0-100 + // Encode the image at a specified quality 0-100 + + // Type WebPMemory is !Send, which makes the Future in this function !Send, + // this make us `deref` to have a `&[u8]` and then `to_owned` to make a Vec + // which implies on a unwanted clone... + Ok(encoder.encode(THUMBNAIL_QUALITY).deref().to_owned()) + })?; - // Type WebPMemory is !Send, which makes the Future in this function !Send, - // this make us `deref` to have a `&[u8]` and then `to_owned` to make a Vec - // which implies on a unwanted clone... 
- let webp = encoder.encode(THUMBNAIL_QUALITY).deref().to_owned(); fs::write(output_path, &webp).await?; Ok(()) diff --git a/core/src/file/cas/identifier.rs b/core/src/file/cas/identifier.rs index a05684eda..a2f329365 100644 --- a/core/src/file/cas/identifier.rs +++ b/core/src/file/cas/identifier.rs @@ -1,44 +1,71 @@ use super::checksum::generate_cas_id; + use crate::{ file::FileError, - job::JobReportUpdate, - job::{Job, JobResult, WorkerContext}, + job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext}, library::LibraryContext, prisma::{file, file_path}, sys::get_location, + sys::LocationResource, }; use chrono::{DateTime, FixedOffset}; use futures::future::join_all; use log::{error, info}; use prisma_client_rust::{prisma_models::PrismaValue, raw, raw::Raw, Direction}; use serde::{Deserialize, Serialize}; -use std::collections::{HashMap, HashSet}; -use std::path::{Path, PathBuf}; +use std::{ + collections::{HashMap, HashSet}, + path::{Path, PathBuf}, +}; use tokio::{fs, io}; -// FileIdentifierJob takes file_paths without a file_id and uniquely identifies them +// we break this job into chunks of 100 to improve performance +static CHUNK_SIZE: usize = 100; +pub const IDENTIFIER_JOB_NAME: &str = "file_identifier"; + +pub struct FileIdentifierJob {} + +// FileIdentifierJobInit takes file_paths without a file_id and uniquely identifies them // first: generating the cas_id and extracting metadata // finally: creating unique file records, and linking them to their file_paths -#[derive(Debug)] -pub struct FileIdentifierJob { +#[derive(Serialize, Deserialize, Clone)] +pub struct FileIdentifierJobInit { pub location_id: i32, pub path: PathBuf, } -// we break this job into chunks of 100 to improve performance -static CHUNK_SIZE: usize = 100; +#[derive(Serialize, Deserialize)] +pub struct FileIdentifierJobState { + total_count: usize, + task_count: usize, + location: LocationResource, + location_path: PathBuf, + cursor: i32, +} #[async_trait::async_trait] -impl Job for FileIdentifierJob { +impl StatefulJob for FileIdentifierJob { + type Init = FileIdentifierJobInit; + type Data = FileIdentifierJobState; + type Step = (); + fn name(&self) -> &'static str { - "file_identifier" + IDENTIFIER_JOB_NAME } - async fn run(&self, ctx: WorkerContext) -> JobResult { + async fn init( + &self, + ctx: WorkerContext, + state: &mut JobState, + ) -> JobResult { info!("Identifying orphan file paths..."); - let location = get_location(&ctx.library_ctx(), self.location_id).await?; - let location_path = location.path.unwrap_or_else(|| "".to_string()); + let location = get_location(&ctx.library_ctx(), state.init.location_id).await?; + let location_path = if let Some(ref path) = location.path { + path.clone() + } else { + PathBuf::new() + }; let total_count = count_orphan_file_paths(&ctx.library_ctx(), location.id.into()).await?; info!("Found {} orphan file paths", total_count); @@ -49,159 +76,188 @@ impl Job for FileIdentifierJob { // update job with total task count based on orphan file_paths count ctx.progress(vec![JobReportUpdate::TaskCount(task_count)]); - let mut completed: usize = 0; - let mut cursor: i32 = 1; - // loop until task count is complete - while completed < task_count { - // link file_path ids to a CreateFile struct containing unique file data - let mut chunk: HashMap = HashMap::new(); - let mut cas_lookup: HashMap = HashMap::new(); + state.data = Some(FileIdentifierJobState { + total_count, + task_count, + location, + location_path, + cursor: 1, + }); - // get chunk of orphans 
to process - let file_paths = match get_orphan_file_paths(&ctx.library_ctx(), cursor).await { - Ok(file_paths) => file_paths, + state.steps = (0..task_count).map(|_| ()).collect(); + + Ok(()) + } + + async fn execute_step( + &self, + ctx: WorkerContext, + state: &mut JobState, + ) -> JobResult { + // link file_path ids to a CreateFile struct containing unique file data + let mut chunk: HashMap = HashMap::new(); + let mut cas_lookup: HashMap = HashMap::new(); + + let data = state + .data + .as_mut() + .expect("critical error: missing data on job state"); + + // get chunk of orphans to process + let file_paths = match get_orphan_file_paths(&ctx.library_ctx(), data.cursor).await { + Ok(file_paths) => file_paths, + Err(e) => { + info!("Error getting orphan file paths: {:#?}", e); + return Ok(()); + } + }; + info!( + "Processing {:?} orphan files. ({} completed of {})", + file_paths.len(), + state.step_number, + data.task_count + ); + + // analyze each file_path + for file_path in &file_paths { + // get the cas_id and extract metadata + match prepare_file(&data.location_path, file_path).await { + Ok(file) => { + let cas_id = file.cas_id.clone(); + // create entry into chunks for created file data + chunk.insert(file_path.id, file); + cas_lookup.insert(cas_id, file_path.id); + } Err(e) => { - info!("Error getting orphan file paths: {:#?}", e); + info!("Error processing file: {:#?}", e); continue; } }; - info!( - "Processing {:?} orphan files. ({} completed of {})", - file_paths.len(), - completed, - task_count - ); + } - // analyze each file_path - for file_path in &file_paths { - // get the cas_id and extract metadata - match prepare_file(&location_path, file_path).await { - Ok(file) => { - let cas_id = file.cas_id.clone(); - // create entry into chunks for created file data - chunk.insert(file_path.id, file); - cas_lookup.insert(cas_id, file_path.id); - } - Err(e) => { - info!("Error processing file: {:#?}", e); - continue; - } - }; - } + // find all existing files by cas id + let generated_cas_ids = chunk.values().map(|c| c.cas_id.clone()).collect(); + let existing_files = ctx + .library_ctx() + .db + .file() + .find_many(vec![file::cas_id::in_vec(generated_cas_ids)]) + .exec() + .await?; - // find all existing files by cas id - let generated_cas_ids = chunk.values().map(|c| c.cas_id.clone()).collect(); - let existing_files = ctx - .library_ctx() - .db - .file() - .find_many(vec![file::cas_id::in_vec(generated_cas_ids)]) - .exec() - .await?; + info!("Found {} existing files", existing_files.len()); - info!("Found {} existing files", existing_files.len()); + // link those existing files to their file paths + // Had to put the file_path in a variable outside of the closure, to satisfy the borrow checker + let library_ctx = ctx.library_ctx(); + let prisma_file_path = library_ctx.db.file_path(); - // link those existing files to their file paths - // Had to put the file_path in a variable outside of the closure, to satisfy the borrow checker - let library_ctx = ctx.library_ctx(); - let prisma_file_path = library_ctx.db.file_path(); - for result in join_all(existing_files.iter().map(|file| { - prisma_file_path - .find_unique(file_path::id::equals( - *cas_lookup.get(&file.cas_id).unwrap(), - )) - .update(vec![file_path::file_id::set(Some(file.id))]) - .exec() - })) + for existing_file in &existing_files { + if let Err(e) = update_file_id_by_cas_id( + &prisma_file_path, + &cas_lookup, + &existing_file.cas_id, + existing_file.id, + ) .await { - if let Err(e) = result { - error!("Error linking file: 
{:#?}", e); - } + info!("Error updating file_id: {:#?}", e); } + } - let existing_files_cas_ids = existing_files - .iter() - .map(|file| file.cas_id.clone()) - .collect::>(); + let existing_files_cas_ids = existing_files + .iter() + .map(|file| file.cas_id.clone()) + .collect::>(); - // extract files that don't already exist in the database - let new_files = chunk - .iter() - .map(|(_id, create_file)| create_file) - .filter(|create_file| !existing_files_cas_ids.contains(&create_file.cas_id)) - .collect::>(); + // extract files that don't already exist in the database + let new_files = chunk + .iter() + .map(|(_id, create_file)| create_file) + .filter(|create_file| !existing_files_cas_ids.contains(&create_file.cas_id)) + .collect::>(); - // assemble prisma values for new unique files - let mut values = Vec::with_capacity(new_files.len() * 3); - for file in &new_files { - values.extend([ - PrismaValue::String(file.cas_id.clone()), - PrismaValue::Int(file.size_in_bytes), - PrismaValue::DateTime(file.date_created), - ]); - } - - // create new file records with assembled values - let created_files: Vec = ctx - .library_ctx() - .db - ._query_raw(Raw::new( - &format!( - "INSERT INTO files (cas_id, size_in_bytes, date_created) VALUES {} - ON CONFLICT (cas_id) DO NOTHING RETURNING id, cas_id", - vec!["({}, {}, {})"; new_files.len()].join(",") - ), - values, - )) - .await - .unwrap_or_else(|e| { - error!("Error inserting files: {:#?}", e); - Vec::new() - }); - - // This code is duplicates, is this right? - for result in join_all(created_files.iter().map(|file| { - // associate newly created files with their respective file_paths - // TODO: this is potentially bottle necking the chunk system, individually linking file_path to file, 100 queries per chunk - // - insert many could work, but I couldn't find a good way to do this in a single SQL query - prisma_file_path - .find_unique(file_path::id::equals( - *cas_lookup.get(&file.cas_id).unwrap(), - )) - .update(vec![file_path::file_id::set(Some(file.id))]) - .exec() - })) - .await - { - if let Err(e) = result { - error!("Error linking file: {:#?}", e); - } - } - - // handle loop end - let last_row = match file_paths.last() { - Some(l) => l, - None => { - break; - } - }; - cursor = last_row.id; - completed += 1; - - ctx.progress(vec![ - JobReportUpdate::CompletedTaskCount(completed), - JobReportUpdate::Message(format!( - "Processed {} of {} orphan files", - completed * CHUNK_SIZE, - total_count - )), + // assemble prisma values for new unique files + let mut values = Vec::with_capacity(new_files.len() * 3); + for file in &new_files { + values.extend([ + PrismaValue::String(file.cas_id.clone()), + PrismaValue::Int(file.size_in_bytes), + PrismaValue::DateTime(file.date_created), ]); } + // create new file records with assembled values + let created_files: Vec = ctx + .library_ctx() + .db + ._query_raw(Raw::new( + &format!( + "INSERT INTO files (cas_id, size_in_bytes, date_created) VALUES {} + ON CONFLICT (cas_id) DO NOTHING RETURNING id, cas_id", + vec!["({}, {}, {})"; new_files.len()].join(",") + ), + values, + )) + .await + .unwrap_or_else(|e| { + error!("Error inserting files: {:#?}", e); + Vec::new() + }); + + for created_file in created_files { + // associate newly created files with their respective file_paths + // TODO: this is potentially bottle necking the chunk system, individually linking file_path to file, 100 queries per chunk + // - insert many could work, but I couldn't find a good way to do this in a single SQL query + if let Err(e) = 
update_file_id_by_cas_id( + &prisma_file_path, + &cas_lookup, + &created_file.cas_id, + created_file.id, + ) + .await + { + info!("Error updating file_id: {:#?}", e); + } + } + + // handle last step + if let Some(last_row) = file_paths.last() { + data.cursor = last_row.id; + } else { + return Ok(()); + } + + ctx.progress(vec![ + JobReportUpdate::CompletedTaskCount(state.step_number), + JobReportUpdate::Message(format!( + "Processed {} of {} orphan files", + state.step_number * CHUNK_SIZE, + data.total_count + )), + ]); + // let _remaining = count_orphan_file_paths(&ctx.core_ctx, location.id.into()).await?; Ok(()) } + + async fn finalize( + &self, + _ctx: WorkerContext, + state: &mut JobState, + ) -> Result<(), JobError> { + let data = state + .data + .as_ref() + .expect("critical error: missing data on job state"); + info!( + "Finalizing identifier job at {}, total of {} tasks", + data.location_path.display(), + data.task_count + ); + + Ok(()) + } } #[derive(Deserialize, Serialize, Debug)] @@ -241,7 +297,7 @@ pub async fn get_orphan_file_paths( .take(CHUNK_SIZE as i64) .exec() .await - .map_err(|e| e.into()) + .map_err(Into::into) } #[derive(Deserialize, Serialize, Debug)] @@ -287,3 +343,16 @@ pub async fn prepare_file( date_created: file_path.date_created, }) } + +async fn update_file_id_by_cas_id( + prisma_file_path: &file_path::Actions<'_>, + cas_lookup: &HashMap, + file_cas_id: &str, + file_id: i32, +) -> prisma_client_rust::Result> { + prisma_file_path + .find_unique(file_path::id::equals(*cas_lookup.get(file_cas_id).unwrap())) + .update(vec![file_path::file_id::set(Some(file_id))]) + .exec() + .await +} diff --git a/core/src/file/indexer/mod.rs b/core/src/file/indexer/mod.rs index 942a1e3b8..46969ad94 100644 --- a/core/src/file/indexer/mod.rs +++ b/core/src/file/indexer/mod.rs @@ -1,37 +1,270 @@ -use crate::job::{Job, JobReportUpdate, JobResult, WorkerContext}; -use std::path::PathBuf; +use crate::{ + job::{JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext}, + sys::{create_location, LocationResource}, +}; +use chrono::{DateTime, Utc}; +use log::{error, info}; +use prisma_client_rust::{raw, raw::Raw, PrismaValue}; +use serde::{Deserialize, Serialize}; +use std::{ + collections::HashMap, + ffi::OsStr, + path::{Path, PathBuf}, + time::Duration, +}; +use tokio::{fs, time::Instant}; +use walkdir::{DirEntry, WalkDir}; -use self::scan::ScanProgress; -mod scan; +static BATCH_SIZE: usize = 100; +pub const INDEXER_JOB_NAME: &str = "indexer"; -// Re-exporting -pub use scan::*; +#[derive(Clone)] +pub enum ScanProgress { + ChunkCount(usize), + SavedChunks(usize), + Message(String), +} -use scan::scan_path; +pub struct IndexerJob {} -#[derive(Debug)] -pub struct IndexerJob { +#[derive(Serialize, Deserialize, Clone)] +pub struct IndexerJobInit { pub path: PathBuf, } -#[async_trait::async_trait] -impl Job for IndexerJob { - fn name(&self) -> &'static str { - "indexer" +#[derive(Serialize, Deserialize)] +pub struct IndexerJobData { + location: LocationResource, + db_write_start: DateTime, + scan_read_time: Duration, + total_paths: usize, +} + +pub(crate) type IndexerJobStep = Vec<(PathBuf, i32, Option, bool)>; + +impl IndexerJobData { + fn on_scan_progress(ctx: WorkerContext, progress: Vec) { + ctx.progress( + progress + .iter() + .map(|p| match p.clone() { + ScanProgress::ChunkCount(c) => JobReportUpdate::TaskCount(c), + ScanProgress::SavedChunks(p) => JobReportUpdate::CompletedTaskCount(p), + ScanProgress::Message(m) => JobReportUpdate::Message(m), + }) + .collect(), + ) } - async fn 
run(&self, ctx: WorkerContext) -> JobResult { - scan_path(&ctx.library_ctx(), &self.path, move |p| { - ctx.progress( - p.iter() - .map(|p| match p.clone() { - ScanProgress::ChunkCount(c) => JobReportUpdate::TaskCount(c), - ScanProgress::SavedChunks(p) => JobReportUpdate::CompletedTaskCount(p), - ScanProgress::Message(m) => JobReportUpdate::Message(m), - }) - .collect(), - ) +} + +#[async_trait::async_trait] +impl StatefulJob for IndexerJob { + type Init = IndexerJobInit; + type Data = IndexerJobData; + type Step = IndexerJobStep; + + fn name(&self) -> &'static str { + INDEXER_JOB_NAME + } + + // creates a vector of valid path buffers from a directory + async fn init( + &self, + ctx: WorkerContext, + state: &mut JobState, + ) -> JobResult { + let location = create_location(&ctx.library_ctx(), &state.init.path).await?; + + // query db to highers id, so we can increment it for the new files indexed + #[derive(Deserialize, Serialize, Debug)] + struct QueryRes { + id: Option, + } + // grab the next id so we can increment in memory for batch inserting + let first_file_id = match ctx + .library_ctx() + .db + ._query_raw::(raw!("SELECT MAX(id) id FROM file_paths")) + .await + { + Ok(rows) => rows[0].id.unwrap_or(0), + Err(e) => panic!("Error querying for next file id: {:#?}", e), + }; + + //check is path is a directory + if !state.init.path.is_dir() { + // return Err(anyhow::anyhow!("{} is not a directory", &path)); + panic!("{:#?} is not a directory", state.init.path); + } + + // spawn a dedicated thread to scan the directory for performance + let path = state.init.path.clone(); + let inner_ctx = ctx.clone(); + let (paths, scan_start) = tokio::task::spawn_blocking(move || { + // store every valid path discovered + let mut paths: Vec<(PathBuf, i32, Option, bool)> = Vec::new(); + // store a hashmap of directories to their file ids for fast lookup + let mut dirs = HashMap::new(); + // begin timer for logging purposes + let scan_start = Instant::now(); + + let mut next_file_id = first_file_id; + let mut get_id = || { + next_file_id += 1; + next_file_id + }; + // walk through directory recursively + for entry in WalkDir::new(&path).into_iter().filter_entry(|dir| { + // check if entry is approved + !is_hidden(dir) && !is_app_bundle(dir) && !is_node_modules(dir) && !is_library(dir) + }) { + // extract directory entry or log and continue if failed + let entry = match entry { + Ok(entry) => entry, + Err(e) => { + error!("Error reading file {}", e); + continue; + } + }; + let path = entry.path(); + + info!("Found filesystem path: {:?}", path); + + let parent_path = path + .parent() + .unwrap_or_else(|| Path::new("")) + .to_str() + .unwrap_or(""); + let parent_dir_id = dirs.get(&*parent_path); + + let path_str = match path.as_os_str().to_str() { + Some(path_str) => path_str, + None => { + error!("Error reading file {}", &path.display()); + continue; + } + }; + + IndexerJobData::on_scan_progress( + inner_ctx.clone(), + vec![ + ScanProgress::Message(format!("Scanning {}", path_str)), + ScanProgress::ChunkCount(paths.len() / BATCH_SIZE), + ], + ); + + let file_id = get_id(); + let file_type = entry.file_type(); + let is_dir = file_type.is_dir(); + + if is_dir || file_type.is_file() { + paths.push((path.to_owned(), file_id, parent_dir_id.cloned(), is_dir)); + } + + if is_dir { + let _path = match path.to_str() { + Some(path) => path.to_owned(), + None => continue, + }; + dirs.insert(_path, file_id); + } + } + (paths, scan_start) }) - .await + .await?; + + state.data = Some(IndexerJobData { + location, + 
db_write_start: Utc::now(), + scan_read_time: scan_start.elapsed(), + total_paths: paths.len(), + }); + + state.steps = paths + .chunks(BATCH_SIZE) + .enumerate() + .map(|(i, chunk)| { + IndexerJobData::on_scan_progress( + ctx.clone(), + vec![ + ScanProgress::SavedChunks(i as usize), + ScanProgress::Message(format!( + "Writing {} of {} to db", + i * chunk.len(), + paths.len(), + )), + ], + ); + chunk.to_vec() + }) + .collect(); + + Ok(()) + } + + async fn execute_step( + &self, + ctx: WorkerContext, + state: &mut JobState, + ) -> JobResult { + // vector to store active models + let mut files = Vec::new(); + let step = &state.steps[0]; + + let data = state + .data + .as_ref() + .expect("critical error: missing data on job state"); + + for (file_path, file_id, parent_dir_id, is_dir) in step { + files.extend( + match prepare_values(file_path, *file_id, &data.location, parent_dir_id, *is_dir) + .await + { + Ok(values) => values.to_vec(), + Err(e) => { + error!("Error creating file model from path {:?}: {}", file_path, e); + continue; + } + }, + ); + } + + let raw = Raw::new( + &format!(" + INSERT INTO file_paths (id, is_dir, location_id, materialized_path, name, extension, parent_id, date_created) + VALUES {} + ", + vec!["({}, {}, {}, {}, {}, {}, {}, {})"; step.len()].join(", ") + ), + files + ); + + let count = ctx.library_ctx().db._execute_raw(raw).await; + + info!("Inserted {:?} records", count); + + Ok(()) + } + + async fn finalize( + &self, + _ctx: WorkerContext, + state: &mut JobState, + ) -> JobResult { + let data = state + .data + .as_ref() + .expect("critical error: missing data on job state"); + info!( + "scan of {:?} completed in {:?}. {:?} files found. db write completed in {:?}", + state.init.path, + data.scan_read_time, + data.total_paths, + Utc::now() - data.db_write_start, + ); + + Ok(()) } } @@ -48,3 +281,104 @@ impl Job for IndexerJob { // // sub-paths that are ignored // pub always_ignored_sub_paths: Option, // } + +// reads a file at a path and creates an ActiveModel with metadata +async fn prepare_values( + file_path: impl AsRef, + id: i32, + location: &LocationResource, + parent_id: &Option, + is_dir: bool, +) -> Result<[PrismaValue; 8], std::io::Error> { + let file_path = file_path.as_ref(); + + let metadata = fs::metadata(file_path).await?; + let location_path = location.path.as_ref().unwrap(); + // let size = metadata.len(); + let name; + let extension; + let date_created: DateTime = metadata.created().unwrap().into(); + + // if the 'file_path' is not a directory, then get the extension and name. 
+ + // if 'file_path' is a directory, set extension to an empty string to avoid periods in folder names + // - being interpreted as file extensions + if is_dir { + extension = "".to_string(); + name = extract_name(file_path.file_name()); + } else { + extension = extract_name(file_path.extension()); + name = extract_name(file_path.file_stem()); + } + + let materialized_path = file_path.strip_prefix(location_path).unwrap(); + let materialized_path_as_string = materialized_path.to_str().unwrap_or("").to_owned(); + + let values = [ + PrismaValue::Int(id as i64), + PrismaValue::Boolean(metadata.is_dir()), + PrismaValue::Int(location.id as i64), + PrismaValue::String(materialized_path_as_string), + PrismaValue::String(name), + PrismaValue::String(extension.to_lowercase()), + parent_id + .map(|id| PrismaValue::Int(id as i64)) + .unwrap_or(PrismaValue::Null), + PrismaValue::DateTime(date_created.into()), + ]; + + Ok(values) +} + +// extract name from OsStr returned by PathBuf +fn extract_name(os_string: Option<&OsStr>) -> String { + os_string + .unwrap_or_default() + .to_str() + .unwrap_or_default() + .to_owned() +} + +fn is_hidden(entry: &DirEntry) -> bool { + entry + .file_name() + .to_str() + .map(|s| s.starts_with('.')) + .unwrap_or(false) +} + +fn is_library(entry: &DirEntry) -> bool { + entry + .path() + .to_str() + // TODO: improve this check, it is too naive + .map(|s| s.contains("/Library/")) + .unwrap_or(false) +} + +fn is_node_modules(entry: &DirEntry) -> bool { + entry + .file_name() + .to_str() + .map(|s| s.contains("node_modules")) + .unwrap_or(false) +} + +fn is_app_bundle(entry: &DirEntry) -> bool { + let is_dir = entry.metadata().unwrap().is_dir(); + let contains_dot = entry + .file_name() + .to_str() + .map(|s| s.contains(".app") | s.contains(".bundle")) + .unwrap_or(false); + + // let is_app_bundle = is_dir && contains_dot; + // if is_app_bundle { + // let path_buff = entry.path(); + // let path = path_buff.to_str().unwrap(); + + // self::path(&path, ); + // } + + is_dir && contains_dot +} diff --git a/core/src/file/indexer/scan.rs b/core/src/file/indexer/scan.rs deleted file mode 100644 index 87c830f6a..000000000 --- a/core/src/file/indexer/scan.rs +++ /dev/null @@ -1,280 +0,0 @@ -use crate::job::JobResult; -use crate::library::LibraryContext; -use crate::sys::{create_location, LocationResource}; -use chrono::{DateTime, Utc}; -use log::{error, info}; -use prisma_client_rust::prisma_models::PrismaValue; -use prisma_client_rust::raw; -use prisma_client_rust::raw::Raw; -use serde::{Deserialize, Serialize}; -use std::ffi::OsStr; -use std::fmt::Debug; -use std::{ - collections::HashMap, - path::{Path, PathBuf}, - time::Instant, -}; -use tokio::fs; -use walkdir::{DirEntry, WalkDir}; - -#[derive(Clone)] -pub enum ScanProgress { - ChunkCount(usize), - SavedChunks(usize), - Message(String), -} - -static BATCH_SIZE: usize = 100; - -// creates a vector of valid path buffers from a directory -pub async fn scan_path( - ctx: &LibraryContext, - path: impl AsRef + Debug, - on_progress: impl Fn(Vec) + Send + Sync + 'static, -) -> JobResult { - let location = create_location(ctx, &path).await?; - - // query db to highers id, so we can increment it for the new files indexed - #[derive(Deserialize, Serialize, Debug)] - struct QueryRes { - id: Option, - } - // grab the next id so we can increment in memory for batch inserting - let first_file_id = match ctx - .db - ._query_raw::(raw!("SELECT MAX(id) id FROM file_paths")) - .await - { - Ok(rows) => rows[0].id.unwrap_or(0), - Err(e) => panic!("Error querying for
next file id: {:#?}", e), - }; - - //check is path is a directory - if !path.as_ref().is_dir() { - // return Err(anyhow::anyhow!("{} is not a directory", &path)); - panic!("{:#?} is not a directory", path); - } - - let path_buf = path.as_ref().to_path_buf(); - - // spawn a dedicated thread to scan the directory for performance - let (paths, scan_start, on_progress) = tokio::task::spawn_blocking(move || { - // store every valid path discovered - let mut paths: Vec<(PathBuf, i32, Option, bool)> = Vec::new(); - // store a hashmap of directories to their file ids for fast lookup - let mut dirs: HashMap = HashMap::new(); - // begin timer for logging purposes - let scan_start = Instant::now(); - - let mut next_file_id = first_file_id; - let mut get_id = || { - next_file_id += 1; - next_file_id - }; - // walk through directory recursively - for entry in WalkDir::new(path_buf).into_iter().filter_entry(|dir| { - // check if entry is approved - !is_hidden(dir) && !is_app_bundle(dir) && !is_node_modules(dir) && !is_library(dir) - }) { - // extract directory entry or log and continue if failed - let entry = match entry { - Ok(entry) => entry, - Err(e) => { - error!("Error reading file {}", e); - continue; - } - }; - let path = entry.path(); - - info!("Found filesystem path: {:?}", path); - - let parent_path = path - .parent() - .unwrap_or_else(|| Path::new("")) - .to_str() - .unwrap_or(""); - let parent_dir_id = dirs.get(&*parent_path); - - let path_str = match path.as_os_str().to_str() { - Some(path_str) => path_str, - None => { - error!("Error reading file {}", &path.display()); - continue; - } - }; - - on_progress(vec![ - ScanProgress::Message(format!("Scanning {}", path_str)), - ScanProgress::ChunkCount(paths.len() / BATCH_SIZE), - ]); - - let file_id = get_id(); - let file_type = entry.file_type(); - let is_dir = file_type.is_dir(); - - if is_dir || file_type.is_file() { - paths.push((path.to_owned(), file_id, parent_dir_id.cloned(), is_dir)); - } - - if is_dir { - let _path = match path.to_str() { - Some(path) => path.to_owned(), - None => continue, - }; - dirs.insert(_path, file_id); - } - } - (paths, scan_start, on_progress) - }) - .await?; - - let db_write_start = Instant::now(); - let scan_read_time = scan_start.elapsed(); - - for (i, chunk) in paths.chunks(BATCH_SIZE).enumerate() { - on_progress(vec![ - ScanProgress::SavedChunks(i as usize), - ScanProgress::Message(format!( - "Writing {} of {} to db", - i * chunk.len(), - paths.len(), - )), - ]); - - // vector to store active models - let mut files: Vec = Vec::new(); - - for (file_path, file_id, parent_dir_id, is_dir) in chunk { - files.extend( - match prepare_values(file_path, *file_id, &location, parent_dir_id, *is_dir).await { - Ok(values) => values.to_vec(), - Err(e) => { - error!("Error creating file model from path {:?}: {}", file_path, e); - continue; - } - }, - ); - } - - let raw = Raw::new( - &format!(" - INSERT INTO file_paths (id, is_dir, location_id, materialized_path, name, extension, parent_id, date_created) - VALUES {} - ", - vec!["({}, {}, {}, {}, {}, {}, {}, {})"; chunk.len()].join(", ") - ), - files - ); - - let count = ctx.db._execute_raw(raw).await; - - info!("Inserted {:?} records", count); - } - info!( - "scan of {:?} completed in {:?}. {:?} files found. 
db write completed in {:?}", - &path, - scan_read_time, - paths.len(), - db_write_start.elapsed() - ); - Ok(()) -} - -// reads a file at a path and creates an ActiveModel with metadata -async fn prepare_values( - file_path: &PathBuf, - id: i32, - location: &LocationResource, - parent_id: &Option, - is_dir: bool, -) -> Result<[PrismaValue; 8], std::io::Error> { - let metadata = fs::metadata(&file_path).await?; - let location_path = Path::new(location.path.as_ref().unwrap().as_str()); - // let size = metadata.len(); - let name; - let extension; - let date_created: DateTime = metadata.created().unwrap().into(); - - // if the 'file_path' is not a directory, then get the extension and name. - - // if 'file_path' is a directory, set extension to an empty string to avoid periods in folder names - // - being interpreted as file extensions - if is_dir { - extension = "".to_string(); - name = extract_name(file_path.file_name()); - } else { - extension = extract_name(file_path.extension()); - name = extract_name(file_path.file_stem()); - } - - let materialized_path = file_path.strip_prefix(location_path).unwrap(); - let materialized_path_as_string = materialized_path.to_str().unwrap_or("").to_owned(); - - let values = [ - PrismaValue::Int(id as i64), - PrismaValue::Boolean(metadata.is_dir()), - PrismaValue::Int(location.id as i64), - PrismaValue::String(materialized_path_as_string), - PrismaValue::String(name), - PrismaValue::String(extension.to_lowercase()), - parent_id - .map(|id| PrismaValue::Int(id as i64)) - .unwrap_or(PrismaValue::Null), - PrismaValue::DateTime(date_created.into()), - ]; - - Ok(values) -} - -// extract name from OsStr returned by PathBuff -fn extract_name(os_string: Option<&OsStr>) -> String { - os_string - .unwrap_or_default() - .to_str() - .unwrap_or_default() - .to_owned() -} - -fn is_hidden(entry: &DirEntry) -> bool { - entry - .file_name() - .to_str() - .map(|s| s.starts_with('.')) - .unwrap_or(false) -} - -fn is_library(entry: &DirEntry) -> bool { - entry - .path() - .to_str() - // make better this is shit - .map(|s| s.contains("/Library/")) - .unwrap_or(false) -} - -fn is_node_modules(entry: &DirEntry) -> bool { - entry - .file_name() - .to_str() - .map(|s| s.contains("node_modules")) - .unwrap_or(false) -} - -fn is_app_bundle(entry: &DirEntry) -> bool { - let is_dir = entry.metadata().unwrap().is_dir(); - let contains_dot = entry - .file_name() - .to_str() - .map(|s| s.contains(".app") | s.contains(".bundle")) - .unwrap_or(false); - - // let is_app_bundle = is_dir && contains_dot; - // if is_app_bundle { - // let path_buff = entry.path(); - // let path = path_buff.to_str().unwrap(); - - // self::path(&path, ); - // } - - is_dir && contains_dot -} diff --git a/core/src/file/mod.rs b/core/src/file/mod.rs index 6def304ad..a642eb7c8 100644 --- a/core/src/file/mod.rs +++ b/core/src/file/mod.rs @@ -1,3 +1,10 @@ +use crate::{ + library::LibraryContext, + prisma::{self, file, file_path}, + sys::SysError, + ClientQuery, CoreError, CoreEvent, CoreResponse, LibraryQuery, +}; + use chrono::{DateTime, Utc}; use int_enum::IntEnum; use serde::{Deserialize, Serialize}; @@ -5,12 +12,6 @@ use std::path::PathBuf; use thiserror::Error; use ts_rs::TS; -use crate::{ - library::LibraryContext, - prisma::{self, file, file_path}, - sys::SysError, - ClientQuery, CoreError, CoreEvent, CoreResponse, LibraryQuery, -}; pub mod cas; pub mod explorer; pub mod indexer; @@ -58,9 +59,9 @@ pub struct FilePath { pub file_id: Option, pub parent_id: Option, - pub date_created: DateTime, - pub date_modified: 
DateTime, - pub date_indexed: DateTime, + pub date_created: DateTime, + pub date_modified: DateTime, + pub date_indexed: DateTime, pub file: Option, } @@ -163,15 +164,7 @@ pub async fn set_note( .await .unwrap(); - ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::LibraryQuery { - library_id: ctx.id.to_string(), - query: LibraryQuery::GetExplorerDir { - limit: 0, - path: PathBuf::new(), - location_id: 0, - }, - })) - .await; + send_invalidate_query(&ctx).await; Ok(CoreResponse::Success(())) } @@ -190,8 +183,14 @@ pub async fn favorite( .await .unwrap(); + send_invalidate_query(&ctx).await; + + Ok(CoreResponse::Success(())) +} + +async fn send_invalidate_query(ctx: &LibraryContext) { ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::LibraryQuery { - library_id: ctx.id.to_string(), + library_id: ctx.id, query: LibraryQuery::GetExplorerDir { limit: 0, path: PathBuf::new(), @@ -199,6 +198,4 @@ pub async fn favorite( }, })) .await; - - Ok(CoreResponse::Success(())) } diff --git a/core/src/job/jobs.rs b/core/src/job/job_manager.rs similarity index 62% rename from core/src/job/jobs.rs rename to core/src/job/job_manager.rs index 0cca9b8ad..512885971 100644 --- a/core/src/job/jobs.rs +++ b/core/src/job/job_manager.rs @@ -1,57 +1,61 @@ -use super::{ - worker::{Worker, WorkerContext}, - JobError, -}; use crate::{ + encode::THUMBNAIL_JOB_NAME, + file::{ + cas::IDENTIFIER_JOB_NAME, + indexer::{IndexerJob, INDEXER_JOB_NAME}, + }, + job::{worker::Worker, DynJob, JobError}, library::LibraryContext, prisma::{job, node}, + FileIdentifierJob, Job, ThumbnailJob, }; use int_enum::IntEnum; use log::{error, info}; use serde::{Deserialize, Serialize}; use std::{ collections::{HashMap, VecDeque}, - error::Error, fmt::Debug, + fmt::{Display, Formatter}, sync::Arc, + time::Duration, +}; +use tokio::{ + sync::{broadcast, mpsc, Mutex, RwLock}, + time::sleep, }; -use tokio::sync::{mpsc, Mutex, RwLock}; use ts_rs::TS; +use uuid::Uuid; // db is single threaded, nerd const MAX_WORKERS: usize = 1; -pub type JobResult = Result<(), Box>; - -#[async_trait::async_trait] -pub trait Job: Send + Sync + Debug { - fn name(&self) -> &'static str; - async fn run(&self, ctx: WorkerContext) -> JobResult; -} - pub enum JobManagerEvent { - IngestJob(LibraryContext, Box), + IngestJob(LibraryContext, Box), } // jobs struct is maintained by the core pub struct JobManager { - job_queue: RwLock>>, + job_queue: RwLock>>, // workers are spawned when jobs are picked off the queue - running_workers: RwLock>>>, + running_workers: RwLock>>>, internal_sender: mpsc::UnboundedSender, + shutdown_tx: Arc>, } impl JobManager { pub fn new() -> Arc { + let (shutdown_tx, _shutdown_rx) = broadcast::channel(1); let (internal_sender, mut internal_receiver) = mpsc::unbounded_channel(); let this = Arc::new(Self { job_queue: RwLock::new(VecDeque::new()), running_workers: RwLock::new(HashMap::new()), internal_sender, + shutdown_tx: Arc::new(shutdown_tx), }); let this2 = this.clone(); tokio::spawn(async move { + // FIXME: if this task crashes, the entire application is unusable while let Some(event) = internal_receiver.recv().await { match event { JobManagerEvent::IngestJob(ctx, job) => this2.clone().ingest(&ctx, job).await, @@ -62,30 +66,36 @@ impl JobManager { this } - pub async fn ingest(self: Arc, ctx: &LibraryContext, job: Box) { + pub async fn ingest(self: Arc, ctx: &LibraryContext, mut job: Box) { // create worker to process job let mut running_workers = self.running_workers.write().await; if running_workers.len() < MAX_WORKERS { info!("Running job: {:?}", 
job.name()); - let worker = Worker::new(job); - let id = worker.id(); + let job_report = job + .report() + .take() + .expect("critical error: missing job on worker"); + + let job_id = job_report.id; + + let worker = Worker::new(job, job_report); let wrapped_worker = Arc::new(Mutex::new(worker)); Worker::spawn(Arc::clone(&self), Arc::clone(&wrapped_worker), ctx.clone()).await; - running_workers.insert(id, wrapped_worker); + running_workers.insert(job_id, wrapped_worker); } else { self.job_queue.write().await.push_back(job); } } - pub async fn ingest_queue(&self, _ctx: &LibraryContext, job: Box) { + pub async fn ingest_queue(&self, _ctx: &LibraryContext, job: Box) { self.job_queue.write().await.push_back(job); } - pub async fn complete(self: Arc, ctx: &LibraryContext, job_id: String) { + pub async fn complete(self: Arc, ctx: &LibraryContext, job_id: Uuid) { // remove worker from running workers self.running_workers.write().await.remove(&job_id); // continue queue @@ -105,7 +115,7 @@ impl JobManager { for worker in self.running_workers.read().await.values() { let worker = worker.lock().await; - ret.push(worker.job_report.clone()); + ret.push(worker.report()); } ret } @@ -131,6 +141,72 @@ impl JobManager { Ok(jobs.into_iter().map(Into::into).collect()) } + + pub fn shutdown_tx(&self) -> Arc> { + Arc::clone(&self.shutdown_tx) + } + + pub async fn pause(&self) { + let running_workers_read_guard = self.running_workers.read().await; + if !running_workers_read_guard.is_empty() { + self.shutdown_tx + .send(()) + .expect("Failed to send shutdown signal"); + } + // Dropping our handle so jobs can finish + drop(running_workers_read_guard); + + loop { + sleep(Duration::from_millis(50)).await; + if self.running_workers.read().await.is_empty() { + break; + } + } + } + + pub async fn resume_jobs(self: Arc, ctx: &LibraryContext) -> Result<(), JobError> { + let paused_jobs = ctx + .db + .job() + .find_many(vec![job::status::equals(JobStatus::Paused.int_value())]) + .exec() + .await?; + + for paused_job_data in paused_jobs { + let paused_job = JobReport::from(paused_job_data); + + info!("Resuming job: {}, id: {}", paused_job.name, paused_job.id); + match paused_job.name.as_str() { + THUMBNAIL_JOB_NAME => { + Arc::clone(&self) + .ingest(ctx, Job::resume(paused_job, Box::new(ThumbnailJob {}))?) + .await; + } + INDEXER_JOB_NAME => { + Arc::clone(&self) + .ingest(ctx, Job::resume(paused_job, Box::new(IndexerJob {}))?) 
+ .await; + } + IDENTIFIER_JOB_NAME => { + Arc::clone(&self) + .ingest( + ctx, + Job::resume(paused_job, Box::new(FileIdentifierJob {}))?, + ) + .await; + } + _ => { + error!( + "Unknown job type: {}, id: {}", + paused_job.name, paused_job.id + ); + return Err(JobError::UnknownJobName(paused_job.id, paused_job.name)); + } + }; + } + + Ok(()) + } } #[derive(Debug)] @@ -144,9 +220,9 @@ pub enum JobReportUpdate { #[derive(Debug, Serialize, Deserialize, TS, Clone)] #[ts(export)] pub struct JobReport { - pub id: String, + pub id: Uuid, pub name: String, - pub data: Option, + pub data: Option>, // client_id: i32, #[ts(type = "string")] pub date_created: chrono::DateTime, @@ -163,11 +239,21 @@ pub struct JobReport { pub seconds_elapsed: i32, } +impl Display for JobReport { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Job {:#?}", + self.name, self.id, self.status + ) + } +} + // convert database struct into a resource struct impl From for JobReport { fn from(data: job::Data) -> JobReport { JobReport { - id: data.id, + id: Uuid::from_slice(&data.id).unwrap(), name: data.name, // client_id: data.client_id, status: JobStatus::from_int(data.status).unwrap(), @@ -183,7 +269,7 @@ impl From for JobReport { } impl JobReport { - pub fn new(uuid: String, name: String) -> Self { + pub fn new(uuid: Uuid, name: String) -> Self { Self { id: uuid, name, @@ -209,7 +295,7 @@ impl JobReport { ctx.db .job() .create( - job::id::set(self.id.clone()), + job::id::set(self.id.as_bytes().to_vec()), job::name::set(self.name.clone()), job::action::set(1), job::nodes::link(node::id::equals(ctx.node_local_id)), @@ -222,9 +308,10 @@ impl JobReport { pub async fn update(&self, ctx: &LibraryContext) -> Result<(), JobError> { ctx.db .job() - .find_unique(job::id::equals(self.id.clone())) + .find_unique(job::id::equals(self.id.as_bytes().to_vec())) .update(vec![ job::status::set(self.status.int_value()), + job::data::set(self.data.clone()), job::task_count::set(self.task_count), job::completed_task_count::set(self.completed_task_count), job::date_modified::set(chrono::Utc::now().into()), @@ -245,4 +332,5 @@ pub enum JobStatus { Completed = 2, Canceled = 3, Failed = 4, + Paused = 5, } diff --git a/core/src/job/mod.rs b/core/src/job/mod.rs index c337a037b..6099a78b3 100644 --- a/core/src/job/mod.rs +++ b/core/src/job/mod.rs @@ -1,18 +1,187 @@ -use std::fmt::Debug; +use crate::{file::FileError, prisma, sys::SysError}; +use rmp_serde::{decode::Error as DecodeError, encode::Error as EncodeError}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use std::{collections::VecDeque, fmt::Debug}; use thiserror::Error; +use uuid::Uuid; -use crate::prisma; - -mod jobs; +mod job_manager; mod worker; -pub use jobs::*; +pub use job_manager::*; pub use worker::*; #[derive(Error, Debug)] pub enum JobError { - #[error("Failed to create job (job_id {job_id:?})")] - CreateFailure { job_id: String }, - #[error("Database error")] + #[error("Database error: {0}")] DatabaseError(#[from] prisma::QueryError), + #[error("System error: {0}")] + SystemError(#[from] SysError), + #[error("I/O error: {0}")] + IOError(#[from] std::io::Error), + #[error("Failed to join Tokio spawn blocking: {0}")] + JoinError(#[from] tokio::task::JoinError), + #[error("File error: {0}")] + FileError(#[from] FileError), + #[error("Job state encode error: {0}")] + StateEncode(#[from] EncodeError), + #[error("Job state decode error: {0}")] + StateDecode(#[from] DecodeError), + #[error("Tried to resume a job with unknown name: job ")] + 
UnknownJobName(Uuid, String), + #[error( + "Tried to resume a job that doesn't have saved state data: job " + )] + MissingJobDataState(Uuid, String), + #[error("Job paused")] + Paused(Vec), +} + +pub type JobResult = Result<(), JobError>; + +#[async_trait::async_trait] +pub trait StatefulJob: Send + Sync { + type Init: Serialize + DeserializeOwned + Send + Sync; + type Data: Serialize + DeserializeOwned + Send + Sync; + type Step: Serialize + DeserializeOwned + Send + Sync; + + fn name(&self) -> &'static str; + async fn init( + &self, + ctx: WorkerContext, + state: &mut JobState, + ) -> JobResult; + + async fn execute_step( + &self, + ctx: WorkerContext, + state: &mut JobState, + ) -> JobResult; + + async fn finalize( + &self, + ctx: WorkerContext, + state: &mut JobState, + ) -> JobResult; +} + +#[async_trait::async_trait] +pub trait DynJob: Send + Sync { + fn report(&mut self) -> &mut Option; + fn name(&self) -> &'static str; + async fn run(&mut self, ctx: WorkerContext) -> JobResult; +} + +pub struct Job +where + Init: Serialize + DeserializeOwned + Send + Sync, + Data: Serialize + DeserializeOwned + Send + Sync, + Step: Serialize + DeserializeOwned + Send + Sync, +{ + report: Option, + state: JobState, + stateful_job: Box>, +} + +impl Job +where + Init: Serialize + DeserializeOwned + Send + Sync, + Data: Serialize + DeserializeOwned + Send + Sync, + Step: Serialize + DeserializeOwned + Send + Sync, +{ + pub fn new( + init: Init, + stateful_job: Box>, + ) -> Box { + Box::new(Self { + report: Some(JobReport::new( + Uuid::new_v4(), + stateful_job.name().to_string(), + )), + state: JobState { + init, + data: None, + steps: VecDeque::new(), + step_number: 0, + }, + stateful_job, + }) + } + + pub fn resume( + mut report: JobReport, + stateful_job: Box>, + ) -> Result, JobError> { + let job_state_data = if let Some(data) = report.data.take() { + data + } else { + return Err(JobError::MissingJobDataState(report.id, report.name)); + }; + + Ok(Box::new(Self { + report: Some(report), + state: rmp_serde::from_slice(&job_state_data)?, + stateful_job, + })) + } +} + +#[derive(Serialize, Deserialize)] +pub struct JobState { + pub init: Init, + pub data: Option, + pub steps: VecDeque, + pub step_number: usize, +} + +#[async_trait::async_trait] +impl DynJob for Job +where + Init: Serialize + DeserializeOwned + Send + Sync, + Data: Serialize + DeserializeOwned + Send + Sync, + Step: Serialize + DeserializeOwned + Send + Sync, +{ + fn report(&mut self) -> &mut Option { + &mut self.report + } + + fn name(&self) -> &'static str { + self.stateful_job.name() + } + async fn run(&mut self, ctx: WorkerContext) -> JobResult { + // Checking if we have a brand new job, or if we are resuming an old one. + if self.state.data.is_none() { + self.stateful_job.init(ctx.clone(), &mut self.state).await?; + } + + let mut shutdown_rx = ctx.shutdown_rx(); + let shutdown_rx_fut = shutdown_rx.recv(); + tokio::pin!(shutdown_rx_fut); + + while !self.state.steps.is_empty() { + tokio::select! { + step_result = self.stateful_job.execute_step( + ctx.clone(), + &mut self.state, + ) => { + step_result?; + self.state.steps.pop_front(); + } + _ = &mut shutdown_rx_fut => { + return Err( + JobError::Paused( + rmp_serde::to_vec(&self.state)? 
+ ) + ); + } + } + self.state.step_number += 1; + } + + self.stateful_job + .finalize(ctx.clone(), &mut self.state) + .await?; + + Ok(()) + } } diff --git a/core/src/job/worker.rs b/core/src/job/worker.rs index 143335786..1049ea8b5 100644 --- a/core/src/job/worker.rs +++ b/core/src/job/worker.rs @@ -1,208 +1,255 @@ -use super::{ - jobs::{JobReport, JobReportUpdate, JobStatus}, - Job, JobManager, +use crate::{ + job::{DynJob, JobError, JobManager, JobReportUpdate, JobStatus}, + library::LibraryContext, + ClientQuery, CoreEvent, JobReport, LibraryQuery, }; -use crate::{library::LibraryContext, ClientQuery, CoreEvent, LibraryQuery}; -use log::error; +use log::{error, info, warn}; use std::{sync::Arc, time::Duration}; use tokio::{ sync::{ + broadcast, mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, Mutex, }, - time::{sleep, Instant}, + time::{interval_at, Instant}, }; -use uuid::Uuid; // used to update the worker state from inside the worker thread +#[derive(Debug)] pub enum WorkerEvent { Progressed(Vec), Completed, Failed, -} - -enum WorkerState { - Pending(Box, UnboundedReceiver), - Running, + Paused(Vec), } #[derive(Clone)] pub struct WorkerContext { - pub uuid: String, library_ctx: LibraryContext, - sender: UnboundedSender, + events_tx: UnboundedSender, + shutdown_tx: Arc>, } impl WorkerContext { pub fn progress(&self, updates: Vec) { - self.sender + self.events_tx .send(WorkerEvent::Progressed(updates)) - .unwrap_or(()); + .expect("critical error: failed to send worker progress event updates"); } pub fn library_ctx(&self) -> LibraryContext { self.library_ctx.clone() } - // save the job data to - // pub fn save_data () { - // } + pub fn shutdown_rx(&self) -> broadcast::Receiver<()> { + self.shutdown_tx.subscribe() + } } // a worker is a dedicated thread that runs a single job // once the job is complete the worker will exit pub struct Worker { - pub job_report: JobReport, - state: WorkerState, - worker_sender: UnboundedSender, + job: Option>, + report: JobReport, + worker_events_tx: UnboundedSender, + worker_events_rx: Option>, } impl Worker { - pub fn new(job: Box) -> Self { - let (worker_sender, worker_receiver) = unbounded_channel(); - let uuid = Uuid::new_v4().to_string(); - let name = job.name(); + pub fn new(job: Box, report: JobReport) -> Self { + let (worker_events_tx, worker_events_rx) = unbounded_channel(); Self { - state: WorkerState::Pending(job, worker_receiver), - job_report: JobReport::new(uuid, name.to_string()), - worker_sender, + job: Some(job), + report, + worker_events_tx, + worker_events_rx: Some(worker_events_rx), } } + + pub fn report(&self) -> JobReport { + self.report.clone() + } // spawns a thread and extracts channel sender to communicate with it pub async fn spawn( job_manager: Arc, - worker: Arc>, + worker_mutex: Arc>, ctx: LibraryContext, ) { + let mut worker = worker_mutex.lock().await; // we capture the worker receiver channel so state can be updated from inside the worker - let mut worker_mut = worker.lock().await; - // extract owned job and receiver from Self - let (job, worker_receiver) = - match std::mem::replace(&mut worker_mut.state, WorkerState::Running) { - WorkerState::Pending(job, worker_receiver) => { - worker_mut.state = WorkerState::Running; - (job, worker_receiver) - } - WorkerState::Running => unreachable!(), - }; - let worker_sender = worker_mut.worker_sender.clone(); + let worker_events_tx = worker.worker_events_tx.clone(); + let worker_events_rx = worker + .worker_events_rx + .take() + .expect("critical error: missing
worker events rx"); - worker_mut.job_report.status = JobStatus::Running; + let mut job = worker + .job + .take() + .expect("critical error: missing job on worker"); - worker_mut.job_report.create(&ctx).await.unwrap_or(()); + let job_id = worker.report.id; + let old_status = worker.report.status; + worker.report.status = JobStatus::Running; + if matches!(old_status, JobStatus::Queued) { + worker.report.create(&ctx).await.unwrap_or(()); + } + drop(worker); // spawn task to handle receiving events from the worker let library_ctx = ctx.clone(); tokio::spawn(Worker::track_progress( - worker.clone(), - worker_receiver, + Arc::clone(&worker_mutex), + worker_events_rx, library_ctx.clone(), )); - let uuid = worker_mut.job_report.id.clone(); // spawn task to handle running the job - tokio::spawn(async move { let worker_ctx = WorkerContext { - uuid, library_ctx, - sender: worker_sender, + events_tx: worker_events_tx, + shutdown_tx: job_manager.shutdown_tx(), }; - let job_start = Instant::now(); // track time - let sender = worker_ctx.sender.clone(); + let events_tx = worker_ctx.events_tx.clone(); tokio::spawn(async move { + let mut interval = interval_at( + Instant::now() + Duration::from_millis(1000), + Duration::from_millis(1000), + ); loop { - let elapsed = job_start.elapsed().as_secs(); - sender + interval.tick().await; + if events_tx .send(WorkerEvent::Progressed(vec![ - JobReportUpdate::SecondsElapsed(elapsed), + JobReportUpdate::SecondsElapsed(1), ])) - .unwrap_or(()); - sleep(Duration::from_millis(1000)).await; + .is_err() && events_tx.is_closed() + { + break; + } } }); if let Err(e) = job.run(worker_ctx.clone()).await { - error!("job '{}' failed with error: {}", worker_ctx.uuid, e); - worker_ctx.sender.send(WorkerEvent::Failed).unwrap_or(()); + if let JobError::Paused(state) = e { + worker_ctx + .events_tx + .send(WorkerEvent::Paused(state)) + .expect("critical error: failed to send worker pause event"); + } else { + error!("job '{}' failed with error: {:#?}", job_id, e); + worker_ctx + .events_tx + .send(WorkerEvent::Failed) + .expect("critical error: failed to send worker fail event"); + } } else { // handle completion - worker_ctx.sender.send(WorkerEvent::Completed).unwrap_or(()); + worker_ctx + .events_tx + .send(WorkerEvent::Completed) + .expect("critical error: failed to send worker complete event"); } - job_manager.complete(&ctx, worker_ctx.uuid).await; + job_manager.complete(&ctx, job_id).await; }); } - pub fn id(&self) -> String { - self.job_report.id.to_owned() - } - async fn track_progress( worker: Arc>, - mut channel: UnboundedReceiver, + mut worker_events_rx: UnboundedReceiver, ctx: LibraryContext, ) { - while let Some(command) = channel.recv().await { + while let Some(command) = worker_events_rx.recv().await { let mut worker = worker.lock().await; match command { WorkerEvent::Progressed(changes) => { // protect against updates if job is not running - if worker.job_report.status != JobStatus::Running { + if worker.report.status != JobStatus::Running { continue; }; for change in changes { match change { JobReportUpdate::TaskCount(task_count) => { - worker.job_report.task_count = task_count as i32; + worker.report.task_count = task_count as i32; } JobReportUpdate::CompletedTaskCount(completed_task_count) => { - worker.job_report.completed_task_count = - completed_task_count as i32; + worker.report.completed_task_count = completed_task_count as i32; } JobReportUpdate::Message(message) => { - worker.job_report.message = message; + worker.report.message = message; } 
JobReportUpdate::SecondsElapsed(seconds) => { - worker.job_report.seconds_elapsed = seconds as i32; + worker.report.seconds_elapsed += seconds as i32; } } } ctx.emit(CoreEvent::InvalidateQueryDebounced( ClientQuery::LibraryQuery { - library_id: ctx.id.to_string(), + library_id: ctx.id, query: LibraryQuery::GetRunningJobs, }, )) .await; } WorkerEvent::Completed => { - worker.job_report.status = JobStatus::Completed; - worker.job_report.update(&ctx).await.unwrap_or(()); + worker.report.status = JobStatus::Completed; + worker.report.data = None; + worker + .report + .update(&ctx) + .await + .expect("critical error: failed to update job report"); ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::LibraryQuery { - library_id: ctx.id.to_string(), + library_id: ctx.id, query: LibraryQuery::GetRunningJobs, })) .await; ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::LibraryQuery { - library_id: ctx.id.to_string(), + library_id: ctx.id, query: LibraryQuery::GetJobHistory, })) .await; + info!("{}", worker.report); + break; } WorkerEvent::Failed => { - worker.job_report.status = JobStatus::Failed; - worker.job_report.update(&ctx).await.unwrap_or(()); + worker.report.status = JobStatus::Failed; + worker.report.data = None; + worker + .report + .update(&ctx) + .await + .expect("critical error: failed to update job report"); ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::LibraryQuery { - library_id: ctx.id.to_string(), + library_id: ctx.id, + query: LibraryQuery::GetJobHistory, + })) + .await; + warn!("{}", worker.report); + + break; + } + WorkerEvent::Paused(state) => { + worker.report.status = JobStatus::Paused; + worker.report.data = Some(state); + worker + .report + .update(&ctx) + .await + .expect("critical error: failed to update job report"); + info!("{}", worker.report); + + ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::LibraryQuery { + library_id: ctx.id, query: LibraryQuery::GetJobHistory, })) .await; diff --git a/core/src/lib.rs b/core/src/lib.rs index 05cda65a0..3526be1a1 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -1,15 +1,19 @@ -use crate::{file::cas::FileIdentifierJob, prisma::file as prisma_file, prisma::location}; -use job::{JobManager, JobReport}; -use library::{LibraryConfig, LibraryConfigWrapped, LibraryManager}; -use log::error; -use node::{NodeConfig, NodeConfigManager}; +use crate::{ + encode::{ThumbnailJob, ThumbnailJobInit}, + file::cas::{FileIdentifierJob, FileIdentifierJobInit}, + job::{Job, JobManager, JobReport}, + library::{LibraryConfig, LibraryConfigWrapped, LibraryManager}, + node::{NodeConfig, NodeConfigManager}, + prisma::file as prisma_file, + prisma::location, + tag::{Tag, TagWithFiles}, +}; +use log::{error, info}; use serde::{Deserialize, Serialize}; use std::{ path::{Path, PathBuf}, sync::Arc, }; -use tag::{Tag, TagWithFiles}; - use thiserror::Error; use tokio::{ fs, @@ -19,8 +23,7 @@ use tokio::{ }, }; use ts_rs::TS; - -use crate::encode::ThumbnailJob; +use uuid::Uuid; mod encode; mod file; @@ -56,7 +59,7 @@ impl NodeController { }) .unwrap_or(()); // wait for response and return - recv.await.unwrap_or(Err(CoreError::QueryError)) + recv.await.unwrap_or(Err(CoreError::Query)) } pub async fn command(&self, command: ClientCommand) -> Result { @@ -102,35 +105,56 @@ pub struct Node { UnboundedReceiver>, ), event_sender: mpsc::Sender, + shutdown_completion_tx: oneshot::Sender<()>, } impl Node { // create new instance of node, run startup tasks pub async fn new( data_dir: impl AsRef, - ) -> (NodeController, mpsc::Receiver, Node) { - 
fs::create_dir_all(&data_dir).await.unwrap(); + ) -> ( + NodeController, + mpsc::Receiver, + Node, + oneshot::Receiver<()>, + ) { + let data_dir = data_dir.as_ref(); + fs::create_dir_all(data_dir).await.unwrap(); let (event_sender, event_recv) = mpsc::channel(100); - let config = NodeConfigManager::new(data_dir.as_ref().to_owned()) - .await - .unwrap(); + let config = NodeConfigManager::new(data_dir.to_owned()).await.unwrap(); + + let (shutdown_completion_tx, shutdown_completion_rx) = oneshot::channel(); + let jobs = JobManager::new(); let node_ctx = NodeContext { event_sender: event_sender.clone(), config: config.clone(), jobs: jobs.clone(), }; + let library_manager = LibraryManager::new(data_dir.join("libraries"), node_ctx) + .await + .unwrap(); + + // Try to resume any previously paused jobs + let inner_library_manager = Arc::clone(&library_manager); + let inner_jobs = Arc::clone(&jobs); + tokio::spawn(async move { + for library_ctx in inner_library_manager.get_all_libraries_ctx().await { + if let Err(e) = Arc::clone(&inner_jobs).resume_jobs(&library_ctx).await { + error!("Failed to resume jobs for library. {:#?}", e); + } + } + }); let node = Node { config, - library_manager: LibraryManager::new(data_dir.as_ref().join("libraries"), node_ctx) - .await - .unwrap(), + library_manager, query_channel: unbounded_channel(), command_channel: unbounded_channel(), jobs, event_sender, + shutdown_completion_tx, }; ( @@ -140,6 +164,7 @@ impl Node { }, event_recv, node, + shutdown_completion_rx, ) } @@ -151,7 +176,7 @@ impl Node { } } - pub async fn start(mut self) { + pub async fn start(mut self, mut shutdown_rx: oneshot::Receiver<()>) { loop { // listen on global messaging channels for incoming messages tokio::select! { @@ -163,10 +188,24 @@ impl Node { let res = self.exec_command(msg.data).await; msg.return_sender.send(res).unwrap_or(()); } + + _ = &mut shutdown_rx => { + info!("Initiating node shutdown..."); + self.shutdown().await; + info!("Node shutdown complete."); + self.shutdown_completion_tx.send(()) + .expect("critical error: failed to send node shutdown completion signal"); + + break; + } } } } + pub async fn shutdown(&self) { + self.jobs.pause().await + } + async fn exec_command(&mut self, cmd: ClientCommand) -> Result { Ok(match cmd { ClientCommand::CreateLibrary { name } => { @@ -258,19 +297,25 @@ impl Node { // CRUD for libraries LibraryCommand::VolUnmount { id: _ } => todo!(), LibraryCommand::GenerateThumbsForLocation { id, path } => { - ctx.spawn_job(Box::new(ThumbnailJob { - location_id: id, - path, - background: false, // fix - })) + ctx.spawn_job(Job::new( + ThumbnailJobInit { + location_id: id, + path, + background: false, // fix + }, + Box::new(ThumbnailJob {}), + )) .await; CoreResponse::Success(()) } LibraryCommand::IdentifyUniqueFiles { id, path } => { - ctx.spawn_job(Box::new(FileIdentifierJob { - location_id: id, - path, - })) + ctx.spawn_job(Job::new( + FileIdentifierJobInit { + location_id: id, + path, + }, + Box::new(FileIdentifierJob {}), + )) .await; CoreResponse::Success(()) } @@ -292,7 +337,7 @@ impl Node { ClientQuery::GetNodes => todo!(), ClientQuery::GetVolumes => CoreResponse::GetVolumes(sys::Volume::get_volumes()?), ClientQuery::LibraryQuery { library_id, query } => { - let ctx = match self.library_manager.get_ctx(library_id.clone()).await { + let ctx = match self.library_manager.get_ctx(library_id).await { Some(ctx) => ctx, None => { println!("Library '{}' not found!", library_id); @@ -344,15 +389,15 @@ pub enum ClientCommand { name: String, }, EditLibrary { -
id: String, + id: Uuid, name: Option, description: Option, }, DeleteLibrary { - id: String, + id: Uuid, }, LibraryCommand { - library_id: String, + library_id: Uuid, command: LibraryCommand, }, } @@ -437,7 +482,7 @@ pub enum ClientQuery { GetVolumes, GetNodes, LibraryQuery { - library_id: String, + library_id: Uuid, query: LibraryQuery, }, } @@ -512,17 +557,17 @@ pub enum CoreResponse { #[derive(Error, Debug)] pub enum CoreError { #[error("Query error")] - QueryError, - #[error("System error")] - SysError(#[from] sys::SysError), - #[error("File error")] - FileError(#[from] file::FileError), - #[error("Job error")] - JobError(#[from] job::JobError), - #[error("Database error")] - DatabaseError(#[from] prisma::QueryError), - #[error("Database error")] - LibraryError(#[from] library::LibraryError), + Query, + #[error("System error: {0}")] + Sys(#[from] sys::SysError), + #[error("File error: {0}")] + File(#[from] file::FileError), + #[error("Job error: {0}")] + Job(#[from] job::JobError), + #[error("Database error: {0}")] + Database(#[from] prisma::QueryError), + #[error("Library error: {0}")] + Library(#[from] library::LibraryError), } #[derive(Serialize, Deserialize, Debug, Clone, TS)] diff --git a/core/src/library/library_config.rs b/core/src/library/library_config.rs index f3ab140f1..d9a0db90a 100644 --- a/core/src/library/library_config.rs +++ b/core/src/library/library_config.rs @@ -7,6 +7,7 @@ use std::{ use serde::{Deserialize, Serialize}; use std::io::Write; use ts_rs::TS; +use uuid::Uuid; use crate::node::ConfigMetadata; @@ -64,6 +65,6 @@ impl LibraryConfig { #[derive(Serialize, Deserialize, Debug, TS)] #[ts(export)] pub struct LibraryConfigWrapped { - pub uuid: String, + pub uuid: Uuid, pub config: LibraryConfig, } diff --git a/core/src/library/library_ctx.rs b/core/src/library/library_ctx.rs index 50bc5ea94..3526f7498 100644 --- a/core/src/library/library_ctx.rs +++ b/core/src/library/library_ctx.rs @@ -1,9 +1,7 @@ +use crate::{job::DynJob, node::NodeConfigManager, prisma::PrismaClient, CoreEvent, NodeContext}; use std::sync::Arc; - use uuid::Uuid; -use crate::{job::Job, node::NodeConfigManager, prisma::PrismaClient, CoreEvent, NodeContext}; - use super::LibraryConfig; /// LibraryContext holds context for a library which can be passed around the application. 
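For orientation, the shape of a job under this resumable API is worth spelling out once: a unit struct implements StatefulJob, its Init/Data/Step types are all serde-serializable, and Job::new wires an instance together with its init data before it is handed to spawn_job below. What follows is a minimal sketch against the trait signatures from core/src/job/mod.rs above; CounterJob, CounterJobInit, and COUNTER_JOB_NAME are hypothetical names, not part of this patch, and the JobState type parameters are written out in full.

use serde::{Deserialize, Serialize};

pub const COUNTER_JOB_NAME: &str = "counter"; // hypothetical example job

pub struct CounterJob {}

#[derive(Serialize, Deserialize)]
pub struct CounterJobInit {
	pub target: usize,
}

#[async_trait::async_trait]
impl StatefulJob for CounterJob {
	type Init = CounterJobInit;
	type Data = (); // no extra state beyond the step queue itself
	type Step = usize;

	fn name(&self) -> &'static str {
		COUNTER_JOB_NAME
	}

	// runs only for brand new jobs; resumed jobs already carry data/steps
	async fn init(
		&self,
		ctx: WorkerContext,
		state: &mut JobState<Self::Init, Self::Data, Self::Step>,
	) -> JobResult {
		state.data = Some(());
		state.steps = (0..state.init.target).collect(); // one queue entry per unit of work
		ctx.progress(vec![JobReportUpdate::TaskCount(state.init.target)]);
		Ok(())
	}

	// called once per remaining step; the worker pops the step afterwards,
	// so everything needed to continue must live inside JobState
	async fn execute_step(
		&self,
		ctx: WorkerContext,
		state: &mut JobState<Self::Init, Self::Data, Self::Step>,
	) -> JobResult {
		let step = state.steps[0];
		ctx.progress(vec![JobReportUpdate::CompletedTaskCount(step + 1)]);
		Ok(())
	}

	async fn finalize(
		&self,
		_ctx: WorkerContext,
		_state: &mut JobState<Self::Init, Self::Data, Self::Step>,
	) -> JobResult {
		Ok(())
	}
}

// spawning it wherever a LibraryContext is in scope:
// ctx.spawn_job(Job::new(CounterJobInit { target: 100 }, Box::new(CounterJob {}))).await;

The important property is that the whole JobState is what rmp_serde::to_vec captures when the shutdown broadcast fires mid-run, so a job that keeps all of its progress in init/data/steps is pausable for free.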
@@ -22,11 +20,11 @@ pub struct LibraryContext { } impl LibraryContext { - pub(crate) async fn spawn_job(&self, job: Box) { + pub(crate) async fn spawn_job(&self, job: Box) { self.node_context.jobs.clone().ingest(self, job).await; } - pub(crate) async fn queue_job(&self, job: Box) { + pub(crate) async fn queue_job(&self, job: Box) { self.node_context.jobs.ingest_queue(self, job).await; } diff --git a/core/src/library/library_manager.rs b/core/src/library/library_manager.rs index 89917a4cf..fb17618f2 100644 --- a/core/src/library/library_manager.rs +++ b/core/src/library/library_manager.rs @@ -85,15 +85,7 @@ impl LibraryManager { } let config = LibraryConfig::read(config_path).await?; - libraries.push( - Self::load( - library_id, - db_path.to_str().unwrap(), - config, - node_context.clone(), - ) - .await?, - ); + libraries.push(Self::load(library_id, &db_path, config, node_context.clone()).await?); } let this = Arc::new(Self { @@ -108,8 +100,7 @@ impl LibraryManager { name: "My Default Library".into(), ..Default::default() }) - .await - .unwrap(); + .await?; } Ok(this) @@ -148,14 +139,18 @@ impl LibraryManager { .iter() .map(|lib| LibraryConfigWrapped { config: lib.config.clone(), - uuid: lib.id.to_string(), + uuid: lib.id, }) .collect() } + pub(crate) async fn get_all_libraries_ctx(&self) -> Vec { + self.libraries.read().await.clone() + } + pub(crate) async fn edit( &self, - id: String, + id: Uuid, name: Option, description: Option, ) -> Result<(), LibraryManagerError> { @@ -163,7 +158,7 @@ impl LibraryManager { let mut libraries = self.libraries.write().await; let library = libraries .iter_mut() - .find(|lib| lib.id == Uuid::from_str(&id).unwrap()) + .find(|lib| lib.id == id) .ok_or(LibraryManagerError::LibraryNotFound)?; // update the library @@ -186,11 +181,9 @@ impl LibraryManager { Ok(()) } - pub async fn delete_library(&self, id: String) -> Result<(), LibraryManagerError> { + pub async fn delete_library(&self, id: Uuid) -> Result<(), LibraryManagerError> { let mut libraries = self.libraries.write().await; - let id = Uuid::parse_str(&id)?; - let library = libraries .iter() .find(|l| l.id == id) @@ -208,12 +201,12 @@ impl LibraryManager { } // get_ctx will return the library context for the given library id. 
- pub(crate) async fn get_ctx(&self, library_id: String) -> Option { + pub(crate) async fn get_ctx(&self, library_id: Uuid) -> Option { self.libraries .read() .await .iter() - .find(|lib| lib.id.to_string() == library_id) + .find(|lib| lib.id == library_id) .map(Clone::clone) } @@ -239,12 +232,14 @@ impl LibraryManager { _ => Platform::Unknown, }; + let uuid_vec = id.as_bytes().to_vec(); + let node_data = db .node() .upsert( - node::pub_id::equals(id.to_string()), + node::pub_id::equals(uuid_vec.clone()), ( - node::pub_id::set(id.to_string()), + node::pub_id::set(uuid_vec), node::name::set(node_config.name.clone()), vec![node::platform::set(platform as i32)], ), diff --git a/core/src/library/mod.rs b/core/src/library/mod.rs index 23aed8efa..7d7c34608 100644 --- a/core/src/library/mod.rs +++ b/core/src/library/mod.rs @@ -1,3 +1,6 @@ +use crate::{prisma, sys::SysError}; +use thiserror::Error; + mod library_config; mod library_ctx; mod library_manager; @@ -8,10 +11,6 @@ pub use library_ctx::*; pub use library_manager::*; pub use statistics::*; -use thiserror::Error; - -use crate::{prisma, sys::SysError}; - #[derive(Error, Debug)] pub enum LibraryError { #[error("Missing library")] diff --git a/core/src/node/config.rs b/core/src/node/config.rs index ea1a09f1a..9a8ba76ac 100644 --- a/core/src/node/config.rs +++ b/core/src/node/config.rs @@ -1,8 +1,10 @@ use serde::{Deserialize, Serialize}; -use std::fs::File; -use std::io::{self, BufReader, Seek, SeekFrom, Write}; -use std::path::{Path, PathBuf}; -use std::sync::Arc; +use std::{ + fs::File, + io::{self, BufReader, Seek, SeekFrom, Write}, + path::{Path, PathBuf}, + sync::Arc, +}; use thiserror::Error; use tokio::sync::{RwLock, RwLockWriteGuard}; use ts_rs::TS; diff --git a/core/src/node/mod.rs b/core/src/node/mod.rs index 5276ab015..c06a3fb8b 100644 --- a/core/src/node/mod.rs +++ b/core/src/node/mod.rs @@ -2,6 +2,8 @@ use chrono::{DateTime, Utc}; use int_enum::IntEnum; use serde::{Deserialize, Serialize}; use ts_rs::TS; +use uuid::Uuid; + mod config; use crate::prisma::node; pub use config::*; @@ -9,7 +11,7 @@ pub use config::*; #[derive(Debug, Clone, Serialize, Deserialize, TS)] #[ts(export)] pub struct LibraryNode { - pub uuid: String, + pub uuid: Uuid, pub name: String, pub platform: Platform, pub last_seen: DateTime, @@ -18,7 +20,7 @@ pub struct LibraryNode { impl From for LibraryNode { fn from(data: node::Data) -> Self { Self { - uuid: data.pub_id, + uuid: Uuid::from_slice(&data.pub_id).unwrap(), name: data.name, platform: IntEnum::from_int(data.platform).unwrap(), last_seen: data.last_seen.into(), diff --git a/core/src/sys/locations.rs b/core/src/sys/locations.rs index 2daa65c3d..764e1a97a 100644 --- a/core/src/sys/locations.rs +++ b/core/src/sys/locations.rs @@ -1,15 +1,21 @@ +use super::SysError; use crate::{ - file::{cas::FileIdentifierJob, indexer::IndexerJob}, + file::{ + cas::FileIdentifierJob, + indexer::{IndexerJob, IndexerJobInit}, + }, library::LibraryContext, node::LibraryNode, prisma::{file_path, location}, - ClientQuery, CoreEvent, LibraryQuery, + ClientQuery, CoreEvent, FileIdentifierJobInit, Job, LibraryQuery, ThumbnailJob, + ThumbnailJobInit, }; - use log::info; use serde::{Deserialize, Serialize}; -use std::fmt::Debug; -use std::path::{Path, PathBuf}; +use std::{ + fmt::Debug, + path::{Path, PathBuf}, +}; use thiserror::Error; use tokio::{ fs::{metadata, File}, @@ -18,14 +24,12 @@ use tokio::{ use ts_rs::TS; use uuid::Uuid; -use super::SysError; - #[derive(Debug, Clone, Serialize, Deserialize, TS)] #[ts(export)] pub struct 
LocationResource { pub id: i32, pub name: Option, - pub path: Option, + pub path: Option, pub total_capacity: Option, pub available_capacity: Option, pub is_removable: Option, @@ -40,7 +44,7 @@ impl From for LocationResource { LocationResource { id: data.id, name: data.name, - path: data.local_path, + path: data.local_path.map(PathBuf::from), total_capacity: data.total_capacity, available_capacity: data.available_capacity, is_removable: data.is_removable, @@ -88,21 +92,31 @@ pub async fn get_location( pub async fn scan_location(ctx: &LibraryContext, location_id: i32, path: impl AsRef) { let path_buf = path.as_ref().to_path_buf(); - ctx.spawn_job(Box::new(IndexerJob { - path: path_buf.clone(), - })) + ctx.spawn_job(Job::new( + IndexerJobInit { + path: path_buf.clone(), + }, + Box::new(IndexerJob {}), + )) .await; - ctx.queue_job(Box::new(FileIdentifierJob { - location_id, - path: path_buf, - })) + ctx.queue_job(Job::new( + FileIdentifierJobInit { + location_id, + path: path_buf.clone(), + }, + Box::new(FileIdentifierJob {}), + )) + .await; + + ctx.queue_job(Job::new( + ThumbnailJobInit { + location_id, + path: path_buf, + background: true, + }, + Box::new(ThumbnailJob {}), + )) .await; - // TODO: make a way to stop jobs so this can be canceled without rebooting app - // ctx.queue_job(Box::new(ThumbnailJob { - // location_id, - // path: "".to_string(), - // background: false, - // })); } pub async fn new_location_and_scan( @@ -173,7 +187,7 @@ pub async fn create_location( .db .location() .create( - location::pub_id::set(uuid.to_string()), + location::pub_id::set(uuid.as_bytes().to_vec()), vec![ location::name::set(Some( path.file_name().unwrap().to_string_lossy().to_string(), @@ -231,7 +245,7 @@ pub async fn delete_location(ctx: &LibraryContext, location_id: i32) -> Result<( .await?; ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::LibraryQuery { - library_id: ctx.id.to_string(), + library_id: ctx.id, query: LibraryQuery::GetLocations, })) .await; diff --git a/core/src/sys/mod.rs b/core/src/sys/mod.rs index 8eafbf28b..af63fd920 100644 --- a/core/src/sys/mod.rs +++ b/core/src/sys/mod.rs @@ -6,7 +6,7 @@ pub use volumes::*; use thiserror::Error; -use crate::{job, prisma}; +use crate::prisma; #[derive(Error, Debug)] pub enum SysError { @@ -14,8 +14,6 @@ pub enum SysError { Location(#[from] LocationError), #[error("Error with system volumes")] Volume(String), - #[error("Error from job runner")] - Job(#[from] job::JobError), #[error("Database error")] Database(#[from] prisma::QueryError), } diff --git a/core/src/tag/mod.rs b/core/src/tag/mod.rs index 415c1780c..e6da9476b 100644 --- a/core/src/tag/mod.rs +++ b/core/src/tag/mod.rs @@ -11,12 +11,13 @@ use crate::{ use serde::{Deserialize, Serialize}; use thiserror::Error; use ts_rs::TS; +use uuid::Uuid; #[derive(Debug, Clone, Serialize, Deserialize, TS)] #[ts(export)] pub struct Tag { pub id: i32, - pub pub_id: String, + pub pub_id: Uuid, pub name: Option, pub color: Option, @@ -43,7 +44,7 @@ impl From for Tag { fn from(data: tag::Data) -> Self { Self { id: data.id, - pub_id: data.pub_id, + pub_id: Uuid::from_slice(&data.pub_id).unwrap(), name: data.name, color: data.color, total_files: data.total_files, @@ -90,7 +91,7 @@ pub async fn create_tag( .db .tag() .create( - tag::pub_id::set(uuid::Uuid::new_v4().to_string()), + tag::pub_id::set(Uuid::new_v4().as_bytes().to_vec()), vec![tag::name::set(Some(name)), tag::color::set(Some(color))], ) .exec() @@ -98,7 +99,7 @@ pub async fn create_tag( .unwrap(); 
ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::LibraryQuery { - library_id: ctx.id.to_string(), + library_id: ctx.id, query: LibraryQuery::GetTags, })) .await; @@ -121,7 +122,7 @@ pub async fn update_tag( .unwrap(); ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::LibraryQuery { - library_id: ctx.id.to_string(), + library_id: ctx.id, query: LibraryQuery::GetTags, })) .await; @@ -153,7 +154,7 @@ pub async fn tag_delete(ctx: LibraryContext, id: i32) -> Result Result<()> { +async fn my_core_function(&ctx: CoreContext) -> Result<()> { let mut file = File::get_unique(1).await?; ctx.sync.operation(file.id, diff --git a/docs/product/roadmap.md b/docs/product/roadmap.md index 8233cf573..cd14aafb8 100644 --- a/docs/product/roadmap.md +++ b/docs/product/roadmap.md @@ -15,7 +15,7 @@ **To be developed (MVP):** - **Photos** - Photo and video albums similar to Apple/Google photos. -- **Search** - Deep search into your filesystem with a keybind, including offline locations. +- **Search** - Deep search into your filesystem with a keybinding, including offline locations. - **Tags** - Define routines on custom tags to automate workflows, easily tag files individually, in bulk and automatically via rules. - **Extensions** - Build tools on top of Spacedrive, extend functionality and integrate third party services. Extension directory on [spacedrive.com/extensions](/extensions). @@ -25,7 +25,7 @@ - **Cloud integration** - Index & backup to Apple Photos, Google Drive, Dropbox, OneDrive & Mega + easy API for the community to add more. - **Encrypted vault(s)** - Effortlessly manage & encrypt sensitive files, built on top of VeraCrypt. Encrypt individual files or create flexible-size vaults. - **Key manager** - View, mount, dismount and hide keys. Mounted keys automatically unlock respective areas of your filesystem. -- **Redundancy Goal** - Ensure a specific amount of copies exist for your important data, discover at-risk files and monitor device/drive health. +- **Redundancy Goal** - Ensure a specific amount of copies exists for your important data, discover at-risk files and monitor device/drive health. - **Timeline** - View a linear timeline of content, travel to any time and see media represented visually. - **Media encoder** - Encode video and audio into various formats, use Tags to automate. Built with FFMPEG. -- **Workers** - Utilize the compute power of your devices in unison to encode and perform tasks at increased speeds. +- **Workers** - Utilize the computing power of your devices in unison to encode and perform tasks at increased speeds. 
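Several hunks above (node, tag, location) repeat one pattern for the new Bytes UUID columns: uuid.as_bytes().to_vec() on the write side and Uuid::from_slice(..) on the read side. A minimal sketch of that round-trip, assuming only the uuid crate with the v4 feature (0.8, per the Cargo.lock changes):

use uuid::Uuid;

fn main() {
	// write side: persist the 16 raw bytes instead of a 36-char hyphenated string
	let pub_id = Uuid::new_v4();
	let stored: Vec<u8> = pub_id.as_bytes().to_vec();
	assert_eq!(stored.len(), 16);

	// read side: rebuild the Uuid from the bytes column;
	// from_slice fails unless the slice is exactly 16 bytes long
	let loaded = Uuid::from_slice(&stored).expect("pub_id column must hold 16 bytes");
	assert_eq!(pub_id, loaded);
}

Besides shrinking each id from 36 bytes to 16, this makes pub_id lookups plain byte comparisons, which is what the find_unique and upsert calls above rely on.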
diff --git a/lefthook.yml b/lefthook.yml new file mode 100644 index 000000000..4b4b62880 --- /dev/null +++ b/lefthook.yml @@ -0,0 +1,33 @@ +######################################################################################################################## +# Refer for explanation to following link: # +# https://github.com/evilmartians/lefthook/blob/master/docs/full_guide.md # +######################################################################################################################## + +pre-commit: + parallel: true + commands: + type-check: + glob: "*.{ts,tsx}" + run: pnpm typecheck + lint: + glob: '*.{ts,tsx}' + run: pnpm eslint {all_files} + spelling: + glob: '*.{ts,tsx,md,rs}' + run: pnpm cspell {staged_files} + markdown-link-check: + glob: '*.md' + run: pnpm markdown-link-check {staged_files} + rust-fmt: + glob: '*.rs' + run: cargo fmt --all -- --check + rust-lint-tauri: + run: cargo clippy --package spacedrive -- -D warnings + rust-lint-core: + run: cargo clippy --package sdcore --lib -- -D warnings + rust-lint-core-prisma: + run: cargo clippy --package prisma-cli -- -D warnings + rust-lint-core-derive: + run: cargo clippy --package core-derive --lib -- -D warnings + rust-lint-server: + run: cargo clippy --package server -- -D warnings diff --git a/package.json b/package.json index af5a13315..1124def65 100644 --- a/package.json +++ b/package.json @@ -23,7 +23,20 @@ "typecheck": "pnpm -r exec tsc" }, "devDependencies": { + "@cspell/dict-rust": "^2.0.1", + "@cspell/dict-typescript": "^2.0.1", + "@evilmartians/lefthook": "^1.0.5", "@trivago/prettier-plugin-sort-imports": "^3.2.0", + "@typescript-eslint/eslint-plugin": "^5.30.7", + "@typescript-eslint/parser": "^5.30.7", + "cspell": "^6.4.0", + "eslint": "^8.20.0", + "eslint-config-prettier": "^8.5.0", + "eslint-config-standard-with-typescript": "^22.0.0", + "eslint-plugin-import": ">=2.25.2 <3.0.0", + "eslint-plugin-n": ">=15.0.0 <16.0.0", + "eslint-plugin-promise": ">=6.0.0 <7.0.0", + "markdown-link-check": "^3.10.2", "prettier": "^2.6.2", "turbo": "^1.2.14", "typescript": "^4.7.4" diff --git a/packages/interface/src/AppRouter.tsx b/packages/interface/src/AppRouter.tsx index 644864922..ccd9171a3 100644 --- a/packages/interface/src/AppRouter.tsx +++ b/packages/interface/src/AppRouter.tsx @@ -16,7 +16,7 @@ import { SettingsScreen } from './screens/settings/Settings'; import AppearanceSettings from './screens/settings/client/AppearanceSettings'; import ExtensionSettings from './screens/settings/client/ExtensionsSettings'; import GeneralSettings from './screens/settings/client/GeneralSettings'; -import KeybindSettings from './screens/settings/client/KeybindSettings'; +import KeybindingSettings from './screens/settings/client/KeybindingSettings'; import PrivacySettings from './screens/settings/client/PrivacySettings'; import AboutSpacedrive from './screens/settings/info/AboutSpacedrive'; import Changelog from './screens/settings/info/Changelog'; @@ -66,7 +66,7 @@ export function AppRouter() { } /> } /> } /> - } /> + } /> } /> } /> } /> diff --git a/packages/interface/src/components/primitive/Checkbox.tsx b/packages/interface/src/components/primitive/Checkbox.tsx index d8c649a67..39ca5ccec 100644 --- a/packages/interface/src/components/primitive/Checkbox.tsx +++ b/packages/interface/src/components/primitive/Checkbox.tsx @@ -2,7 +2,7 @@ import clsx from 'clsx'; import React from 'react'; export interface CheckboxProps extends React.InputHTMLAttributes { - containerClasname?: string; + containerClassname?: string; } export 
const Checkbox: React.FC = (props) => { @@ -10,7 +10,7 @@ export const Checkbox: React.FC = (props) => {
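To close, the mechanic that makes the jobs above resumable is simply MessagePack over the job state: rmp_serde::to_vec on pause, rmp_serde::from_slice in Job::resume, with the blob stored in the job row's data column. Below is a self-contained sketch of that round-trip; DemoState is a hypothetical stand-in with the same field shape as the generic JobState.

use serde::{Deserialize, Serialize};
use std::collections::VecDeque;

// hypothetical stand-in for JobState<Init, Data, Step>
#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct DemoState {
	init: String,           // what the job was asked to do
	data: Option<u64>,      // whatever init() produced
	steps: VecDeque<usize>, // work not yet executed
	step_number: usize,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
	let paused = DemoState {
		init: "index /home/user/photos".into(),
		data: Some(42),
		steps: VecDeque::from(vec![3, 4, 5]),
		step_number: 2,
	};

	// on shutdown: WorkerEvent::Paused carries these bytes to the database
	let blob: Vec<u8> = rmp_serde::to_vec(&paused)?;

	// on next launch: resume_jobs() finds Paused rows and rehydrates the state
	let resumed: DemoState = rmp_serde::from_slice(&blob)?;
	assert_eq!(paused, resumed);

	Ok(())
}

Because the entire state round-trips, a resumed job skips init entirely (the state.data.is_none() check in DynJob::run above) and picks up at the first unfinished step.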