Resumable Jobs + Lefthook Integration (#344)

* Introducing Lefthook for git hooks automation
* TypeScript typechecking and linting
* Rust formatting and linting
* Spellchecking (also corrected spelling errors in some files)
* Checking links in Markdown files

* Introducing resumable jobs
* Abstractions to pause and resume jobs automatically when the application exits and is started again (see the sketch below)
* Changing the database to use Bytes for UUID fields
* Changing UUID fields in core to use uuid::Uuid instead of String
* Updating some dependencies and introducing MessagePack (rmp-serde) serialization to save job state in the database
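
A minimal sketch of the pause/resume shape, modeled on the `StatefulJob` trait and `JobState` struct added in this PR (method signatures are simplified here, and `serialize_state` is an illustrative helper, but rmp-serde is the MessagePack crate this change actually pulls in):

```rust
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::collections::VecDeque;

// Everything a job needs to resume: its init arguments, working data and remaining steps.
#[derive(Serialize, Deserialize)]
pub struct JobState<Init, Data, Step> {
    pub init: Init,
    pub data: Option<Data>,
    pub steps: VecDeque<Step>,
    pub step_number: usize,
}

#[async_trait::async_trait]
pub trait StatefulJob: Send + Sync {
    type Init: Serialize + DeserializeOwned + Send;
    type Data: Serialize + DeserializeOwned + Send;
    type Step: Serialize + DeserializeOwned + Send;

    fn name(&self) -> &'static str;
    async fn init(&self, state: &mut JobState<Self::Init, Self::Data, Self::Step>);
    async fn execute_step(&self, state: &mut JobState<Self::Init, Self::Data, Self::Step>);
    async fn finalize(&self, state: &mut JobState<Self::Init, Self::Data, Self::Step>);
}

// On shutdown the remaining state can be encoded with MessagePack and written to the
// `jobs.data` BLOB column, then decoded again on the next start (hypothetical helper).
pub fn serialize_state<I, D, S>(
    state: &JobState<I, D, S>,
) -> Result<Vec<u8>, rmp_serde::encode::Error>
where
    I: Serialize,
    D: Serialize,
    S: Serialize,
{
    rmp_serde::to_vec_named(state)
}
```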

* Fixing some clippy warnings

* Fixing a regression introduced in the identifier job, which was doing too many database accesses concurrently (see the sketch below)
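
The shape of that fix, as a self-contained sketch: the identifier job diff further down replaces a `join_all` over a whole chunk of per-file updates with one awaited update at a time. `link_one` here is a stand-in for a single database query, not a real API:

```rust
use futures::future::join_all;

// Stand-in for a single `file_path` update query.
async fn link_one(id: i32) -> Result<(), String> {
    println!("linking file_path {id}");
    Ok(())
}

// Previous behaviour: every update in a chunk was in flight at once,
// hammering the database with ~CHUNK_SIZE concurrent queries.
async fn link_concurrently(ids: &[i32]) {
    for result in join_all(ids.iter().copied().map(link_one)).await {
        if let Err(e) = result {
            eprintln!("error linking file: {e:?}");
        }
    }
}

// After this commit: one awaited query per file, keeping concurrent db access bounded.
async fn link_sequentially(ids: &[i32]) {
    for &id in ids {
        if let Err(e) = link_one(id).await {
            eprintln!("error linking file: {e:?}");
        }
    }
}
```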
Ericson "Fogo" Soares 2022-07-27 04:06:34 -03:00 committed by GitHub
parent 5a7650c187
commit 85e5eec993
44 changed files with 3065 additions and 868 deletions


@ -0,0 +1,36 @@
tauri
rustup
aarch
sdcore
dotenv
dotenvy
prismjs
actix
rtype
healthcheck
sdserver
ipfs
impls
crdt
quicktime
creationdate
imageops
thumbnailer
HEXLOWER
chrono
walkdir
thiserror
thumbstrip
repr
Deque
oneshot
sdlibrary
sdconfig
DOTFILE
sysinfo
initialising
struct
UHLC
CRDTs
PRRTT
filesystems


@ -0,0 +1,82 @@
pnpm
titlebar
consts
pallete
unlisten
svgr
middlewares
clsx
SDWEB
tryghost
tsparticles
Opencollective
Waitlist
heroicons
roadmap
semibold
noreferer
Rescan
subpackage
photoslibrary
fontsource
audiomp
audioogg
audiowav
browserslist
bsconfig
cheader
compodoc
cssmap
dartlang
dockerdebug
folderlight
folderopen
fontotf
fontttf
fontwoff
gopackage
haml
imagegif
imageico
imagejpg
imagepng
ipynb
jsmap
lighteditorconfig
nestjscontroller
nestjs
nestjsdecorator
nestjsfilter
nestjsguard
nestjsmodule
nestjsservice
npmlock
nuxt
opengl
photoshop
postcssconfig
powershelldata
reactjs
rjson
symfony
testjs
tmpl
typescriptdef
windi
yarnerror
unlisten
imagewebp
powershellmodule
reactts
testts
zustand
overscan
webp
headlessui
falsey
nums
lacie
classname
wunsub
immer
tada

.cspell/project_words.txt Normal file

@ -0,0 +1,38 @@
spacedrive
spacedriveapp
vdfs
haoyuan
brendonovich
codegen
elon
deel
haden
akar
benja
haris
mehrzad
OSSC
josephjacks
rauch
ravikant
neha
narkhede
allred
lütke
tobiaslutke
justinhoffman
rywalker
zacharysmith
sanjay
poonen
mytton
davidmytton
richelsen
lesterlee
alluxio
augusto
marietti
vijay
sharma
naveen
noco

.eslintrc.js Normal file

@ -0,0 +1,21 @@
module.exports = {
root: true,
parser: '@typescript-eslint/parser',
parserOptions: {
"project": [
"apps/desktop/tsconfig.json",
"apps/web/tsconfig.json",
"apps/landing/tsconfig.json",
"packages/client/tsconfig.json",
"packages/interface/tsconfig.json",
"packages/ui/tsconfig.json",
],
},
plugins: [
'@typescript-eslint',
],
extends: [
'standard-with-typescript',
'prettier',
],
};

Cargo.lock generated

@ -453,6 +453,7 @@ dependencies = [
"num-bigint 0.3.3",
"num-integer",
"num-traits",
"serde",
]
[[package]]
@ -3659,6 +3660,15 @@ dependencies = [
"url",
]
[[package]]
name = "ordered-float"
version = "2.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7940cf2ca942593318d07fcf2596cdca60a85c9e7fab408a5e21a4f9dcd40d87"
dependencies = [
"num-traits",
]
[[package]]
name = "ordermap"
version = "0.3.5"
@ -4140,25 +4150,28 @@ dependencies = [
[[package]]
name = "prisma-client-rust"
version = "0.4.1"
source = "git+https://github.com/Brendonovich/prisma-client-rust.git?tag=0.5.0#31a864ae09e835ff046469345bf6e413ae54a30f"
version = "0.5.2"
source = "git+https://github.com/Brendonovich/prisma-client-rust.git?tag=0.5.2#88ba860aedace6ebf0707fa989138fea87cd0d5e"
dependencies = [
"bigdecimal 0.2.2",
"chrono",
"datamodel",
"indexmap",
"prisma-models",
"query-connector",
"query-core",
"serde",
"serde-value",
"serde_json",
"thiserror",
"user-facing-errors",
"uuid 0.8.2",
]
[[package]]
name = "prisma-client-rust-cli"
version = "0.4.1"
source = "git+https://github.com/Brendonovich/prisma-client-rust.git?tag=0.5.0#31a864ae09e835ff046469345bf6e413ae54a30f"
source = "git+https://github.com/Brendonovich/prisma-client-rust?tag=0.5.0#31a864ae09e835ff046469345bf6e413ae54a30f"
dependencies = [
"convert_case 0.5.0",
"datamodel",
@ -4702,6 +4715,28 @@ dependencies = [
"winapi",
]
[[package]]
name = "rmp"
version = "0.8.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44519172358fd6d58656c86ab8e7fbc9e1490c3e8f14d35ed78ca0dd07403c9f"
dependencies = [
"byteorder",
"num-traits",
"paste",
]
[[package]]
name = "rmp-serde"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25786b0d276110195fa3d6f3f31299900cf71dfbd6c28450f3f58a0e7f7a347e"
dependencies = [
"byteorder",
"rmp",
"serde",
]
[[package]]
name = "rusqlite"
version = "0.25.4"
@ -4900,6 +4935,8 @@ dependencies = [
"log",
"prisma-client-rust",
"ring 0.17.0-alpha.11",
"rmp",
"rmp-serde",
"serde",
"serde_json",
"sysinfo",
@ -5005,6 +5042,16 @@ dependencies = [
"serde_derive",
]
[[package]]
name = "serde-value"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3a1a3341211875ef120e117ea7fd5228530ae7e7036a779fdc9117be6b3282c"
dependencies = [
"ordered-float",
"serde",
]
[[package]]
name = "serde_bytes"
version = "0.11.6"
@ -5286,6 +5333,8 @@ version = "0.1.0"
dependencies = [
"dotenvy",
"env_logger",
"futures",
"log",
"sdcore",
"swift-rs",
"tauri",


@ -59,7 +59,7 @@ For independent creatives, hoarders and those that want to own their digital foo
# What is a VDFS?
A VDFS (virtual distributed filesystem) is a filesystem designed to work across a variety of storage layers. With a uniform API to manipulate and access content across many devices, VSFS is not restricted to a single machine. It achieves this by maintaining a virtual index of all storage locations, synchronizing the database between clients in realtime. This implementation also uses [CAS](https://en.wikipedia.org/wiki/Content-addressable_storage) (Content-addressable storage) to uniquely identify files, while keeping record of logical file paths relative to the storage locations.
A VDFS (virtual distributed filesystem) is a filesystem designed to work across a variety of storage layers. With a uniform API to manipulate and access content across many devices, VDFS is not restricted to a single machine. It achieves this by maintaining a virtual index of all storage locations, synchronizing the database between clients in realtime. This implementation also uses [CAS](https://en.wikipedia.org/wiki/Content-addressable_storage) (Content-addressable storage) to uniquely identify files, while keeping record of logical file paths relative to the storage locations.
The first implementation of a VDFS can be found in this UC Berkeley [paper](https://www2.eecs.berkeley.edu/Pubs/TechRpts/2018/EECS-2018-29.pdf) by Haoyuan Li. This paper describes its use for cloud computing, however the underlying concepts can be translated to open consumer software.
@ -85,7 +85,7 @@ _Note: Links are for highlight purposes only until feature specific documentatio
**To be developed (MVP):**
- **[Photos](#features)** - Photo and video albums similar to Apple/Google photos.
- **[Search](#features)** - Deep search into your filesystem with a keybind, including offline locations.
- **[Search](#features)** - Deep search into your filesystem with a keybinding, including offline locations.
- **[Tags](#features)** - Define routines on custom tags to automate workflows, easily tag files individually, in bulk and automatically via rules.
- **[Extensions](#features)** - Build tools on top of Spacedrive, extend functionality and integrate third party services. Extension directory on [spacedrive.com/extensions](#features).


@ -20,9 +20,11 @@ sdcore = { path = "../../../core" }
# Universal Dependencies
tokio = { version = "1.17.0", features = ["sync"] }
futures = "^0.3"
window-shadows = "0.1.2"
env_logger = "0.9.0"
dotenvy = "0.15.1"
log = { version = "0.4.17", features = ["max_level_trace"] }
# macOS system libs
[target.'cfg(target_os = "macos")'.dependencies]


@ -1,8 +1,13 @@
use std::path::PathBuf;
use std::time::{Duration, Instant};
use dotenvy::dotenv;
use futures::executor::block_on;
use log::{debug, error, info};
use sdcore::{ClientCommand, ClientQuery, CoreEvent, CoreResponse, Node, NodeController};
use tauri::{api::path, Manager};
use tauri::{api::path, Manager, RunEvent};
use tokio::sync::oneshot;
#[cfg(target_os = "macos")]
mod macos;
mod menu;
@ -15,7 +20,7 @@ async fn client_query_transport(
match core.query(data).await {
Ok(response) => Ok(response),
Err(err) => {
println!("query error: {:?}", err);
error!("query error: {:?}", err);
Err(err.to_string())
}
}
@ -42,18 +47,42 @@ async fn app_ready(app_handle: tauri::AppHandle) {
window.show().unwrap();
}
struct ShutdownManager {
shutdown_tx: Option<oneshot::Sender<()>>,
shutdown_completion_rx: Option<oneshot::Receiver<()>>,
}
impl ShutdownManager {
fn shutdown(&mut self) {
if let Some(sender) = self.shutdown_tx.take() {
sender.send(()).unwrap();
if let Some(receiver) = self.shutdown_completion_rx.take() {
block_on(receiver).expect("failed to receive shutdown completion signal");
}
}
}
}
#[tokio::main]
async fn main() {
dotenv().ok();
env_logger::init();
let mut data_dir = path::data_dir().unwrap_or(std::path::PathBuf::from("./"));
data_dir = data_dir.join("spacedrive");
let data_dir = path::data_dir()
.unwrap_or_else(|| PathBuf::from("./"))
.join("spacedrive");
// create an instance of the core
let (controller, mut event_receiver, node) = Node::new(data_dir).await;
tokio::spawn(node.start());
let (controller, mut event_receiver, node, shutdown_completion_rx) = Node::new(data_dir).await;
let (shutdown_tx, shutdown_rx) = oneshot::channel();
let mut shutdown_manager = ShutdownManager {
shutdown_tx: Some(shutdown_tx),
shutdown_completion_rx: Some(shutdown_completion_rx),
};
tokio::spawn(node.start(shutdown_rx));
// create tauri app
tauri::Builder::default()
let app = tauri::Builder::default()
// pass controller to the tauri state manager
.manage(controller)
.setup(|app| {
@ -104,13 +133,31 @@ async fn main() {
Ok(())
})
.on_menu_event(|event| menu::handle_menu_event(event))
.on_menu_event(menu::handle_menu_event)
.invoke_handler(tauri::generate_handler![
client_query_transport,
client_command_transport,
app_ready,
])
.menu(menu::get_menu())
.run(tauri::generate_context!())
.expect("error while running tauri application");
.build(tauri::generate_context!())
.expect("error while building tauri application");
app.run(move |app_handler, event| {
if let RunEvent::ExitRequested { .. } = event {
debug!("Closing all open windows...");
app_handler
.windows()
.iter()
.for_each(|(window_name, window)| {
debug!("closing window: {window_name}");
if let Err(e) = window.close() {
error!("failed to close window '{}': {:#?}", window_name, e);
}
});
info!("Spacedrive shutting down...");
shutdown_manager.shutdown();
app_handler.exit(0);
}
})
}


@ -1,6 +1,8 @@
use std::env::consts;
use tauri::{AboutMetadata, CustomMenuItem, Menu, MenuItem, Submenu, WindowMenuEvent, Wry};
use tauri::{
AboutMetadata, CustomMenuItem, Manager, Menu, MenuItem, Submenu, WindowMenuEvent, Wry,
};
pub(crate) fn get_menu() -> Menu {
match consts::OS {
@ -53,28 +55,25 @@ fn custom_menu_bar() -> Menu {
CustomMenuItem::new("reload_app".to_string(), "Reload").accelerator("CmdOrCtrl+R"),
);
let view_menu = view_menu.add_item(
view_menu.add_item(
CustomMenuItem::new("toggle_devtools".to_string(), "Toggle Developer Tools")
.accelerator("CmdOrCtrl+Alt+I"),
);
view_menu
)
};
let menu = Menu::new()
Menu::new()
.add_submenu(Submenu::new("Spacedrive", app_menu))
.add_submenu(Submenu::new("File", file_menu))
.add_submenu(Submenu::new("Edit", edit_menu))
.add_submenu(Submenu::new("View", view_menu))
.add_submenu(Submenu::new("Window", window_menu));
menu
.add_submenu(Submenu::new("Window", window_menu))
}
pub(crate) fn handle_menu_event(event: WindowMenuEvent<Wry>) {
match event.menu_item_id() {
"quit" => {
std::process::exit(0);
let app = event.window().app_handle();
app.exit(0);
}
"close" => {
let window = event.window();
@ -82,7 +81,6 @@ pub(crate) fn handle_menu_event(event: WindowMenuEvent<Wry>) {
#[cfg(debug_assertions)]
if window.is_devtools_open() {
window.close_devtools();
return;
} else {
window.close().unwrap();
}
@ -91,17 +89,17 @@ pub(crate) fn handle_menu_event(event: WindowMenuEvent<Wry>) {
window.close().unwrap();
}
"reload_app" => {
event
.window()
.with_webview(|webview| {
#[cfg(target_os = "macos")]
{
#[cfg(target_os = "macos")]
{
event
.window()
.with_webview(|webview| {
use crate::macos::reload_webview;
reload_webview(webview.inner() as _);
}
})
.unwrap();
})
.unwrap();
}
}
#[cfg(debug_assertions)]
"toggle_devtools" => {


@ -18,9 +18,9 @@ use actix_web::{
use actix_web_actors::ws;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tokio::sync::{mpsc, oneshot};
const DATA_DIR_ENV_VAR: &'static str = "DATA_DIR";
const DATA_DIR_ENV_VAR: &str = "DATA_DIR";
#[derive(Serialize)]
pub struct Event(CoreEvent);
@ -182,10 +182,8 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Socket {
};
fut.into_actor(self).spawn(ctx);
()
},
_ => (),
_ => {},
}
}
@ -222,12 +220,12 @@ impl Handler<SocketResponse> for Socket {
#[get("/")]
async fn index() -> impl Responder {
format!("Spacedrive Server!")
"Spacedrive Server!"
}
#[get("/health")]
async fn healthcheck() -> impl Responder {
format!("OK")
"OK"
}
#[get("/ws")]
@ -237,15 +235,14 @@ async fn ws_handler(
controller: web::Data<NodeController>,
server: web::Data<Addr<EventServer>>,
) -> Result<HttpResponse, Error> {
let resp = ws::start(
ws::start(
Socket {
node_controller: controller,
event_server: server,
},
&req,
stream,
);
resp
)
}
async fn not_found() -> impl Responder {
@ -284,14 +281,16 @@ async fn setup() -> (mpsc::Receiver<CoreEvent>, web::Data<NodeController>) {
std::env::current_dir()
.expect(
"Unable to get your currrent directory. Maybe try setting $DATA_DIR?",
"Unable to get your current directory. Maybe try setting $DATA_DIR?",
)
.join("sdserver_data")
},
};
let (controller, event_receiver, node) = Node::new(data_dir_path).await;
tokio::spawn(node.start());
let (controller, event_receiver, node, _shutdown_completion_rx) =
Node::new(data_dir_path).await;
let (_shutdown_tx, shutdown_rx) = oneshot::channel();
tokio::spawn(node.start(shutdown_rx));
(event_receiver, web::Data::new(controller))
}


@ -22,19 +22,21 @@ futures = "0.3"
data-encoding = "2.3.2"
ring = "0.17.0-alpha.10"
int-enum = "0.4.0"
rmp = "^0.8.11"
rmp-serde = "^1.1.0"
# Project dependencies
ts-rs = { version = "6.2", features = ["chrono-impl", "uuid-impl", "serde-compat"] }
prisma-client-rust = { git = "https://github.com/Brendonovich/prisma-client-rust.git", tag = "0.5.0" }
prisma-client-rust = { git = "https://github.com/Brendonovich/prisma-client-rust.git", tag = "0.5.2" }
walkdir = "^2.3.2"
uuid = { version = "^0.8.2", features = ["v4", "serde"]}
sysinfo = "0.23.9"
thiserror = "1.0.30"
core-derive = { path = "./derive" }
tokio = { version = "1.17.0", features = ["sync", "rt"] }
tokio = { version = "^1.17.0", features = ["sync", "rt"] }
include_dir = { version = "0.7.2", features = ["glob"] }
async-trait = "0.1.52"
async-trait = "^0.1.52"
image = "0.24.1"
webp = "0.2.2"
ffmpeg-next = "5.0.3"


@ -1,4 +1,4 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { JobStatus } from "./JobStatus";
export interface JobReport { id: string, name: string, data: string | null, date_created: string, date_modified: string, status: JobStatus, task_count: number, completed_task_count: number, message: string, seconds_elapsed: string, }
export interface JobReport { id: string, name: string, data: Array<number> | null, date_created: string, date_modified: string, status: JobStatus, task_count: number, completed_task_count: number, message: string, seconds_elapsed: string, }


@ -1,3 +1,3 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type JobStatus = "Queued" | "Running" | "Completed" | "Canceled" | "Failed";
export type JobStatus = "Queued" | "Running" | "Completed" | "Canceled" | "Failed" | "Paused";


@ -0,0 +1,145 @@
/*
Warnings:
- The primary key for the `jobs` table will be changed. If it partially fails, the table could be left without primary key constraint.
- You are about to alter the column `data` on the `jobs` table. The data in that column could be lost. The data in that column will be cast from `String` to `Binary`.
- You are about to alter the column `id` on the `jobs` table. The data in that column could be lost. The data in that column will be cast from `String` to `Binary`.
- You are about to alter the column `pub_id` on the `nodes` table. The data in that column could be lost. The data in that column will be cast from `String` to `Binary`.
- You are about to alter the column `pub_id` on the `tags` table. The data in that column could be lost. The data in that column will be cast from `String` to `Binary`.
- You are about to alter the column `pub_id` on the `labels` table. The data in that column could be lost. The data in that column will be cast from `String` to `Binary`.
- You are about to alter the column `pub_id` on the `spaces` table. The data in that column could be lost. The data in that column will be cast from `String` to `Binary`.
- You are about to alter the column `pub_id` on the `locations` table. The data in that column could be lost. The data in that column will be cast from `String` to `Binary`.
- You are about to alter the column `record_id` on the `sync_events` table. The data in that column could be lost. The data in that column will be cast from `String` to `Binary`.
- You are about to alter the column `pub_id` on the `albums` table. The data in that column could be lost. The data in that column will be cast from `String` to `Binary`.
- You are about to alter the column `pub_id` on the `comments` table. The data in that column could be lost. The data in that column will be cast from `String` to `Binary`.
*/
-- RedefineTables
PRAGMA foreign_keys=OFF;
CREATE TABLE "new_jobs" (
"id" BLOB NOT NULL PRIMARY KEY,
"name" TEXT NOT NULL,
"node_id" INTEGER NOT NULL,
"action" INTEGER NOT NULL,
"status" INTEGER NOT NULL DEFAULT 0,
"data" BLOB,
"task_count" INTEGER NOT NULL DEFAULT 1,
"completed_task_count" INTEGER NOT NULL DEFAULT 0,
"date_created" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
"date_modified" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
"seconds_elapsed" INTEGER NOT NULL DEFAULT 0,
CONSTRAINT "jobs_node_id_fkey" FOREIGN KEY ("node_id") REFERENCES "nodes" ("id") ON DELETE CASCADE ON UPDATE CASCADE
);
INSERT INTO "new_jobs" ("action", "completed_task_count", "data", "date_created", "date_modified", "id", "name", "node_id", "seconds_elapsed", "status", "task_count") SELECT "action", "completed_task_count", "data", "date_created", "date_modified", "id", "name", "node_id", "seconds_elapsed", "status", "task_count" FROM "jobs";
DROP TABLE "jobs";
ALTER TABLE "new_jobs" RENAME TO "jobs";
CREATE TABLE "new_nodes" (
"id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
"pub_id" BLOB NOT NULL,
"name" TEXT NOT NULL,
"platform" INTEGER NOT NULL DEFAULT 0,
"version" TEXT,
"last_seen" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
"timezone" TEXT,
"date_created" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
);
INSERT INTO "new_nodes" ("date_created", "id", "last_seen", "name", "platform", "pub_id", "timezone", "version") SELECT "date_created", "id", "last_seen", "name", "platform", "pub_id", "timezone", "version" FROM "nodes";
DROP TABLE "nodes";
ALTER TABLE "new_nodes" RENAME TO "nodes";
CREATE UNIQUE INDEX "nodes_pub_id_key" ON "nodes"("pub_id");
CREATE TABLE "new_tags" (
"id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
"pub_id" BLOB NOT NULL,
"name" TEXT,
"color" TEXT,
"total_files" INTEGER DEFAULT 0,
"redundancy_goal" INTEGER DEFAULT 1,
"date_created" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
"date_modified" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
);
INSERT INTO "new_tags" ("color", "date_created", "date_modified", "id", "name", "pub_id", "redundancy_goal", "total_files") SELECT "color", "date_created", "date_modified", "id", "name", "pub_id", "redundancy_goal", "total_files" FROM "tags";
DROP TABLE "tags";
ALTER TABLE "new_tags" RENAME TO "tags";
CREATE UNIQUE INDEX "tags_pub_id_key" ON "tags"("pub_id");
CREATE TABLE "new_labels" (
"id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
"pub_id" BLOB NOT NULL,
"name" TEXT,
"date_created" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
"date_modified" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
);
INSERT INTO "new_labels" ("date_created", "date_modified", "id", "name", "pub_id") SELECT "date_created", "date_modified", "id", "name", "pub_id" FROM "labels";
DROP TABLE "labels";
ALTER TABLE "new_labels" RENAME TO "labels";
CREATE UNIQUE INDEX "labels_pub_id_key" ON "labels"("pub_id");
CREATE TABLE "new_spaces" (
"id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
"pub_id" BLOB NOT NULL,
"name" TEXT,
"description" TEXT,
"date_created" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
"date_modified" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
);
INSERT INTO "new_spaces" ("date_created", "date_modified", "description", "id", "name", "pub_id") SELECT "date_created", "date_modified", "description", "id", "name", "pub_id" FROM "spaces";
DROP TABLE "spaces";
ALTER TABLE "new_spaces" RENAME TO "spaces";
CREATE UNIQUE INDEX "spaces_pub_id_key" ON "spaces"("pub_id");
CREATE TABLE "new_locations" (
"id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
"pub_id" BLOB NOT NULL,
"node_id" INTEGER,
"name" TEXT,
"local_path" TEXT,
"total_capacity" INTEGER,
"available_capacity" INTEGER,
"filesystem" TEXT,
"disk_type" INTEGER,
"is_removable" BOOLEAN,
"is_online" BOOLEAN NOT NULL DEFAULT true,
"date_created" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "locations_node_id_fkey" FOREIGN KEY ("node_id") REFERENCES "nodes" ("id") ON DELETE SET NULL ON UPDATE CASCADE
);
INSERT INTO "new_locations" ("available_capacity", "date_created", "disk_type", "filesystem", "id", "is_online", "is_removable", "local_path", "name", "node_id", "pub_id", "total_capacity") SELECT "available_capacity", "date_created", "disk_type", "filesystem", "id", "is_online", "is_removable", "local_path", "name", "node_id", "pub_id", "total_capacity" FROM "locations";
DROP TABLE "locations";
ALTER TABLE "new_locations" RENAME TO "locations";
CREATE UNIQUE INDEX "locations_pub_id_key" ON "locations"("pub_id");
CREATE TABLE "new_sync_events" (
"id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
"node_id" INTEGER NOT NULL,
"timestamp" TEXT NOT NULL,
"record_id" BLOB NOT NULL,
"kind" INTEGER NOT NULL,
"column" TEXT,
"value" TEXT NOT NULL,
CONSTRAINT "sync_events_node_id_fkey" FOREIGN KEY ("node_id") REFERENCES "nodes" ("id") ON DELETE RESTRICT ON UPDATE CASCADE
);
INSERT INTO "new_sync_events" ("column", "id", "kind", "node_id", "record_id", "timestamp", "value") SELECT "column", "id", "kind", "node_id", "record_id", "timestamp", "value" FROM "sync_events";
DROP TABLE "sync_events";
ALTER TABLE "new_sync_events" RENAME TO "sync_events";
CREATE TABLE "new_albums" (
"id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
"pub_id" BLOB NOT NULL,
"name" TEXT NOT NULL,
"is_hidden" BOOLEAN NOT NULL DEFAULT false,
"date_created" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
"date_modified" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
);
INSERT INTO "new_albums" ("date_created", "date_modified", "id", "is_hidden", "name", "pub_id") SELECT "date_created", "date_modified", "id", "is_hidden", "name", "pub_id" FROM "albums";
DROP TABLE "albums";
ALTER TABLE "new_albums" RENAME TO "albums";
CREATE UNIQUE INDEX "albums_pub_id_key" ON "albums"("pub_id");
CREATE TABLE "new_comments" (
"id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
"pub_id" BLOB NOT NULL,
"content" TEXT NOT NULL,
"date_created" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
"date_modified" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
"file_id" INTEGER,
CONSTRAINT "comments_file_id_fkey" FOREIGN KEY ("file_id") REFERENCES "files" ("id") ON DELETE SET NULL ON UPDATE CASCADE
);
INSERT INTO "new_comments" ("content", "date_created", "date_modified", "file_id", "id", "pub_id") SELECT "content", "date_created", "date_modified", "file_id", "id", "pub_id" FROM "comments";
DROP TABLE "comments";
ALTER TABLE "new_comments" RENAME TO "comments";
CREATE UNIQUE INDEX "comments_pub_id_key" ON "comments"("pub_id");
PRAGMA foreign_key_check;
PRAGMA foreign_keys=ON;


@ -23,7 +23,7 @@ model SyncEvent {
node_id Int
timestamp String
// individual record pub id OR compound many-to-many pub ids
record_id String
record_id Bytes
// the type of operation, I.E: CREATE, UPDATE, DELETE as an enum
kind Int
// the column name for atomic update operations
@ -51,7 +51,7 @@ model Statistics {
model Node {
id Int @id @default(autoincrement())
pub_id String @unique
pub_id Bytes @unique
name String
platform Int @default(0)
version String?
@ -67,7 +67,7 @@ model Node {
}
model Volume {
id Int @id() @default(autoincrement())
id Int @id @default(autoincrement())
node_id Int
name String
mount_point String
@ -84,7 +84,7 @@ model Volume {
model Location {
id Int @id @default(autoincrement())
pub_id String @unique
pub_id Bytes @unique
node_id Int?
name String?
local_path String?
@ -226,7 +226,7 @@ model MediaData {
model Tag {
id Int @id @default(autoincrement())
pub_id String @unique
pub_id Bytes @unique
name String?
color String?
total_files Int? @default(0)
@ -253,7 +253,7 @@ model TagOnFile {
model Label {
id Int @id @default(autoincrement())
pub_id String @unique
pub_id Bytes @unique
name String?
date_created DateTime @default(now())
date_modified DateTime @default(now())
@ -277,7 +277,7 @@ model LabelOnFile {
model Space {
id Int @id @default(autoincrement())
pub_id String @unique
pub_id Bytes @unique
name String?
description String?
date_created DateTime @default(now())
@ -301,12 +301,12 @@ model FileInSpace {
}
model Job {
id String @id
id Bytes @id
name String
node_id Int
action Int
status Int @default(0)
data String?
data Bytes?
task_count Int @default(1)
completed_task_count Int @default(0)
@ -320,7 +320,7 @@ model Job {
model Album {
id Int @id @default(autoincrement())
pub_id String @unique
pub_id Bytes @unique
name String
is_hidden Boolean @default(false)
@ -347,7 +347,7 @@ model FileInAlbum {
model Comment {
id Int @id @default(autoincrement())
pub_id String @unique
pub_id Bytes @unique
content String
date_created DateTime @default(now())
date_modified DateTime @default(now())
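
With the `pub_id` and `id` columns above now typed as `Bytes`, core code stores UUIDs as their raw 16-byte form rather than strings. A minimal sketch of that conversion with the `uuid` crate pinned in the core Cargo.toml (helper names are illustrative, not the project's actual API):

```rust
use uuid::Uuid;

// Uuid -> BLOB column value (16 raw bytes).
fn uuid_to_bytes(id: Uuid) -> Vec<u8> {
    id.as_bytes().to_vec()
}

// BLOB column value -> Uuid, failing if the slice is not exactly 16 bytes.
fn bytes_to_uuid(bytes: &[u8]) -> Result<Uuid, uuid::Error> {
    Uuid::from_slice(bytes)
}
```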


@ -1,46 +1,67 @@
use crate::{
job::{Job, JobReportUpdate, JobResult, WorkerContext},
job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext},
library::LibraryContext,
prisma::file_path,
sys, CoreEvent,
};
use image::{self, imageops, DynamicImage, GenericImageView};
use log::{debug, error, info};
use std::error::Error;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use tokio::fs;
use log::{error, info, trace};
use serde::{Deserialize, Serialize};
use std::{
error::Error,
ops::Deref,
path::{Path, PathBuf},
};
use tokio::{fs, task::block_in_place};
use webp::Encoder;
#[derive(Debug, Clone)]
pub struct ThumbnailJob {
static THUMBNAIL_SIZE_FACTOR: f32 = 0.2;
static THUMBNAIL_QUALITY: f32 = 30.0;
pub static THUMBNAIL_CACHE_DIR_NAME: &str = "thumbnails";
pub const THUMBNAIL_JOB_NAME: &str = "thumbnailer";
pub struct ThumbnailJob {}
#[derive(Serialize, Deserialize, Clone)]
pub struct ThumbnailJobInit {
pub location_id: i32,
pub path: PathBuf,
pub background: bool,
}
static THUMBNAIL_SIZE_FACTOR: f32 = 0.2;
static THUMBNAIL_QUALITY: f32 = 30.0;
pub static THUMBNAIL_CACHE_DIR_NAME: &str = "thumbnails";
#[derive(Debug, Serialize, Deserialize)]
pub struct ThumbnailJobState {
thumbnail_dir: PathBuf,
root_path: PathBuf,
}
#[async_trait::async_trait]
impl Job for ThumbnailJob {
impl StatefulJob for ThumbnailJob {
type Init = ThumbnailJobInit;
type Data = ThumbnailJobState;
type Step = file_path::Data;
fn name(&self) -> &'static str {
"thumbnailer"
THUMBNAIL_JOB_NAME
}
async fn run(&self, ctx: WorkerContext) -> JobResult {
async fn init(
&self,
ctx: WorkerContext,
state: &mut JobState<Self::Init, Self::Data, Self::Step>,
) -> JobResult {
let library_ctx = ctx.library_ctx();
let thumbnail_dir = library_ctx
.config()
.data_directory()
.join(THUMBNAIL_CACHE_DIR_NAME)
.join(self.location_id.to_string());
.join(state.init.location_id.to_string());
let location = sys::get_location(&library_ctx, self.location_id).await?;
let location = sys::get_location(&library_ctx, state.init.location_id).await?;
info!(
"Searching for images in location {} at path {:#?}",
location.id, self.path
location.id, state.init.path
);
// create all necessary directories if they don't exist
@ -48,7 +69,8 @@ impl Job for ThumbnailJob {
let root_path = location.path.unwrap();
// query database for all files in this location that need thumbnails
let image_files = get_images(&library_ctx, self.location_id, &self.path).await?;
let image_files =
get_images(&library_ctx, state.init.location_id, &state.init.path).await?;
info!("Found {:?} files", image_files.len());
ctx.progress(vec![
@ -56,59 +78,95 @@ impl Job for ThumbnailJob {
JobReportUpdate::Message(format!("Preparing to process {} files", image_files.len())),
]);
for (i, image_file) in image_files.iter().enumerate() {
ctx.progress(vec![JobReportUpdate::Message(format!(
"Processing {}",
image_file.materialized_path
))]);
state.data = Some(ThumbnailJobState {
thumbnail_dir,
root_path,
});
state.steps = image_files.into_iter().collect();
// assemble the file path
let path = Path::new(&root_path).join(&image_file.materialized_path);
debug!("image_file {:?}", image_file);
Ok(())
}
// get cas_id, if none found skip
let cas_id = match image_file.file() {
Ok(file) => {
if let Some(f) = file {
f.cas_id.clone()
} else {
info!(
"skipping thumbnail generation for {}",
image_file.materialized_path
);
continue;
}
async fn execute_step(
&self,
ctx: WorkerContext,
state: &mut JobState<Self::Init, Self::Data, Self::Step>,
) -> JobResult {
let step = &state.steps[0];
ctx.progress(vec![JobReportUpdate::Message(format!(
"Processing {}",
step.materialized_path
))]);
let data = state
.data
.as_ref()
.expect("critical error: missing data on job state");
// assemble the file path
let path = data.root_path.join(&step.materialized_path);
trace!("image_file {:?}", step);
// get cas_id, if none found skip
let cas_id = match step.file() {
Ok(file) => {
if let Some(f) = file {
f.cas_id.clone()
} else {
info!(
"skipping thumbnail generation for {}",
step.materialized_path
);
return Ok(());
}
Err(_) => {
error!("Error getting cas_id {:?}", image_file.materialized_path);
continue;
}
};
// Define and write the WebP-encoded file to a given path
let output_path = thumbnail_dir.join(&cas_id).with_extension("webp");
// check if file exists at output path
if !output_path.exists() {
info!("Writing {:?} to {:?}", path, output_path);
tokio::spawn(async move {
if let Err(e) = generate_thumbnail(&path, &output_path).await {
error!("Error generating thumb {:?}", e);
}
});
ctx.progress(vec![JobReportUpdate::CompletedTaskCount(i + 1)]);
if !self.background {
ctx.library_ctx()
.emit(CoreEvent::NewThumbnail { cas_id })
.await;
};
} else {
info!("Thumb exists, skipping... {}", output_path.display());
}
Err(_) => {
error!("Error getting cas_id {:?}", step.materialized_path);
return Ok(());
}
};
// Define and write the WebP-encoded file to a given path
let output_path = data.thumbnail_dir.join(&cas_id).with_extension("webp");
// check if file exists at output path
if !output_path.exists() {
info!("Writing {:?} to {:?}", path, output_path);
if let Err(e) = generate_thumbnail(&path, &output_path).await {
error!("Error generating thumb {:?}", e);
}
if !state.init.background {
ctx.library_ctx()
.emit(CoreEvent::NewThumbnail { cas_id })
.await;
};
} else {
info!("Thumb exists, skipping... {}", output_path.display());
}
ctx.progress(vec![JobReportUpdate::CompletedTaskCount(
state.step_number + 1,
)]);
Ok(())
}
async fn finalize(
&self,
_ctx: WorkerContext,
state: &mut JobState<Self::Init, Self::Data, Self::Step>,
) -> Result<(), JobError> {
let data = state
.data
.as_ref()
.expect("critical error: missing data on job state");
info!(
"Finished thumbnail generation for location {} at {}",
state.init.location_id,
data.root_path.display()
);
Ok(())
}
}
@ -117,25 +175,29 @@ pub async fn generate_thumbnail<P: AsRef<Path>>(
file_path: P,
output_path: P,
) -> Result<(), Box<dyn Error>> {
// Using `image` crate, open the included .jpg file
let img = image::open(file_path)?;
let (w, h) = img.dimensions();
// Optionally, resize the existing photo and convert back into DynamicImage
let img = DynamicImage::ImageRgba8(imageops::resize(
&img,
(w as f32 * THUMBNAIL_SIZE_FACTOR) as u32,
(h as f32 * THUMBNAIL_SIZE_FACTOR) as u32,
imageops::FilterType::Triangle,
));
// Create the WebP encoder for the above image
let encoder = Encoder::from_image(&img)?;
// Webp creation has blocking code
let webp = block_in_place(|| -> Result<Vec<u8>, Box<dyn Error>> {
// Using `image` crate, open the included .jpg file
let img = image::open(file_path)?;
let (w, h) = img.dimensions();
// Optionally, resize the existing photo and convert back into DynamicImage
let img = DynamicImage::ImageRgba8(imageops::resize(
&img,
(w as f32 * THUMBNAIL_SIZE_FACTOR) as u32,
(h as f32 * THUMBNAIL_SIZE_FACTOR) as u32,
imageops::FilterType::Triangle,
));
// Create the WebP encoder for the above image
let encoder = Encoder::from_image(&img)?;
// Encode the image at a specified quality 0-100
// Encode the image at a specified quality 0-100
// Type WebPMemory is !Send, which makes the Future in this function !Send,
// this makes us `deref` to get a `&[u8]` and then `to_owned` to make a Vec<u8>,
// which implies an unwanted clone...
Ok(encoder.encode(THUMBNAIL_QUALITY).deref().to_owned())
})?;
// Type WebPMemory is !Send, which makes the Future in this function !Send,
// this make us `deref` to have a `&[u8]` and then `to_owned` to make a Vec<u8>
// which implies on a unwanted clone...
let webp = encoder.encode(THUMBNAIL_QUALITY).deref().to_owned();
fs::write(output_path, &webp).await?;
Ok(())


@ -1,44 +1,71 @@
use super::checksum::generate_cas_id;
use crate::{
file::FileError,
job::JobReportUpdate,
job::{Job, JobResult, WorkerContext},
job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext},
library::LibraryContext,
prisma::{file, file_path},
sys::get_location,
sys::LocationResource,
};
use chrono::{DateTime, FixedOffset};
use futures::future::join_all;
use log::{error, info};
use prisma_client_rust::{prisma_models::PrismaValue, raw, raw::Raw, Direction};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::{
collections::{HashMap, HashSet},
path::{Path, PathBuf},
};
use tokio::{fs, io};
// FileIdentifierJob takes file_paths without a file_id and uniquely identifies them
// we break this job into chunks of 100 to improve performance
static CHUNK_SIZE: usize = 100;
pub const IDENTIFIER_JOB_NAME: &str = "file_identifier";
pub struct FileIdentifierJob {}
// FileIdentifierJobInit takes file_paths without a file_id and uniquely identifies them
// first: generating the cas_id and extracting metadata
// finally: creating unique file records, and linking them to their file_paths
#[derive(Debug)]
pub struct FileIdentifierJob {
#[derive(Serialize, Deserialize, Clone)]
pub struct FileIdentifierJobInit {
pub location_id: i32,
pub path: PathBuf,
}
// we break this job into chunks of 100 to improve performance
static CHUNK_SIZE: usize = 100;
#[derive(Serialize, Deserialize)]
pub struct FileIdentifierJobState {
total_count: usize,
task_count: usize,
location: LocationResource,
location_path: PathBuf,
cursor: i32,
}
#[async_trait::async_trait]
impl Job for FileIdentifierJob {
impl StatefulJob for FileIdentifierJob {
type Init = FileIdentifierJobInit;
type Data = FileIdentifierJobState;
type Step = ();
fn name(&self) -> &'static str {
"file_identifier"
IDENTIFIER_JOB_NAME
}
async fn run(&self, ctx: WorkerContext) -> JobResult {
async fn init(
&self,
ctx: WorkerContext,
state: &mut JobState<Self::Init, Self::Data, Self::Step>,
) -> JobResult {
info!("Identifying orphan file paths...");
let location = get_location(&ctx.library_ctx(), self.location_id).await?;
let location_path = location.path.unwrap_or_else(|| "".to_string());
let location = get_location(&ctx.library_ctx(), state.init.location_id).await?;
let location_path = if let Some(ref path) = location.path {
path.clone()
} else {
PathBuf::new()
};
let total_count = count_orphan_file_paths(&ctx.library_ctx(), location.id.into()).await?;
info!("Found {} orphan file paths", total_count);
@ -49,159 +76,188 @@ impl Job for FileIdentifierJob {
// update job with total task count based on orphan file_paths count
ctx.progress(vec![JobReportUpdate::TaskCount(task_count)]);
let mut completed: usize = 0;
let mut cursor: i32 = 1;
// loop until task count is complete
while completed < task_count {
// link file_path ids to a CreateFile struct containing unique file data
let mut chunk: HashMap<i32, CreateFile> = HashMap::new();
let mut cas_lookup: HashMap<String, i32> = HashMap::new();
state.data = Some(FileIdentifierJobState {
total_count,
task_count,
location,
location_path,
cursor: 1,
});
// get chunk of orphans to process
let file_paths = match get_orphan_file_paths(&ctx.library_ctx(), cursor).await {
Ok(file_paths) => file_paths,
state.steps = (0..task_count).map(|_| ()).collect();
Ok(())
}
async fn execute_step(
&self,
ctx: WorkerContext,
state: &mut JobState<Self::Init, Self::Data, Self::Step>,
) -> JobResult {
// link file_path ids to a CreateFile struct containing unique file data
let mut chunk: HashMap<i32, CreateFile> = HashMap::new();
let mut cas_lookup: HashMap<String, i32> = HashMap::new();
let data = state
.data
.as_mut()
.expect("critical error: missing data on job state");
// get chunk of orphans to process
let file_paths = match get_orphan_file_paths(&ctx.library_ctx(), data.cursor).await {
Ok(file_paths) => file_paths,
Err(e) => {
info!("Error getting orphan file paths: {:#?}", e);
return Ok(());
}
};
info!(
"Processing {:?} orphan files. ({} completed of {})",
file_paths.len(),
state.step_number,
data.task_count
);
// analyze each file_path
for file_path in &file_paths {
// get the cas_id and extract metadata
match prepare_file(&data.location_path, file_path).await {
Ok(file) => {
let cas_id = file.cas_id.clone();
// create entry into chunks for created file data
chunk.insert(file_path.id, file);
cas_lookup.insert(cas_id, file_path.id);
}
Err(e) => {
info!("Error getting orphan file paths: {:#?}", e);
info!("Error processing file: {:#?}", e);
continue;
}
};
info!(
"Processing {:?} orphan files. ({} completed of {})",
file_paths.len(),
completed,
task_count
);
}
// analyze each file_path
for file_path in &file_paths {
// get the cas_id and extract metadata
match prepare_file(&location_path, file_path).await {
Ok(file) => {
let cas_id = file.cas_id.clone();
// create entry into chunks for created file data
chunk.insert(file_path.id, file);
cas_lookup.insert(cas_id, file_path.id);
}
Err(e) => {
info!("Error processing file: {:#?}", e);
continue;
}
};
}
// find all existing files by cas id
let generated_cas_ids = chunk.values().map(|c| c.cas_id.clone()).collect();
let existing_files = ctx
.library_ctx()
.db
.file()
.find_many(vec![file::cas_id::in_vec(generated_cas_ids)])
.exec()
.await?;
// find all existing files by cas id
let generated_cas_ids = chunk.values().map(|c| c.cas_id.clone()).collect();
let existing_files = ctx
.library_ctx()
.db
.file()
.find_many(vec![file::cas_id::in_vec(generated_cas_ids)])
.exec()
.await?;
info!("Found {} existing files", existing_files.len());
info!("Found {} existing files", existing_files.len());
// link those existing files to their file paths
// Had to put the file_path in a variable outside of the closure, to satisfy the borrow checker
let library_ctx = ctx.library_ctx();
let prisma_file_path = library_ctx.db.file_path();
// link those existing files to their file paths
// Had to put the file_path in a variable outside of the closure, to satisfy the borrow checker
let library_ctx = ctx.library_ctx();
let prisma_file_path = library_ctx.db.file_path();
for result in join_all(existing_files.iter().map(|file| {
prisma_file_path
.find_unique(file_path::id::equals(
*cas_lookup.get(&file.cas_id).unwrap(),
))
.update(vec![file_path::file_id::set(Some(file.id))])
.exec()
}))
for existing_file in &existing_files {
if let Err(e) = update_file_id_by_cas_id(
&prisma_file_path,
&cas_lookup,
&existing_file.cas_id,
existing_file.id,
)
.await
{
if let Err(e) = result {
error!("Error linking file: {:#?}", e);
}
info!("Error updating file_id: {:#?}", e);
}
}
let existing_files_cas_ids = existing_files
.iter()
.map(|file| file.cas_id.clone())
.collect::<HashSet<_>>();
let existing_files_cas_ids = existing_files
.iter()
.map(|file| file.cas_id.clone())
.collect::<HashSet<_>>();
// extract files that don't already exist in the database
let new_files = chunk
.iter()
.map(|(_id, create_file)| create_file)
.filter(|create_file| !existing_files_cas_ids.contains(&create_file.cas_id))
.collect::<Vec<_>>();
// extract files that don't already exist in the database
let new_files = chunk
.iter()
.map(|(_id, create_file)| create_file)
.filter(|create_file| !existing_files_cas_ids.contains(&create_file.cas_id))
.collect::<Vec<_>>();
// assemble prisma values for new unique files
let mut values = Vec::with_capacity(new_files.len() * 3);
for file in &new_files {
values.extend([
PrismaValue::String(file.cas_id.clone()),
PrismaValue::Int(file.size_in_bytes),
PrismaValue::DateTime(file.date_created),
]);
}
// create new file records with assembled values
let created_files: Vec<FileCreated> = ctx
.library_ctx()
.db
._query_raw(Raw::new(
&format!(
"INSERT INTO files (cas_id, size_in_bytes, date_created) VALUES {}
ON CONFLICT (cas_id) DO NOTHING RETURNING id, cas_id",
vec!["({}, {}, {})"; new_files.len()].join(",")
),
values,
))
.await
.unwrap_or_else(|e| {
error!("Error inserting files: {:#?}", e);
Vec::new()
});
// This code is duplicated, is this right?
for result in join_all(created_files.iter().map(|file| {
// associate newly created files with their respective file_paths
// TODO: this is potentially bottlenecking the chunk system, individually linking file_path to file, 100 queries per chunk
// - insert many could work, but I couldn't find a good way to do this in a single SQL query
prisma_file_path
.find_unique(file_path::id::equals(
*cas_lookup.get(&file.cas_id).unwrap(),
))
.update(vec![file_path::file_id::set(Some(file.id))])
.exec()
}))
.await
{
if let Err(e) = result {
error!("Error linking file: {:#?}", e);
}
}
// handle loop end
let last_row = match file_paths.last() {
Some(l) => l,
None => {
break;
}
};
cursor = last_row.id;
completed += 1;
ctx.progress(vec![
JobReportUpdate::CompletedTaskCount(completed),
JobReportUpdate::Message(format!(
"Processed {} of {} orphan files",
completed * CHUNK_SIZE,
total_count
)),
// assemble prisma values for new unique files
let mut values = Vec::with_capacity(new_files.len() * 3);
for file in &new_files {
values.extend([
PrismaValue::String(file.cas_id.clone()),
PrismaValue::Int(file.size_in_bytes),
PrismaValue::DateTime(file.date_created),
]);
}
// create new file records with assembled values
let created_files: Vec<FileCreated> = ctx
.library_ctx()
.db
._query_raw(Raw::new(
&format!(
"INSERT INTO files (cas_id, size_in_bytes, date_created) VALUES {}
ON CONFLICT (cas_id) DO NOTHING RETURNING id, cas_id",
vec!["({}, {}, {})"; new_files.len()].join(",")
),
values,
))
.await
.unwrap_or_else(|e| {
error!("Error inserting files: {:#?}", e);
Vec::new()
});
for created_file in created_files {
// associate newly created files with their respective file_paths
// TODO: this is potentially bottlenecking the chunk system, individually linking file_path to file, 100 queries per chunk
// - insert many could work, but I couldn't find a good way to do this in a single SQL query
if let Err(e) = update_file_id_by_cas_id(
&prisma_file_path,
&cas_lookup,
&created_file.cas_id,
created_file.id,
)
.await
{
info!("Error updating file_id: {:#?}", e);
}
}
// handle last step
if let Some(last_row) = file_paths.last() {
data.cursor = last_row.id;
} else {
return Ok(());
}
ctx.progress(vec![
JobReportUpdate::CompletedTaskCount(state.step_number),
JobReportUpdate::Message(format!(
"Processed {} of {} orphan files",
state.step_number * CHUNK_SIZE,
data.total_count
)),
]);
// let _remaining = count_orphan_file_paths(&ctx.core_ctx, location.id.into()).await?;
Ok(())
}
async fn finalize(
&self,
_ctx: WorkerContext,
state: &mut JobState<Self::Init, Self::Data, Self::Step>,
) -> Result<(), JobError> {
let data = state
.data
.as_ref()
.expect("critical error: missing data on job state");
info!(
"Finalizing identifier job at {}, total of {} tasks",
data.location_path.display(),
data.task_count
);
Ok(())
}
}
#[derive(Deserialize, Serialize, Debug)]
@ -241,7 +297,7 @@ pub async fn get_orphan_file_paths(
.take(CHUNK_SIZE as i64)
.exec()
.await
.map_err(|e| e.into())
.map_err(Into::into)
}
#[derive(Deserialize, Serialize, Debug)]
@ -287,3 +343,16 @@ pub async fn prepare_file(
date_created: file_path.date_created,
})
}
async fn update_file_id_by_cas_id(
prisma_file_path: &file_path::Actions<'_>,
cas_lookup: &HashMap<String, i32>,
file_cas_id: &str,
file_id: i32,
) -> prisma_client_rust::Result<Option<file_path::Data>> {
prisma_file_path
.find_unique(file_path::id::equals(*cas_lookup.get(file_cas_id).unwrap()))
.update(vec![file_path::file_id::set(Some(file_id))])
.exec()
.await
}


@ -1,37 +1,270 @@
use crate::job::{Job, JobReportUpdate, JobResult, WorkerContext};
use std::path::PathBuf;
use crate::{
job::{JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext},
sys::{create_location, LocationResource},
};
use chrono::{DateTime, Utc};
use log::{error, info};
use prisma_client_rust::{raw, raw::Raw, PrismaValue};
use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,
ffi::OsStr,
path::{Path, PathBuf},
time::Duration,
};
use tokio::{fs, time::Instant};
use walkdir::{DirEntry, WalkDir};
use self::scan::ScanProgress;
mod scan;
static BATCH_SIZE: usize = 100;
pub const INDEXER_JOB_NAME: &str = "indexer";
// Re-exporting
pub use scan::*;
#[derive(Clone)]
pub enum ScanProgress {
ChunkCount(usize),
SavedChunks(usize),
Message(String),
}
use scan::scan_path;
pub struct IndexerJob {}
#[derive(Debug)]
pub struct IndexerJob {
#[derive(Serialize, Deserialize, Clone)]
pub struct IndexerJobInit {
pub path: PathBuf,
}
#[async_trait::async_trait]
impl Job for IndexerJob {
fn name(&self) -> &'static str {
"indexer"
#[derive(Serialize, Deserialize)]
pub struct IndexerJobData {
location: LocationResource,
db_write_start: DateTime<Utc>,
scan_read_time: Duration,
total_paths: usize,
}
pub(crate) type IndexerJobStep = Vec<(PathBuf, i32, Option<i32>, bool)>;
impl IndexerJobData {
fn on_scan_progress(ctx: WorkerContext, progress: Vec<ScanProgress>) {
ctx.progress(
progress
.iter()
.map(|p| match p.clone() {
ScanProgress::ChunkCount(c) => JobReportUpdate::TaskCount(c),
ScanProgress::SavedChunks(p) => JobReportUpdate::CompletedTaskCount(p),
ScanProgress::Message(m) => JobReportUpdate::Message(m),
})
.collect(),
)
}
async fn run(&self, ctx: WorkerContext) -> JobResult {
scan_path(&ctx.library_ctx(), &self.path, move |p| {
ctx.progress(
p.iter()
.map(|p| match p.clone() {
ScanProgress::ChunkCount(c) => JobReportUpdate::TaskCount(c),
ScanProgress::SavedChunks(p) => JobReportUpdate::CompletedTaskCount(p),
ScanProgress::Message(m) => JobReportUpdate::Message(m),
})
.collect(),
)
}
#[async_trait::async_trait]
impl StatefulJob for IndexerJob {
type Init = IndexerJobInit;
type Data = IndexerJobData;
type Step = IndexerJobStep;
fn name(&self) -> &'static str {
INDEXER_JOB_NAME
}
// creates a vector of valid path buffers from a directory
async fn init(
&self,
ctx: WorkerContext,
state: &mut JobState<Self::Init, Self::Data, Self::Step>,
) -> JobResult {
let location = create_location(&ctx.library_ctx(), &state.init.path).await?;
// query db for the highest id, so we can increment it for the new files indexed
#[derive(Deserialize, Serialize, Debug)]
struct QueryRes {
id: Option<i32>,
}
// grab the next id so we can increment in memory for batch inserting
let first_file_id = match ctx
.library_ctx()
.db
._query_raw::<QueryRes>(raw!("SELECT MAX(id) id FROM file_paths"))
.await
{
Ok(rows) => rows[0].id.unwrap_or(0),
Err(e) => panic!("Error querying for next file id: {:#?}", e),
};
// check if path is a directory
if !state.init.path.is_dir() {
// return Err(anyhow::anyhow!("{} is not a directory", &path));
panic!("{:#?} is not a directory", state.init.path);
}
// spawn a dedicated thread to scan the directory for performance
let path = state.init.path.clone();
let inner_ctx = ctx.clone();
let (paths, scan_start) = tokio::task::spawn_blocking(move || {
// store every valid path discovered
let mut paths: Vec<(PathBuf, i32, Option<i32>, bool)> = Vec::new();
// store a hashmap of directories to their file ids for fast lookup
let mut dirs = HashMap::new();
// begin timer for logging purposes
let scan_start = Instant::now();
let mut next_file_id = first_file_id;
let mut get_id = || {
next_file_id += 1;
next_file_id
};
// walk through directory recursively
for entry in WalkDir::new(&path).into_iter().filter_entry(|dir| {
// check if entry is approved
!is_hidden(dir) && !is_app_bundle(dir) && !is_node_modules(dir) && !is_library(dir)
}) {
// extract directory entry or log and continue if failed
let entry = match entry {
Ok(entry) => entry,
Err(e) => {
error!("Error reading file {}", e);
continue;
}
};
let path = entry.path();
info!("Found filesystem path: {:?}", path);
let parent_path = path
.parent()
.unwrap_or_else(|| Path::new(""))
.to_str()
.unwrap_or("");
let parent_dir_id = dirs.get(&*parent_path);
let path_str = match path.as_os_str().to_str() {
Some(path_str) => path_str,
None => {
error!("Error reading file {}", &path.display());
continue;
}
};
IndexerJobData::on_scan_progress(
inner_ctx.clone(),
vec![
ScanProgress::Message(format!("Scanning {}", path_str)),
ScanProgress::ChunkCount(paths.len() / BATCH_SIZE),
],
);
let file_id = get_id();
let file_type = entry.file_type();
let is_dir = file_type.is_dir();
if is_dir || file_type.is_file() {
paths.push((path.to_owned(), file_id, parent_dir_id.cloned(), is_dir));
}
if is_dir {
let _path = match path.to_str() {
Some(path) => path.to_owned(),
None => continue,
};
dirs.insert(_path, file_id);
}
}
(paths, scan_start)
})
.await
.await?;
state.data = Some(IndexerJobData {
location,
db_write_start: Utc::now(),
scan_read_time: scan_start.elapsed(),
total_paths: paths.len(),
});
state.steps = paths
.chunks(BATCH_SIZE)
.enumerate()
.map(|(i, chunk)| {
IndexerJobData::on_scan_progress(
ctx.clone(),
vec![
ScanProgress::SavedChunks(i as usize),
ScanProgress::Message(format!(
"Writing {} of {} to db",
i * chunk.len(),
paths.len(),
)),
],
);
chunk.to_vec()
})
.collect();
Ok(())
}
async fn execute_step(
&self,
ctx: WorkerContext,
state: &mut JobState<Self::Init, Self::Data, Self::Step>,
) -> JobResult {
// vector to store active models
let mut files = Vec::new();
let step = &state.steps[0];
let data = state
.data
.as_ref()
.expect("critical error: missing data on job state");
for (file_path, file_id, parent_dir_id, is_dir) in step {
files.extend(
match prepare_values(file_path, *file_id, &data.location, parent_dir_id, *is_dir)
.await
{
Ok(values) => values.to_vec(),
Err(e) => {
error!("Error creating file model from path {:?}: {}", file_path, e);
continue;
}
},
);
}
let raw = Raw::new(
&format!("
INSERT INTO file_paths (id, is_dir, location_id, materialized_path, name, extension, parent_id, date_created)
VALUES {}
",
vec!["({}, {}, {}, {}, {}, {}, {}, {})"; step.len()].join(", ")
),
files
);
let count = ctx.library_ctx().db._execute_raw(raw).await;
info!("Inserted {:?} records", count);
Ok(())
}
async fn finalize(
&self,
_ctx: WorkerContext,
state: &mut JobState<Self::Init, Self::Data, Self::Step>,
) -> JobResult {
let data = state
.data
.as_ref()
.expect("critical error: missing data on job state");
info!(
"scan of {:?} completed in {:?}. {:?} files found. db write completed in {:?}",
state.init.path,
data.scan_read_time,
data.total_paths,
Utc::now() - data.db_write_start,
);
Ok(())
}
}
@ -48,3 +281,104 @@ impl Job for IndexerJob {
// // sub-paths that are ignored
// pub always_ignored_sub_paths: Option<String>,
// }
// reads a file at a path and creates an ActiveModel with metadata
async fn prepare_values(
file_path: impl AsRef<Path>,
id: i32,
location: &LocationResource,
parent_id: &Option<i32>,
is_dir: bool,
) -> Result<[PrismaValue; 8], std::io::Error> {
let file_path = file_path.as_ref();
let metadata = fs::metadata(file_path).await?;
let location_path = location.path.as_ref().unwrap();
// let size = metadata.len();
let name;
let extension;
let date_created: DateTime<Utc> = metadata.created().unwrap().into();
// if the 'file_path' is not a directory, then get the extension and name.
// if 'file_path' is a directory, set extension to an empty string to avoid periods in folder names
// - being interpreted as file extensions
if is_dir {
extension = "".to_string();
name = extract_name(file_path.file_name());
} else {
extension = extract_name(file_path.extension());
name = extract_name(file_path.file_stem());
}
let materialized_path = file_path.strip_prefix(location_path).unwrap();
let materialized_path_as_string = materialized_path.to_str().unwrap_or("").to_owned();
let values = [
PrismaValue::Int(id as i64),
PrismaValue::Boolean(metadata.is_dir()),
PrismaValue::Int(location.id as i64),
PrismaValue::String(materialized_path_as_string),
PrismaValue::String(name),
PrismaValue::String(extension.to_lowercase()),
parent_id
.map(|id| PrismaValue::Int(id as i64))
.unwrap_or(PrismaValue::Null),
PrismaValue::DateTime(date_created.into()),
];
Ok(values)
}
// extract name from OsStr returned by PathBuf
fn extract_name(os_string: Option<&OsStr>) -> String {
os_string
.unwrap_or_default()
.to_str()
.unwrap_or_default()
.to_owned()
}
fn is_hidden(entry: &DirEntry) -> bool {
entry
.file_name()
.to_str()
.map(|s| s.starts_with('.'))
.unwrap_or(false)
}
fn is_library(entry: &DirEntry) -> bool {
entry
.path()
.to_str()
// make better this is shit
.map(|s| s.contains("/Library/"))
.unwrap_or(false)
}
fn is_node_modules(entry: &DirEntry) -> bool {
entry
.file_name()
.to_str()
.map(|s| s.contains("node_modules"))
.unwrap_or(false)
}
fn is_app_bundle(entry: &DirEntry) -> bool {
let is_dir = entry.metadata().unwrap().is_dir();
let contains_dot = entry
.file_name()
.to_str()
.map(|s| s.contains(".app") | s.contains(".bundle"))
.unwrap_or(false);
// let is_app_bundle = is_dir && contains_dot;
// if is_app_bundle {
// let path_buff = entry.path();
// let path = path_buff.to_str().unwrap();
// self::path(&path, );
// }
is_dir && contains_dot
}


@ -1,280 +0,0 @@
use crate::job::JobResult;
use crate::library::LibraryContext;
use crate::sys::{create_location, LocationResource};
use chrono::{DateTime, Utc};
use log::{error, info};
use prisma_client_rust::prisma_models::PrismaValue;
use prisma_client_rust::raw;
use prisma_client_rust::raw::Raw;
use serde::{Deserialize, Serialize};
use std::ffi::OsStr;
use std::fmt::Debug;
use std::{
collections::HashMap,
path::{Path, PathBuf},
time::Instant,
};
use tokio::fs;
use walkdir::{DirEntry, WalkDir};
#[derive(Clone)]
pub enum ScanProgress {
ChunkCount(usize),
SavedChunks(usize),
Message(String),
}
static BATCH_SIZE: usize = 100;
// creates a vector of valid path buffers from a directory
pub async fn scan_path(
ctx: &LibraryContext,
path: impl AsRef<Path> + Debug,
on_progress: impl Fn(Vec<ScanProgress>) + Send + Sync + 'static,
) -> JobResult {
let location = create_location(ctx, &path).await?;
// query db to highers id, so we can increment it for the new files indexed
#[derive(Deserialize, Serialize, Debug)]
struct QueryRes {
id: Option<i32>,
}
// grab the next id so we can increment in memory for batch inserting
let first_file_id = match ctx
.db
._query_raw::<QueryRes>(raw!("SELECT MAX(id) id FROM file_paths"))
.await
{
Ok(rows) => rows[0].id.unwrap_or(0),
Err(e) => panic!("Error querying for next file id: {:#?}", e),
};
//check is path is a directory
if !path.as_ref().is_dir() {
// return Err(anyhow::anyhow!("{} is not a directory", &path));
panic!("{:#?} is not a directory", path);
}
let path_buf = path.as_ref().to_path_buf();
// spawn a dedicated thread to scan the directory for performance
let (paths, scan_start, on_progress) = tokio::task::spawn_blocking(move || {
// store every valid path discovered
let mut paths: Vec<(PathBuf, i32, Option<i32>, bool)> = Vec::new();
// store a hashmap of directories to their file ids for fast lookup
let mut dirs: HashMap<String, i32> = HashMap::new();
// begin timer for logging purposes
let scan_start = Instant::now();
let mut next_file_id = first_file_id;
let mut get_id = || {
next_file_id += 1;
next_file_id
};
// walk through directory recursively
for entry in WalkDir::new(path_buf).into_iter().filter_entry(|dir| {
// check if entry is approved
!is_hidden(dir) && !is_app_bundle(dir) && !is_node_modules(dir) && !is_library(dir)
}) {
// extract directory entry or log and continue if failed
let entry = match entry {
Ok(entry) => entry,
Err(e) => {
error!("Error reading file {}", e);
continue;
}
};
let path = entry.path();
info!("Found filesystem path: {:?}", path);
let parent_path = path
.parent()
.unwrap_or_else(|| Path::new(""))
.to_str()
.unwrap_or("");
let parent_dir_id = dirs.get(&*parent_path);
let path_str = match path.as_os_str().to_str() {
Some(path_str) => path_str,
None => {
error!("Error reading file {}", &path.display());
continue;
}
};
on_progress(vec![
ScanProgress::Message(format!("Scanning {}", path_str)),
ScanProgress::ChunkCount(paths.len() / BATCH_SIZE),
]);
let file_id = get_id();
let file_type = entry.file_type();
let is_dir = file_type.is_dir();
if is_dir || file_type.is_file() {
paths.push((path.to_owned(), file_id, parent_dir_id.cloned(), is_dir));
}
if is_dir {
let _path = match path.to_str() {
Some(path) => path.to_owned(),
None => continue,
};
dirs.insert(_path, file_id);
}
}
(paths, scan_start, on_progress)
})
.await?;
let db_write_start = Instant::now();
let scan_read_time = scan_start.elapsed();
for (i, chunk) in paths.chunks(BATCH_SIZE).enumerate() {
on_progress(vec![
ScanProgress::SavedChunks(i as usize),
ScanProgress::Message(format!(
"Writing {} of {} to db",
i * chunk.len(),
paths.len(),
)),
]);
// vector to store active models
let mut files: Vec<PrismaValue> = Vec::new();
for (file_path, file_id, parent_dir_id, is_dir) in chunk {
files.extend(
match prepare_values(file_path, *file_id, &location, parent_dir_id, *is_dir).await {
Ok(values) => values.to_vec(),
Err(e) => {
error!("Error creating file model from path {:?}: {}", file_path, e);
continue;
}
},
);
}
let raw = Raw::new(
&format!("
INSERT INTO file_paths (id, is_dir, location_id, materialized_path, name, extension, parent_id, date_created)
VALUES {}
",
vec!["({}, {}, {}, {}, {}, {}, {}, {})"; chunk.len()].join(", ")
),
files
);
let count = ctx.db._execute_raw(raw).await;
info!("Inserted {:?} records", count);
}
info!(
"scan of {:?} completed in {:?}. {:?} files found. db write completed in {:?}",
&path,
scan_read_time,
paths.len(),
db_write_start.elapsed()
);
Ok(())
}
// reads a file at a path and creates an ActiveModel with metadata
async fn prepare_values(
file_path: &PathBuf,
id: i32,
location: &LocationResource,
parent_id: &Option<i32>,
is_dir: bool,
) -> Result<[PrismaValue; 8], std::io::Error> {
let metadata = fs::metadata(&file_path).await?;
let location_path = Path::new(location.path.as_ref().unwrap().as_str());
// let size = metadata.len();
let name;
let extension;
let date_created: DateTime<Utc> = metadata.created().unwrap().into();
// if the 'file_path' is not a directory, then get the extension and name.
// if 'file_path' is a directory, set extension to an empty string to avoid periods in folder names
// - being interpreted as file extensions
if is_dir {
extension = "".to_string();
name = extract_name(file_path.file_name());
} else {
extension = extract_name(file_path.extension());
name = extract_name(file_path.file_stem());
}
let materialized_path = file_path.strip_prefix(location_path).unwrap();
let materialized_path_as_string = materialized_path.to_str().unwrap_or("").to_owned();
let values = [
PrismaValue::Int(id as i64),
PrismaValue::Boolean(metadata.is_dir()),
PrismaValue::Int(location.id as i64),
PrismaValue::String(materialized_path_as_string),
PrismaValue::String(name),
PrismaValue::String(extension.to_lowercase()),
parent_id
.map(|id| PrismaValue::Int(id as i64))
.unwrap_or(PrismaValue::Null),
PrismaValue::DateTime(date_created.into()),
];
Ok(values)
}
// extract the name from an OsStr returned by PathBuf
fn extract_name(os_string: Option<&OsStr>) -> String {
os_string
.unwrap_or_default()
.to_str()
.unwrap_or_default()
.to_owned()
}
fn is_hidden(entry: &DirEntry) -> bool {
entry
.file_name()
.to_str()
.map(|s| s.starts_with('.'))
.unwrap_or(false)
}
fn is_library(entry: &DirEntry) -> bool {
entry
.path()
.to_str()
// TODO: matching on a path substring is fragile; improve this check
.map(|s| s.contains("/Library/"))
.unwrap_or(false)
}
fn is_node_modules(entry: &DirEntry) -> bool {
entry
.file_name()
.to_str()
.map(|s| s.contains("node_modules"))
.unwrap_or(false)
}
fn is_app_bundle(entry: &DirEntry) -> bool {
let is_dir = entry.metadata().unwrap().is_dir();
let contains_dot = entry
.file_name()
.to_str()
.map(|s| s.contains(".app") | s.contains(".bundle"))
.unwrap_or(false);
// let is_app_bundle = is_dir && contains_dot;
// if is_app_bundle {
// let path_buff = entry.path();
// let path = path_buff.to_str().unwrap();
// self::path(&path, );
// }
is_dir && contains_dot
}

View file

@ -1,3 +1,10 @@
use crate::{
library::LibraryContext,
prisma::{self, file, file_path},
sys::SysError,
ClientQuery, CoreError, CoreEvent, CoreResponse, LibraryQuery,
};
use chrono::{DateTime, Utc};
use int_enum::IntEnum;
use serde::{Deserialize, Serialize};
@ -5,12 +12,6 @@ use std::path::PathBuf;
use thiserror::Error;
use ts_rs::TS;
use crate::{
library::LibraryContext,
prisma::{self, file, file_path},
sys::SysError,
ClientQuery, CoreError, CoreEvent, CoreResponse, LibraryQuery,
};
pub mod cas;
pub mod explorer;
pub mod indexer;
@ -58,9 +59,9 @@ pub struct FilePath {
pub file_id: Option<i32>,
pub parent_id: Option<i32>,
pub date_created: DateTime<chrono::Utc>,
pub date_modified: DateTime<chrono::Utc>,
pub date_indexed: DateTime<chrono::Utc>,
pub date_created: DateTime<Utc>,
pub date_modified: DateTime<Utc>,
pub date_indexed: DateTime<Utc>,
pub file: Option<File>,
}
@ -163,15 +164,7 @@ pub async fn set_note(
.await
.unwrap();
ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::LibraryQuery {
library_id: ctx.id.to_string(),
query: LibraryQuery::GetExplorerDir {
limit: 0,
path: PathBuf::new(),
location_id: 0,
},
}))
.await;
send_invalidate_query(&ctx).await;
Ok(CoreResponse::Success(()))
}
@ -190,8 +183,14 @@ pub async fn favorite(
.await
.unwrap();
send_invalidate_query(&ctx).await;
Ok(CoreResponse::Success(()))
}
async fn send_invalidate_query(ctx: &LibraryContext) {
ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::LibraryQuery {
library_id: ctx.id.to_string(),
library_id: ctx.id,
query: LibraryQuery::GetExplorerDir {
limit: 0,
path: PathBuf::new(),
@ -199,6 +198,4 @@ pub async fn favorite(
},
}))
.await;
Ok(CoreResponse::Success(()))
}

View file

@ -1,57 +1,61 @@
use super::{
worker::{Worker, WorkerContext},
JobError,
};
use crate::{
encode::THUMBNAIL_JOB_NAME,
file::{
cas::IDENTIFIER_JOB_NAME,
indexer::{IndexerJob, INDEXER_JOB_NAME},
},
job::{worker::Worker, DynJob, JobError},
library::LibraryContext,
prisma::{job, node},
FileIdentifierJob, Job, ThumbnailJob,
};
use int_enum::IntEnum;
use log::{error, info};
use serde::{Deserialize, Serialize};
use std::{
collections::{HashMap, VecDeque},
error::Error,
fmt::Debug,
fmt::{Display, Formatter},
sync::Arc,
time::Duration,
};
use tokio::{
sync::{broadcast, mpsc, Mutex, RwLock},
time::sleep,
};
use tokio::sync::{mpsc, Mutex, RwLock};
use ts_rs::TS;
use uuid::Uuid;
// db is single threaded, nerd
const MAX_WORKERS: usize = 1;
pub type JobResult = Result<(), Box<dyn Error + Send + Sync>>;
#[async_trait::async_trait]
pub trait Job: Send + Sync + Debug {
fn name(&self) -> &'static str;
async fn run(&self, ctx: WorkerContext) -> JobResult;
}
pub enum JobManagerEvent {
IngestJob(LibraryContext, Box<dyn Job>),
IngestJob(LibraryContext, Box<dyn DynJob>),
}
// jobs struct is maintained by the core
pub struct JobManager {
job_queue: RwLock<VecDeque<Box<dyn Job>>>,
job_queue: RwLock<VecDeque<Box<dyn DynJob>>>,
// workers are spawned when jobs are picked off the queue
running_workers: RwLock<HashMap<String, Arc<Mutex<Worker>>>>,
running_workers: RwLock<HashMap<Uuid, Arc<Mutex<Worker>>>>,
internal_sender: mpsc::UnboundedSender<JobManagerEvent>,
shutdown_tx: Arc<broadcast::Sender<()>>,
}
impl JobManager {
pub fn new() -> Arc<Self> {
let (shutdown_tx, _shutdown_rx) = broadcast::channel(1);
let (internal_sender, mut internal_receiver) = mpsc::unbounded_channel();
let this = Arc::new(Self {
job_queue: RwLock::new(VecDeque::new()),
running_workers: RwLock::new(HashMap::new()),
internal_sender,
shutdown_tx: Arc::new(shutdown_tx),
});
let this2 = this.clone();
tokio::spawn(async move {
// FIXME: if this task crashes, the entire application is unusable
while let Some(event) = internal_receiver.recv().await {
match event {
JobManagerEvent::IngestJob(ctx, job) => this2.clone().ingest(&ctx, job).await,
@ -62,30 +66,36 @@ impl JobManager {
this
}
pub async fn ingest(self: Arc<Self>, ctx: &LibraryContext, job: Box<dyn Job>) {
pub async fn ingest(self: Arc<Self>, ctx: &LibraryContext, mut job: Box<dyn DynJob>) {
// create worker to process job
let mut running_workers = self.running_workers.write().await;
if running_workers.len() < MAX_WORKERS {
info!("Running job: {:?}", job.name());
let worker = Worker::new(job);
let id = worker.id();
let job_report = job
.report()
.take()
.expect("critical error: missing job on worker");
let job_id = job_report.id;
let worker = Worker::new(job, job_report);
let wrapped_worker = Arc::new(Mutex::new(worker));
Worker::spawn(Arc::clone(&self), Arc::clone(&wrapped_worker), ctx.clone()).await;
running_workers.insert(id, wrapped_worker);
running_workers.insert(job_id, wrapped_worker);
} else {
self.job_queue.write().await.push_back(job);
}
}
pub async fn ingest_queue(&self, _ctx: &LibraryContext, job: Box<dyn Job>) {
pub async fn ingest_queue(&self, _ctx: &LibraryContext, job: Box<dyn DynJob>) {
self.job_queue.write().await.push_back(job);
}
pub async fn complete(self: Arc<Self>, ctx: &LibraryContext, job_id: String) {
pub async fn complete(self: Arc<Self>, ctx: &LibraryContext, job_id: Uuid) {
// remove worker from running workers
self.running_workers.write().await.remove(&job_id);
// continue queue
@ -105,7 +115,7 @@ impl JobManager {
for worker in self.running_workers.read().await.values() {
let worker = worker.lock().await;
ret.push(worker.job_report.clone());
ret.push(worker.report());
}
ret
}
@ -131,6 +141,72 @@ impl JobManager {
Ok(jobs.into_iter().map(Into::into).collect())
}
pub fn shutdown_tx(&self) -> Arc<broadcast::Sender<()>> {
Arc::clone(&self.shutdown_tx)
}
pub async fn pause(&self) {
let running_workers_read_guard = self.running_workers.read().await;
if !running_workers_read_guard.is_empty() {
self.shutdown_tx
.send(())
.expect("Failed to send shutdown signal");
}
// Drop the read guard so workers can take the write lock and remove themselves as they finish
drop(running_workers_read_guard);
loop {
sleep(Duration::from_millis(50)).await;
if self.running_workers.read().await.is_empty() {
break;
}
}
}
pub async fn resume_jobs(self: Arc<Self>, ctx: &LibraryContext) -> Result<(), JobError> {
let paused_jobs = ctx
.db
.job()
.find_many(vec![job::status::equals(JobStatus::Paused.int_value())])
.exec()
.await?;
for paused_job_data in paused_jobs {
let paused_job = JobReport::from(paused_job_data);
info!("Resuming job: {}, id: {}", paused_job.name, paused_job.id);
match paused_job.name.as_str() {
THUMBNAIL_JOB_NAME => {
Arc::clone(&self)
.ingest(ctx, Job::resume(paused_job, Box::new(ThumbnailJob {}))?)
.await;
}
INDEXER_JOB_NAME => {
Arc::clone(&self)
.ingest(ctx, Job::resume(paused_job, Box::new(IndexerJob {}))?)
.await;
}
IDENTIFIER_JOB_NAME => {
Arc::clone(&self)
.ingest(
ctx,
Job::resume(paused_job, Box::new(FileIdentifierJob {}))?,
)
.await;
}
_ => {
error!(
"Unknown job type: {}, id: {}",
paused_job.name, paused_job.id
);
return Err(JobError::UnknownJobName(paused_job.id, paused_job.name));
}
};
}
Ok(())
}
}
#[derive(Debug)]
@ -144,9 +220,9 @@ pub enum JobReportUpdate {
#[derive(Debug, Serialize, Deserialize, TS, Clone)]
#[ts(export)]
pub struct JobReport {
pub id: String,
pub id: Uuid,
pub name: String,
pub data: Option<String>,
pub data: Option<Vec<u8>>,
// client_id: i32,
#[ts(type = "string")]
pub date_created: chrono::DateTime<chrono::Utc>,
@ -163,11 +239,21 @@ pub struct JobReport {
pub seconds_elapsed: i32,
}
impl Display for JobReport {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Job <name='{}', uuid='{}'> {:#?}",
self.name, self.id, self.status
)
}
}
// convert database struct into a resource struct
impl From<job::Data> for JobReport {
fn from(data: job::Data) -> JobReport {
JobReport {
id: data.id,
id: Uuid::from_slice(&data.id).unwrap(),
name: data.name,
// client_id: data.client_id,
status: JobStatus::from_int(data.status).unwrap(),
@ -183,7 +269,7 @@ impl From<job::Data> for JobReport {
}
impl JobReport {
pub fn new(uuid: String, name: String) -> Self {
pub fn new(uuid: Uuid, name: String) -> Self {
Self {
id: uuid,
name,
@ -209,7 +295,7 @@ impl JobReport {
ctx.db
.job()
.create(
job::id::set(self.id.clone()),
job::id::set(self.id.as_bytes().to_vec()),
job::name::set(self.name.clone()),
job::action::set(1),
job::nodes::link(node::id::equals(ctx.node_local_id)),
@ -222,9 +308,10 @@ impl JobReport {
pub async fn update(&self, ctx: &LibraryContext) -> Result<(), JobError> {
ctx.db
.job()
.find_unique(job::id::equals(self.id.clone()))
.find_unique(job::id::equals(self.id.as_bytes().to_vec()))
.update(vec![
job::status::set(self.status.int_value()),
job::data::set(self.data.clone()),
job::task_count::set(self.task_count),
job::completed_task_count::set(self.completed_task_count),
job::date_modified::set(chrono::Utc::now().into()),
@ -245,4 +332,5 @@ pub enum JobStatus {
Completed = 2,
Canceled = 3,
Failed = 4,
Paused = 5,
}
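
Job ids are now written to the database as raw bytes and parsed back with `Uuid::from_slice` when rows are loaded (see `From<job::Data> for JobReport` and `JobReport::create` above). A minimal standalone sketch of that round-trip, with no database involved:

```rust
use uuid::Uuid;

fn main() {
	let id = Uuid::new_v4();

	// What `job::id::set(...)` receives above: the 16 raw bytes of the UUID.
	let stored: Vec<u8> = id.as_bytes().to_vec();

	// What `From<job::Data> for JobReport` does when a row is read back.
	let loaded = Uuid::from_slice(&stored).expect("expected a 16-byte UUID");

	assert_eq!(id, loaded);
}
```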

View file

@ -1,18 +1,187 @@
use std::fmt::Debug;
use crate::{file::FileError, prisma, sys::SysError};
use rmp_serde::{decode::Error as DecodeError, encode::Error as EncodeError};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{collections::VecDeque, fmt::Debug};
use thiserror::Error;
use uuid::Uuid;
use crate::prisma;
mod jobs;
mod job_manager;
mod worker;
pub use jobs::*;
pub use job_manager::*;
pub use worker::*;
#[derive(Error, Debug)]
pub enum JobError {
#[error("Failed to create job (job_id {job_id:?})")]
CreateFailure { job_id: String },
#[error("Database error")]
#[error("Database error: {0}")]
DatabaseError(#[from] prisma::QueryError),
#[error("System error: {0}")]
SystemError(#[from] SysError),
#[error("I/O error: {0}")]
IOError(#[from] std::io::Error),
#[error("Failed to join Tokio spawn blocking: {0}")]
JoinError(#[from] tokio::task::JoinError),
#[error("File error: {0}")]
FileError(#[from] FileError),
#[error("Job state encode error: {0}")]
StateEncode(#[from] EncodeError),
#[error("Job state decode error: {0}")]
StateDecode(#[from] DecodeError),
#[error("Tried to resume a job with unknown name: job <name='{1}', uuid='{0}'>")]
UnknownJobName(Uuid, String),
#[error(
"Tried to resume a job that doesn't have saved state data: job <name='{1}', uuid='{0}'>"
)]
MissingJobDataState(Uuid, String),
#[error("Job paused")]
Paused(Vec<u8>),
}
pub type JobResult = Result<(), JobError>;
#[async_trait::async_trait]
pub trait StatefulJob: Send + Sync {
type Init: Serialize + DeserializeOwned + Send + Sync;
type Data: Serialize + DeserializeOwned + Send + Sync;
type Step: Serialize + DeserializeOwned + Send + Sync;
fn name(&self) -> &'static str;
async fn init(
&self,
ctx: WorkerContext,
state: &mut JobState<Self::Init, Self::Data, Self::Step>,
) -> JobResult;
async fn execute_step(
&self,
ctx: WorkerContext,
state: &mut JobState<Self::Init, Self::Data, Self::Step>,
) -> JobResult;
async fn finalize(
&self,
ctx: WorkerContext,
state: &mut JobState<Self::Init, Self::Data, Self::Step>,
) -> JobResult;
}
#[async_trait::async_trait]
pub trait DynJob: Send + Sync {
fn report(&mut self) -> &mut Option<JobReport>;
fn name(&self) -> &'static str;
async fn run(&mut self, ctx: WorkerContext) -> JobResult;
}
pub struct Job<Init, Data, Step>
where
Init: Serialize + DeserializeOwned + Send + Sync,
Data: Serialize + DeserializeOwned + Send + Sync,
Step: Serialize + DeserializeOwned + Send + Sync,
{
report: Option<JobReport>,
state: JobState<Init, Data, Step>,
stateful_job: Box<dyn StatefulJob<Init = Init, Data = Data, Step = Step>>,
}
impl<Init, Data, Step> Job<Init, Data, Step>
where
Init: Serialize + DeserializeOwned + Send + Sync,
Data: Serialize + DeserializeOwned + Send + Sync,
Step: Serialize + DeserializeOwned + Send + Sync,
{
pub fn new(
init: Init,
stateful_job: Box<dyn StatefulJob<Init = Init, Data = Data, Step = Step>>,
) -> Box<Self> {
Box::new(Self {
report: Some(JobReport::new(
Uuid::new_v4(),
stateful_job.name().to_string(),
)),
state: JobState {
init,
data: None,
steps: VecDeque::new(),
step_number: 0,
},
stateful_job,
})
}
pub fn resume(
mut report: JobReport,
stateful_job: Box<dyn StatefulJob<Init = Init, Data = Data, Step = Step>>,
) -> Result<Box<Self>, JobError> {
let job_state_data = if let Some(data) = report.data.take() {
data
} else {
return Err(JobError::MissingJobDataState(report.id, report.name));
};
Ok(Box::new(Self {
report: Some(report),
state: rmp_serde::from_slice(&job_state_data)?,
stateful_job,
}))
}
}
#[derive(Serialize, Deserialize)]
pub struct JobState<Init, Data, Step> {
pub init: Init,
pub data: Option<Data>,
pub steps: VecDeque<Step>,
pub step_number: usize,
}
#[async_trait::async_trait]
impl<Init, Data, Step> DynJob for Job<Init, Data, Step>
where
Init: Serialize + DeserializeOwned + Send + Sync,
Data: Serialize + DeserializeOwned + Send + Sync,
Step: Serialize + DeserializeOwned + Send + Sync,
{
fn report(&mut self) -> &mut Option<JobReport> {
&mut self.report
}
fn name(&self) -> &'static str {
self.stateful_job.name()
}
async fn run(&mut self, ctx: WorkerContext) -> JobResult {
// Checking if we have a brand new job, or if we are resuming an old one.
if self.state.data.is_none() {
self.stateful_job.init(ctx.clone(), &mut self.state).await?;
}
let mut shutdown_rx = ctx.shutdown_rx();
let shutdown_rx_fut = shutdown_rx.recv();
tokio::pin!(shutdown_rx_fut);
while !self.state.steps.is_empty() {
tokio::select! {
step_result = self.stateful_job.execute_step(
ctx.clone(),
&mut self.state,
) => {
step_result?;
self.state.steps.pop_front();
}
_ = &mut shutdown_rx_fut => {
return Err(
JobError::Paused(
rmp_serde::to_vec(&self.state)?
)
);
}
}
self.state.step_number += 1;
}
self.stateful_job
.finalize(ctx.clone(), &mut self.state)
.await?;
Ok(())
}
}
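
To make the new job abstraction concrete, here is a hypothetical `StatefulJob` implementation written against the trait defined above. Everything named `Example*` is invented for illustration and is not part of this commit; the sketch assumes the `StatefulJob`, `JobState`, `JobResult`, `WorkerContext` and `JobReportUpdate` items from this module are in scope, and it follows the same shape as the indexer, identifier and thumbnail jobs wired up elsewhere in the diff.

```rust
// Hypothetical example only; names and types are illustrative.
// use crate::job::{JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext};
// (assuming this module re-exports those items)
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;

pub struct ExampleJob {}

#[derive(Serialize, Deserialize)]
pub struct ExampleJobInit {
	pub items: Vec<String>,
}

#[derive(Serialize, Deserialize)]
pub struct ExampleJobData {
	pub processed: usize,
}

#[async_trait::async_trait]
impl StatefulJob for ExampleJob {
	type Init = ExampleJobInit;
	type Data = ExampleJobData;
	type Step = String;

	fn name(&self) -> &'static str {
		"example_job"
	}

	async fn init(
		&self,
		ctx: WorkerContext,
		state: &mut JobState<Self::Init, Self::Data, Self::Step>,
	) -> JobResult {
		// Everything placed in `state` is what rmp_serde serializes into the
		// job's `data` column if the node shuts down mid-job.
		state.steps = VecDeque::from(state.init.items.clone());
		state.data = Some(ExampleJobData { processed: 0 });
		ctx.progress(vec![JobReportUpdate::Message(format!(
			"Queued {} items",
			state.steps.len()
		))]);
		Ok(())
	}

	async fn execute_step(
		&self,
		ctx: WorkerContext,
		state: &mut JobState<Self::Init, Self::Data, Self::Step>,
	) -> JobResult {
		// Only look at the front of the queue; `Job::run` pops it on success,
		// so pausing between steps never loses or repeats work.
		if let Some(item) = state.steps.front() {
			ctx.progress(vec![JobReportUpdate::Message(format!("Processing {}", item))]);
		}
		if let Some(data) = state.data.as_mut() {
			data.processed += 1;
		}
		Ok(())
	}

	async fn finalize(
		&self,
		_ctx: WorkerContext,
		state: &mut JobState<Self::Init, Self::Data, Self::Step>,
	) -> JobResult {
		log::info!(
			"example job processed {} items",
			state.data.as_ref().map(|d| d.processed).unwrap_or(0)
		);
		Ok(())
	}
}
```

Spawning such a job would then mirror the call sites later in this diff, e.g. `ctx.spawn_job(Job::new(ExampleJobInit { items: vec![] }, Box::new(ExampleJob {}))).await`.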

View file

@ -1,208 +1,255 @@
use super::{
jobs::{JobReport, JobReportUpdate, JobStatus},
Job, JobManager,
use crate::{
job::{DynJob, JobError, JobManager, JobReportUpdate, JobStatus},
library::LibraryContext,
ClientQuery, CoreEvent, JobReport, LibraryQuery,
};
use crate::{library::LibraryContext, ClientQuery, CoreEvent, LibraryQuery};
use log::error;
use log::{error, info, warn};
use std::{sync::Arc, time::Duration};
use tokio::{
sync::{
broadcast,
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
Mutex,
},
time::{sleep, Instant},
time::{interval_at, Instant},
};
use uuid::Uuid;
// used to update the worker state from inside the worker thread
#[derive(Debug)]
pub enum WorkerEvent {
Progressed(Vec<JobReportUpdate>),
Completed,
Failed,
}
enum WorkerState {
Pending(Box<dyn Job>, UnboundedReceiver<WorkerEvent>),
Running,
Paused(Vec<u8>),
}
#[derive(Clone)]
pub struct WorkerContext {
pub uuid: String,
library_ctx: LibraryContext,
sender: UnboundedSender<WorkerEvent>,
events_tx: UnboundedSender<WorkerEvent>,
shutdown_tx: Arc<broadcast::Sender<()>>,
}
impl WorkerContext {
pub fn progress(&self, updates: Vec<JobReportUpdate>) {
self.sender
self.events_tx
.send(WorkerEvent::Progressed(updates))
.unwrap_or(());
.expect("critical error: failed to send worker worker progress event updates");
}
pub fn library_ctx(&self) -> LibraryContext {
self.library_ctx.clone()
}
// save the job data to
// pub fn save_data () {
// }
pub fn shutdown_rx(&self) -> broadcast::Receiver<()> {
self.shutdown_tx.subscribe()
}
}
// a worker is a dedicated thread that runs a single job
// once the job is complete the worker will exit
pub struct Worker {
pub job_report: JobReport,
state: WorkerState,
worker_sender: UnboundedSender<WorkerEvent>,
job: Option<Box<dyn DynJob>>,
report: JobReport,
worker_events_tx: UnboundedSender<WorkerEvent>,
worker_events_rx: Option<UnboundedReceiver<WorkerEvent>>,
}
impl Worker {
pub fn new(job: Box<dyn Job>) -> Self {
let (worker_sender, worker_receiver) = unbounded_channel();
let uuid = Uuid::new_v4().to_string();
let name = job.name();
pub fn new(job: Box<dyn DynJob>, report: JobReport) -> Self {
let (worker_events_tx, worker_events_rx) = unbounded_channel();
Self {
state: WorkerState::Pending(job, worker_receiver),
job_report: JobReport::new(uuid, name.to_string()),
worker_sender,
job: Some(job),
report,
worker_events_tx,
worker_events_rx: Some(worker_events_rx),
}
}
pub fn report(&self) -> JobReport {
self.report.clone()
}
// spawns a thread and extracts channel sender to communicate with it
pub async fn spawn(
job_manager: Arc<JobManager>,
worker: Arc<Mutex<Self>>,
worker_mutex: Arc<Mutex<Self>>,
ctx: LibraryContext,
) {
let mut worker = worker_mutex.lock().await;
// we capture the worker receiver channel so state can be updated from inside the worker
let mut worker_mut = worker.lock().await;
// extract owned job and receiver from Self
let (job, worker_receiver) =
match std::mem::replace(&mut worker_mut.state, WorkerState::Running) {
WorkerState::Pending(job, worker_receiver) => {
worker_mut.state = WorkerState::Running;
(job, worker_receiver)
}
WorkerState::Running => unreachable!(),
};
let worker_sender = worker_mut.worker_sender.clone();
let worker_events_tx = worker.worker_events_tx.clone();
let worker_events_rx = worker
.worker_events_rx
.take()
.expect("critical error: missing worker events rx");
worker_mut.job_report.status = JobStatus::Running;
let mut job = worker
.job
.take()
.expect("critical error: missing job on worker");
worker_mut.job_report.create(&ctx).await.unwrap_or(());
let job_id = worker.report.id;
let old_status = worker.report.status;
worker.report.status = JobStatus::Running;
if matches!(old_status, JobStatus::Queued) {
worker.report.create(&ctx).await.unwrap_or(());
}
drop(worker);
// spawn task to handle receiving events from the worker
let library_ctx = ctx.clone();
tokio::spawn(Worker::track_progress(
worker.clone(),
worker_receiver,
Arc::clone(&worker_mutex),
worker_events_rx,
library_ctx.clone(),
));
let uuid = worker_mut.job_report.id.clone();
// spawn task to handle running the job
tokio::spawn(async move {
let worker_ctx = WorkerContext {
uuid,
library_ctx,
sender: worker_sender,
events_tx: worker_events_tx,
shutdown_tx: job_manager.shutdown_tx(),
};
let job_start = Instant::now();
// track time
let sender = worker_ctx.sender.clone();
let events_tx = worker_ctx.events_tx.clone();
tokio::spawn(async move {
let mut interval = interval_at(
Instant::now() + Duration::from_millis(1000),
Duration::from_millis(1000),
);
loop {
let elapsed = job_start.elapsed().as_secs();
sender
interval.tick().await;
if events_tx
.send(WorkerEvent::Progressed(vec![
JobReportUpdate::SecondsElapsed(elapsed),
JobReportUpdate::SecondsElapsed(1),
]))
.unwrap_or(());
sleep(Duration::from_millis(1000)).await;
.is_err() && events_tx.is_closed()
{
break;
}
}
});
if let Err(e) = job.run(worker_ctx.clone()).await {
error!("job '{}' failed with error: {}", worker_ctx.uuid, e);
worker_ctx.sender.send(WorkerEvent::Failed).unwrap_or(());
if let JobError::Paused(state) = e {
worker_ctx
.events_tx
.send(WorkerEvent::Paused(state))
.expect("critical error: failed to send worker pause event");
} else {
error!("job '{}' failed with error: {:#?}", job_id, e);
worker_ctx
.events_tx
.send(WorkerEvent::Failed)
.expect("critical error: failed to send worker fail event");
}
} else {
// handle completion
worker_ctx.sender.send(WorkerEvent::Completed).unwrap_or(());
worker_ctx
.events_tx
.send(WorkerEvent::Completed)
.expect("critical error: failed to send worker complete event");
}
job_manager.complete(&ctx, worker_ctx.uuid).await;
job_manager.complete(&ctx, job_id).await;
});
}
pub fn id(&self) -> String {
self.job_report.id.to_owned()
}
async fn track_progress(
worker: Arc<Mutex<Self>>,
mut channel: UnboundedReceiver<WorkerEvent>,
mut worker_events_rx: UnboundedReceiver<WorkerEvent>,
ctx: LibraryContext,
) {
while let Some(command) = channel.recv().await {
while let Some(command) = worker_events_rx.recv().await {
let mut worker = worker.lock().await;
match command {
WorkerEvent::Progressed(changes) => {
// protect against updates if job is not running
if worker.job_report.status != JobStatus::Running {
if worker.report.status != JobStatus::Running {
continue;
};
for change in changes {
match change {
JobReportUpdate::TaskCount(task_count) => {
worker.job_report.task_count = task_count as i32;
worker.report.task_count = task_count as i32;
}
JobReportUpdate::CompletedTaskCount(completed_task_count) => {
worker.job_report.completed_task_count =
completed_task_count as i32;
worker.report.completed_task_count = completed_task_count as i32;
}
JobReportUpdate::Message(message) => {
worker.job_report.message = message;
worker.report.message = message;
}
JobReportUpdate::SecondsElapsed(seconds) => {
worker.job_report.seconds_elapsed = seconds as i32;
worker.report.seconds_elapsed += seconds as i32;
}
}
}
ctx.emit(CoreEvent::InvalidateQueryDebounced(
ClientQuery::LibraryQuery {
library_id: ctx.id.to_string(),
library_id: ctx.id,
query: LibraryQuery::GetRunningJobs,
},
))
.await;
}
WorkerEvent::Completed => {
worker.job_report.status = JobStatus::Completed;
worker.job_report.update(&ctx).await.unwrap_or(());
worker.report.status = JobStatus::Completed;
worker.report.data = None;
worker
.report
.update(&ctx)
.await
.expect("critical error: failed to update job report");
ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::LibraryQuery {
library_id: ctx.id.to_string(),
library_id: ctx.id,
query: LibraryQuery::GetRunningJobs,
}))
.await;
ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::LibraryQuery {
library_id: ctx.id.to_string(),
library_id: ctx.id,
query: LibraryQuery::GetJobHistory,
}))
.await;
info!("{}", worker.report);
break;
}
WorkerEvent::Failed => {
worker.job_report.status = JobStatus::Failed;
worker.job_report.update(&ctx).await.unwrap_or(());
worker.report.status = JobStatus::Failed;
worker.report.data = None;
worker
.report
.update(&ctx)
.await
.expect("critical error: failed to update job report");
ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::LibraryQuery {
library_id: ctx.id.to_string(),
library_id: ctx.id,
query: LibraryQuery::GetJobHistory,
}))
.await;
warn!("{}", worker.report);
break;
}
WorkerEvent::Paused(state) => {
worker.report.status = JobStatus::Paused;
worker.report.data = Some(state);
worker
.report
.update(&ctx)
.await
.expect("critical error: failed to update job report");
info!("{}", worker.report);
ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::LibraryQuery {
library_id: ctx.id,
query: LibraryQuery::GetJobHistory,
}))
.await;

View file

@ -1,15 +1,19 @@
use crate::{file::cas::FileIdentifierJob, prisma::file as prisma_file, prisma::location};
use job::{JobManager, JobReport};
use library::{LibraryConfig, LibraryConfigWrapped, LibraryManager};
use log::error;
use node::{NodeConfig, NodeConfigManager};
use crate::{
encode::{ThumbnailJob, ThumbnailJobInit},
file::cas::{FileIdentifierJob, FileIdentifierJobInit},
job::{Job, JobManager, JobReport},
library::{LibraryConfig, LibraryConfigWrapped, LibraryManager},
node::{NodeConfig, NodeConfigManager},
prisma::file as prisma_file,
prisma::location,
tag::{Tag, TagWithFiles},
};
use log::{error, info};
use serde::{Deserialize, Serialize};
use std::{
path::{Path, PathBuf},
sync::Arc,
};
use tag::{Tag, TagWithFiles};
use thiserror::Error;
use tokio::{
fs,
@ -19,8 +23,7 @@ use tokio::{
},
};
use ts_rs::TS;
use crate::encode::ThumbnailJob;
use uuid::Uuid;
mod encode;
mod file;
@ -56,7 +59,7 @@ impl NodeController {
})
.unwrap_or(());
// wait for response and return
recv.await.unwrap_or(Err(CoreError::QueryError))
recv.await.unwrap_or(Err(CoreError::Query))
}
pub async fn command(&self, command: ClientCommand) -> Result<CoreResponse, CoreError> {
@ -102,35 +105,56 @@ pub struct Node {
UnboundedReceiver<ReturnableMessage<ClientCommand>>,
),
event_sender: mpsc::Sender<CoreEvent>,
shutdown_completion_tx: oneshot::Sender<()>,
}
impl Node {
// create new instance of node, run startup tasks
pub async fn new(
data_dir: impl AsRef<Path>,
) -> (NodeController, mpsc::Receiver<CoreEvent>, Node) {
fs::create_dir_all(&data_dir).await.unwrap();
) -> (
NodeController,
mpsc::Receiver<CoreEvent>,
Node,
oneshot::Receiver<()>,
) {
let data_dir = data_dir.as_ref();
fs::create_dir_all(data_dir).await.unwrap();
let (event_sender, event_recv) = mpsc::channel(100);
let config = NodeConfigManager::new(data_dir.as_ref().to_owned())
.await
.unwrap();
let config = NodeConfigManager::new(data_dir.to_owned()).await.unwrap();
let (shutdown_completion_tx, shutdown_completion_rx) = oneshot::channel();
let jobs = JobManager::new();
let node_ctx = NodeContext {
event_sender: event_sender.clone(),
config: config.clone(),
jobs: jobs.clone(),
};
let library_manager = LibraryManager::new(data_dir.join("libraries"), node_ctx)
.await
.unwrap();
// Try to resume any jobs that were previously paused
let inner_library_manager = Arc::clone(&library_manager);
let inner_jobs = Arc::clone(&jobs);
tokio::spawn(async move {
for library_ctx in inner_library_manager.get_all_libraries_ctx().await {
if let Err(e) = Arc::clone(&inner_jobs).resume_jobs(&library_ctx).await {
error!("Failed to resume jobs for library. {:#?}", e);
}
}
});
let node = Node {
config,
library_manager: LibraryManager::new(data_dir.as_ref().join("libraries"), node_ctx)
.await
.unwrap(),
library_manager,
query_channel: unbounded_channel(),
command_channel: unbounded_channel(),
jobs,
event_sender,
shutdown_completion_tx,
};
(
@ -140,6 +164,7 @@ impl Node {
},
event_recv,
node,
shutdown_completion_rx,
)
}
@ -151,7 +176,7 @@ impl Node {
}
}
pub async fn start(mut self) {
pub async fn start(mut self, mut shutdown_rx: oneshot::Receiver<()>) {
loop {
// listen on global messaging channels for incoming messages
tokio::select! {
@ -163,10 +188,24 @@ impl Node {
let res = self.exec_command(msg.data).await;
msg.return_sender.send(res).unwrap_or(());
}
_ = &mut shutdown_rx => {
info!("Initiating shutdown node...");
self.shutdown().await;
info!("Node shutdown complete.");
self.shutdown_completion_tx.send(())
.expect("critical error: failed to send node shutdown completion signal");
break;
}
}
}
}
pub async fn shutdown(&self) {
self.jobs.pause().await
}
async fn exec_command(&mut self, cmd: ClientCommand) -> Result<CoreResponse, CoreError> {
Ok(match cmd {
ClientCommand::CreateLibrary { name } => {
@ -258,19 +297,25 @@ impl Node {
// CRUD for libraries
LibraryCommand::VolUnmount { id: _ } => todo!(),
LibraryCommand::GenerateThumbsForLocation { id, path } => {
ctx.spawn_job(Box::new(ThumbnailJob {
location_id: id,
path,
background: false, // fix
}))
ctx.spawn_job(Job::new(
ThumbnailJobInit {
location_id: id,
path,
background: false, // fix
},
Box::new(ThumbnailJob {}),
))
.await;
CoreResponse::Success(())
}
LibraryCommand::IdentifyUniqueFiles { id, path } => {
ctx.spawn_job(Box::new(FileIdentifierJob {
location_id: id,
path,
}))
ctx.spawn_job(Job::new(
FileIdentifierJobInit {
location_id: id,
path,
},
Box::new(FileIdentifierJob {}),
))
.await;
CoreResponse::Success(())
}
@ -292,7 +337,7 @@ impl Node {
ClientQuery::GetNodes => todo!(),
ClientQuery::GetVolumes => CoreResponse::GetVolumes(sys::Volume::get_volumes()?),
ClientQuery::LibraryQuery { library_id, query } => {
let ctx = match self.library_manager.get_ctx(library_id.clone()).await {
let ctx = match self.library_manager.get_ctx(library_id).await {
Some(ctx) => ctx,
None => {
println!("Library '{}' not found!", library_id);
@ -344,15 +389,15 @@ pub enum ClientCommand {
name: String,
},
EditLibrary {
id: String,
id: Uuid,
name: Option<String>,
description: Option<String>,
},
DeleteLibrary {
id: String,
id: Uuid,
},
LibraryCommand {
library_id: String,
library_id: Uuid,
command: LibraryCommand,
},
}
@ -437,7 +482,7 @@ pub enum ClientQuery {
GetVolumes,
GetNodes,
LibraryQuery {
library_id: String,
library_id: Uuid,
query: LibraryQuery,
},
}
@ -512,17 +557,17 @@ pub enum CoreResponse {
#[derive(Error, Debug)]
pub enum CoreError {
#[error("Query error")]
QueryError,
#[error("System error")]
SysError(#[from] sys::SysError),
#[error("File error")]
FileError(#[from] file::FileError),
#[error("Job error")]
JobError(#[from] job::JobError),
#[error("Database error")]
DatabaseError(#[from] prisma::QueryError),
#[error("Database error")]
LibraryError(#[from] library::LibraryError),
Query,
#[error("System error: {0}")]
Sys(#[from] sys::SysError),
#[error("File error: {0}")]
File(#[from] file::FileError),
#[error("Job error: {0}")]
Job(#[from] job::JobError),
#[error("Database error: {0}")]
Database(#[from] prisma::QueryError),
#[error("Library error: {0}")]
Library(#[from] library::LibraryError),
}
#[derive(Serialize, Deserialize, Debug, Clone, TS)]

View file

@ -7,6 +7,7 @@ use std::{
use serde::{Deserialize, Serialize};
use std::io::Write;
use ts_rs::TS;
use uuid::Uuid;
use crate::node::ConfigMetadata;
@ -64,6 +65,6 @@ impl LibraryConfig {
#[derive(Serialize, Deserialize, Debug, TS)]
#[ts(export)]
pub struct LibraryConfigWrapped {
pub uuid: String,
pub uuid: Uuid,
pub config: LibraryConfig,
}

View file

@ -1,9 +1,7 @@
use crate::{job::DynJob, node::NodeConfigManager, prisma::PrismaClient, CoreEvent, NodeContext};
use std::sync::Arc;
use uuid::Uuid;
use crate::{job::Job, node::NodeConfigManager, prisma::PrismaClient, CoreEvent, NodeContext};
use super::LibraryConfig;
/// LibraryContext holds context for a library which can be passed around the application.
@ -22,11 +20,11 @@ pub struct LibraryContext {
}
impl LibraryContext {
pub(crate) async fn spawn_job(&self, job: Box<dyn Job>) {
pub(crate) async fn spawn_job(&self, job: Box<dyn DynJob>) {
self.node_context.jobs.clone().ingest(self, job).await;
}
pub(crate) async fn queue_job(&self, job: Box<dyn Job>) {
pub(crate) async fn queue_job(&self, job: Box<dyn DynJob>) {
self.node_context.jobs.ingest_queue(self, job).await;
}

View file

@ -85,15 +85,7 @@ impl LibraryManager {
}
let config = LibraryConfig::read(config_path).await?;
libraries.push(
Self::load(
library_id,
db_path.to_str().unwrap(),
config,
node_context.clone(),
)
.await?,
);
libraries.push(Self::load(library_id, &db_path, config, node_context.clone()).await?);
}
let this = Arc::new(Self {
@ -108,8 +100,7 @@ impl LibraryManager {
name: "My Default Library".into(),
..Default::default()
})
.await
.unwrap();
.await?;
}
Ok(this)
@ -148,14 +139,18 @@ impl LibraryManager {
.iter()
.map(|lib| LibraryConfigWrapped {
config: lib.config.clone(),
uuid: lib.id.to_string(),
uuid: lib.id,
})
.collect()
}
pub(crate) async fn get_all_libraries_ctx(&self) -> Vec<LibraryContext> {
self.libraries.read().await.clone()
}
pub(crate) async fn edit(
&self,
id: String,
id: Uuid,
name: Option<String>,
description: Option<String>,
) -> Result<(), LibraryManagerError> {
@ -163,7 +158,7 @@ impl LibraryManager {
let mut libraries = self.libraries.write().await;
let library = libraries
.iter_mut()
.find(|lib| lib.id == Uuid::from_str(&id).unwrap())
.find(|lib| lib.id == id)
.ok_or(LibraryManagerError::LibraryNotFound)?;
// update the library
@ -186,11 +181,9 @@ impl LibraryManager {
Ok(())
}
pub async fn delete_library(&self, id: String) -> Result<(), LibraryManagerError> {
pub async fn delete_library(&self, id: Uuid) -> Result<(), LibraryManagerError> {
let mut libraries = self.libraries.write().await;
let id = Uuid::parse_str(&id)?;
let library = libraries
.iter()
.find(|l| l.id == id)
@ -208,12 +201,12 @@ impl LibraryManager {
}
// get_ctx will return the library context for the given library id.
pub(crate) async fn get_ctx(&self, library_id: String) -> Option<LibraryContext> {
pub(crate) async fn get_ctx(&self, library_id: Uuid) -> Option<LibraryContext> {
self.libraries
.read()
.await
.iter()
.find(|lib| lib.id.to_string() == library_id)
.find(|lib| lib.id == library_id)
.map(Clone::clone)
}
@ -239,12 +232,14 @@ impl LibraryManager {
_ => Platform::Unknown,
};
let uuid_vec = id.as_bytes().to_vec();
let node_data = db
.node()
.upsert(
node::pub_id::equals(id.to_string()),
node::pub_id::equals(uuid_vec.clone()),
(
node::pub_id::set(id.to_string()),
node::pub_id::set(uuid_vec),
node::name::set(node_config.name.clone()),
vec![node::platform::set(platform as i32)],
),

View file

@ -1,3 +1,6 @@
use crate::{prisma, sys::SysError};
use thiserror::Error;
mod library_config;
mod library_ctx;
mod library_manager;
@ -8,10 +11,6 @@ pub use library_ctx::*;
pub use library_manager::*;
pub use statistics::*;
use thiserror::Error;
use crate::{prisma, sys::SysError};
#[derive(Error, Debug)]
pub enum LibraryError {
#[error("Missing library")]

View file

@ -1,8 +1,10 @@
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::{self, BufReader, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::{
fs::File,
io::{self, BufReader, Seek, SeekFrom, Write},
path::{Path, PathBuf},
sync::Arc,
};
use thiserror::Error;
use tokio::sync::{RwLock, RwLockWriteGuard};
use ts_rs::TS;

View file

@ -2,6 +2,8 @@ use chrono::{DateTime, Utc};
use int_enum::IntEnum;
use serde::{Deserialize, Serialize};
use ts_rs::TS;
use uuid::Uuid;
mod config;
use crate::prisma::node;
pub use config::*;
@ -9,7 +11,7 @@ pub use config::*;
#[derive(Debug, Clone, Serialize, Deserialize, TS)]
#[ts(export)]
pub struct LibraryNode {
pub uuid: String,
pub uuid: Uuid,
pub name: String,
pub platform: Platform,
pub last_seen: DateTime<Utc>,
@ -18,7 +20,7 @@ pub struct LibraryNode {
impl From<node::Data> for LibraryNode {
fn from(data: node::Data) -> Self {
Self {
uuid: data.pub_id,
uuid: Uuid::from_slice(&data.pub_id).unwrap(),
name: data.name,
platform: IntEnum::from_int(data.platform).unwrap(),
last_seen: data.last_seen.into(),

View file

@ -1,15 +1,21 @@
use super::SysError;
use crate::{
file::{cas::FileIdentifierJob, indexer::IndexerJob},
file::{
cas::FileIdentifierJob,
indexer::{IndexerJob, IndexerJobInit},
},
library::LibraryContext,
node::LibraryNode,
prisma::{file_path, location},
ClientQuery, CoreEvent, LibraryQuery,
ClientQuery, CoreEvent, FileIdentifierJobInit, Job, LibraryQuery, ThumbnailJob,
ThumbnailJobInit,
};
use log::info;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::path::{Path, PathBuf};
use std::{
fmt::Debug,
path::{Path, PathBuf},
};
use thiserror::Error;
use tokio::{
fs::{metadata, File},
@ -18,14 +24,12 @@ use tokio::{
use ts_rs::TS;
use uuid::Uuid;
use super::SysError;
#[derive(Debug, Clone, Serialize, Deserialize, TS)]
#[ts(export)]
pub struct LocationResource {
pub id: i32,
pub name: Option<String>,
pub path: Option<String>,
pub path: Option<PathBuf>,
pub total_capacity: Option<i32>,
pub available_capacity: Option<i32>,
pub is_removable: Option<bool>,
@ -40,7 +44,7 @@ impl From<location::Data> for LocationResource {
LocationResource {
id: data.id,
name: data.name,
path: data.local_path,
path: data.local_path.map(PathBuf::from),
total_capacity: data.total_capacity,
available_capacity: data.available_capacity,
is_removable: data.is_removable,
@ -88,21 +92,31 @@ pub async fn get_location(
pub async fn scan_location(ctx: &LibraryContext, location_id: i32, path: impl AsRef<Path>) {
let path_buf = path.as_ref().to_path_buf();
ctx.spawn_job(Box::new(IndexerJob {
path: path_buf.clone(),
}))
ctx.spawn_job(Job::new(
IndexerJobInit {
path: path_buf.clone(),
},
Box::new(IndexerJob {}),
))
.await;
ctx.queue_job(Box::new(FileIdentifierJob {
location_id,
path: path_buf,
}))
ctx.queue_job(Job::new(
FileIdentifierJobInit {
location_id,
path: path_buf.clone(),
},
Box::new(FileIdentifierJob {}),
))
.await;
ctx.queue_job(Job::new(
ThumbnailJobInit {
location_id,
path: path_buf,
background: true,
},
Box::new(ThumbnailJob {}),
))
.await;
// TODO: make a way to stop jobs so this can be canceled without rebooting app
// ctx.queue_job(Box::new(ThumbnailJob {
// location_id,
// path: "".to_string(),
// background: false,
// }));
}
pub async fn new_location_and_scan(
@ -173,7 +187,7 @@ pub async fn create_location(
.db
.location()
.create(
location::pub_id::set(uuid.to_string()),
location::pub_id::set(uuid.as_bytes().to_vec()),
vec![
location::name::set(Some(
path.file_name().unwrap().to_string_lossy().to_string(),
@ -231,7 +245,7 @@ pub async fn delete_location(ctx: &LibraryContext, location_id: i32) -> Result<(
.await?;
ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::LibraryQuery {
library_id: ctx.id.to_string(),
library_id: ctx.id,
query: LibraryQuery::GetLocations,
}))
.await;

View file

@ -6,7 +6,7 @@ pub use volumes::*;
use thiserror::Error;
use crate::{job, prisma};
use crate::prisma;
#[derive(Error, Debug)]
pub enum SysError {
@ -14,8 +14,6 @@ pub enum SysError {
Location(#[from] LocationError),
#[error("Error with system volumes")]
Volume(String),
#[error("Error from job runner")]
Job(#[from] job::JobError),
#[error("Database error")]
Database(#[from] prisma::QueryError),
}

View file

@ -11,12 +11,13 @@ use crate::{
use serde::{Deserialize, Serialize};
use thiserror::Error;
use ts_rs::TS;
use uuid::Uuid;
#[derive(Debug, Clone, Serialize, Deserialize, TS)]
#[ts(export)]
pub struct Tag {
pub id: i32,
pub pub_id: String,
pub pub_id: Uuid,
pub name: Option<String>,
pub color: Option<String>,
@ -43,7 +44,7 @@ impl From<tag::Data> for Tag {
fn from(data: tag::Data) -> Self {
Self {
id: data.id,
pub_id: data.pub_id,
pub_id: Uuid::from_slice(&data.pub_id).unwrap(),
name: data.name,
color: data.color,
total_files: data.total_files,
@ -90,7 +91,7 @@ pub async fn create_tag(
.db
.tag()
.create(
tag::pub_id::set(uuid::Uuid::new_v4().to_string()),
tag::pub_id::set(Uuid::new_v4().as_bytes().to_vec()),
vec![tag::name::set(Some(name)), tag::color::set(Some(color))],
)
.exec()
@ -98,7 +99,7 @@ pub async fn create_tag(
.unwrap();
ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::LibraryQuery {
library_id: ctx.id.to_string(),
library_id: ctx.id,
query: LibraryQuery::GetTags,
}))
.await;
@ -121,7 +122,7 @@ pub async fn update_tag(
.unwrap();
ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::LibraryQuery {
library_id: ctx.id.to_string(),
library_id: ctx.id,
query: LibraryQuery::GetTags,
}))
.await;
@ -153,7 +154,7 @@ pub async fn tag_delete(ctx: LibraryContext, id: i32) -> Result<CoreResponse, Co
.unwrap();
ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::LibraryQuery {
library_id: ctx.id.to_string(),
library_id: ctx.id,
query: LibraryQuery::GetTags,
}))
.await;

20
cspell.config.yaml Normal file
View file

@ -0,0 +1,20 @@
$schema: https://raw.githubusercontent.com/streetsidesoftware/cspell/main/cspell.schema.json
version: '0.2'
dictionaryDefinitions:
- name: project_words
path: './.cspell/project_words.txt'
addWords: true
- name: frontend_custom_words
path: './.cspell/frontend_custom_words.txt'
addWords: true
- name: backend_custom_words
path: './.cspell/backend_custom_words.txt'
addWords: true
dictionaries:
- project_words
- frontend_custom_words
- backend_custom_words
ignorePaths:
- 'node_modules'
- '.cspell'
- 'target'

View file

@ -2,8 +2,8 @@
## Database migrations
Currently migrations are applied on app launch with no visual feedback, backup or error handling.
Currently, migrations are applied on app launch with no visual feedback, backup or error handling.
It doesn't appear that migrations are applied succesfully
It doesn't appear that migrations are applied successfully
##

View file

@ -5,7 +5,7 @@ Synchronizing data between clients in a Spacedrive network is accomplished using
Designed for synchronizing data in real time between [SQLite](https://www.sqlite.org/) databases that may be gigabytes in size.
```rust
// we can now impl specfic CRDT traits to given resources
// we can now impl specific CRDT traits to given resources
enum SyncResource {
FilePath(dyn Replicate),
File(dyn PropertyOperation),
@ -185,7 +185,7 @@ impl Replicate for Job {
We have a simple Rust syntax for creating sync events in the core.
```rust
aysnc fn my_core_function(&ctx: CoreContext) -> Result<()> {
async fn my_core_function(&ctx: CoreContext) -> Result<()> {
let mut file = File::get_unique(1).await?;
ctx.sync.operation(file.id,

View file

@ -15,7 +15,7 @@
**To be developed (MVP):**
- **Photos** - Photo and video albums similar to Apple/Google photos.
- **Search** - Deep search into your filesystem with a keybind, including offline locations.
- **Search** - Deep search into your filesystem with a keybinding, including offline locations.
- **Tags** - Define routines on custom tags to automate workflows, easily tag files individually, in bulk and automatically via rules.
- **Extensions** - Build tools on top of Spacedrive, extend functionality and integrate third party services. Extension directory on [spacedrive.com/extensions](/extensions).
@ -25,7 +25,7 @@
- **Cloud integration** - Index & backup to Apple Photos, Google Drive, Dropbox, OneDrive & Mega + easy API for the community to add more.
- **Encrypted vault(s)** - Effortlessly manage & encrypt sensitive files, built on top of VeraCrypt. Encrypt individual files or create flexible-size vaults.
- **Key manager** - View, mount, dismount and hide keys. Mounted keys automatically unlock respective areas of your filesystem.
- **Redundancy Goal** - Ensure a specific amount of copies exist for your important data, discover at-risk files and monitor device/drive health.
- **Redundancy Goal** - Ensure a specific amount of copies exists for your important data, discover at-risk files and monitor device/drive health.
- **Timeline** - View a linear timeline of content, travel to any time and see media represented visually.
- **Media encoder** - Encode video and audio into various formats, use Tags to automate. Built with FFMPEG.
- **Workers** - Utilize the compute power of your devices in unison to encode and perform tasks at increased speeds.
- **Workers** - Utilize the computing power of your devices in unison to encode and perform tasks at increased speeds.

33
lefthook.yml Normal file
View file

@ -0,0 +1,33 @@
########################################################################################################################
# For an explanation, refer to the following link: #
# https://github.com/evilmartians/lefthook/blob/master/docs/full_guide.md #
########################################################################################################################
pre-commit:
parallel: true
commands:
type-check:
glob: "*.{ts,tsx}"
run: pnpm typecheck
lint:
glob: '*.{ts,tsx}'
run: pnpm eslint {all_files}
spelling:
glob: '*.{ts,tsx,md,rs}'
run: pnpm cspell {staged_files}
markdown-link-check:
glob: '*.md'
run: pnpm markdown-link-check {staged_files}
rust-fmt:
glob: '*.rs'
run: cargo fmt --all -- --check
rust-lint-tauri:
run: cargo clippy --package spacedrive -- -D warnings
rust-lint-core:
run: cargo clippy --package sdcore --lib -- -D warnings
rust-lint-core-prisma:
run: cargo clippy --package prisma-cli -- -D warnings
rust-lint-core-derive:
run: cargo clippy --package core-derive --lib -- -D warnings
rust-lint-server:
run: cargo clippy --package server -- -D warnings

View file

@ -23,7 +23,20 @@
"typecheck": "pnpm -r exec tsc"
},
"devDependencies": {
"@cspell/dict-rust": "^2.0.1",
"@cspell/dict-typescript": "^2.0.1",
"@evilmartians/lefthook": "^1.0.5",
"@trivago/prettier-plugin-sort-imports": "^3.2.0",
"@typescript-eslint/eslint-plugin": "^5.30.7",
"@typescript-eslint/parser": "^5.30.7",
"cspell": "^6.4.0",
"eslint": "^8.20.0",
"eslint-config-prettier": "^8.5.0",
"eslint-config-standard-with-typescript": "^22.0.0",
"eslint-plugin-import": ">=2.25.2 <3.0.0",
"eslint-plugin-n": ">=15.0.0 <16.0.0",
"eslint-plugin-promise": ">=6.0.0 <7.0.0",
"markdown-link-check": "^3.10.2",
"prettier": "^2.6.2",
"turbo": "^1.2.14",
"typescript": "^4.7.4"

View file

@ -16,7 +16,7 @@ import { SettingsScreen } from './screens/settings/Settings';
import AppearanceSettings from './screens/settings/client/AppearanceSettings';
import ExtensionSettings from './screens/settings/client/ExtensionsSettings';
import GeneralSettings from './screens/settings/client/GeneralSettings';
import KeybindSettings from './screens/settings/client/KeybindSettings';
import KeybindingSettings from './screens/settings/client/KeybindingSettings';
import PrivacySettings from './screens/settings/client/PrivacySettings';
import AboutSpacedrive from './screens/settings/info/AboutSpacedrive';
import Changelog from './screens/settings/info/Changelog';
@ -66,7 +66,7 @@ export function AppRouter() {
<Route index element={<GeneralSettings />} />
<Route path="general" element={<GeneralSettings />} />
<Route path="appearance" element={<AppearanceSettings />} />
<Route path="keybinds" element={<KeybindSettings />} />
<Route path="keybindings" element={<KeybindingSettings />} />
<Route path="extensions" element={<ExtensionSettings />} />
<Route path="p2p" element={<P2PSettings />} />
<Route path="contacts" element={<ContactsSettings />} />

View file

@ -2,7 +2,7 @@ import clsx from 'clsx';
import React from 'react';
export interface CheckboxProps extends React.InputHTMLAttributes<HTMLInputElement> {
containerClasname?: string;
containerClassname?: string;
}
export const Checkbox: React.FC<CheckboxProps> = (props) => {
@ -10,7 +10,7 @@ export const Checkbox: React.FC<CheckboxProps> = (props) => {
<label
className={clsx(
'flex items-center text-sm font-medium text-gray-700 dark:text-gray-100',
props.containerClasname
props.containerClassname
)}
>
<input

View file

@ -59,9 +59,9 @@ export const SettingsScreen: React.FC = () => {
<SettingsIcon component={PaintBrush} />
Appearance
</SidebarLink>
<SidebarLink to="/settings/keybinds">
<SidebarLink to="/settings/keybindings">
<SettingsIcon component={KeyReturn} />
Keybinds
Keybindings
</SidebarLink>
<SidebarLink to="/settings/extensions">
<SettingsIcon component={PuzzlePiece} />

View file

@ -8,11 +8,11 @@ import { SettingsHeader } from '../../../components/settings/SettingsHeader';
export default function AppearanceSettings() {
return (
<SettingsContainer>
<SettingsHeader title="Keybinds" description="Manage client keybinds" />
<SettingsHeader title="Keybindings" description="Manage client keybindings" />
<InputContainer
mini
title="Sync with Library"
description="If enabled your keybinds will be synced with library, otherwise they will apply only to this client."
description="If enabled your keybindings will be synced with library, otherwise they will apply only to this client."
>
<Toggle value />
</InputContainer>

File diff suppressed because it is too large