Fixing Clippy warnings

* Using tokio for all filesystem operations (a sketch of the pattern follows below)
* Some minor tweaks for more consistent path handling across &str, AsRef<Path>, and PathBuf
* Using logging instead of println!
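
A minimal sketch of the pattern applied throughout this commit; the helper below is illustrative and not taken from the codebase:

```rust
use std::path::Path;

use log::{error, info};
use tokio::fs;

// Async I/O via tokio::fs, a generic AsRef<Path> parameter instead of
// &str/PathBuf at the call boundary, and log macros instead of println!.
async fn ensure_dir(path: impl AsRef<Path>) -> std::io::Result<()> {
    let path = path.as_ref();
    if let Err(e) = fs::create_dir_all(path).await {
        error!("failed to create {:?}: {}", path, e);
        return Err(e);
    }
    info!("created {:?}", path);
    Ok(())
}
```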
Ericson Fogo Soares 2022-06-28 22:56:49 -03:00
parent 2cc3f3d95d
commit ef2f60ff33
21 changed files with 532 additions and 499 deletions

Cargo.lock (generated, 15 changes)

@ -4872,7 +4872,7 @@ dependencies = [
"thiserror",
"tokio",
"ts-rs",
"uuid 0.8.2",
"uuid 1.1.2",
"walkdir",
"webp",
]
@ -6341,6 +6341,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd6469f4314d5f1ffec476e05f17cc9a78bc7a27a6a857842170bdf8d6f98d2f"
dependencies = [
"getrandom 0.2.7",
"serde",
"uuid-macro-internal",
]
[[package]]
name = "uuid-macro-internal"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "548f7181a5990efa50237abb7ebca410828b57a8955993334679f8b50b35c97d"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]


@ -28,7 +28,7 @@ ts-rs = { version = "6.1", features = ["chrono-impl"] }
prisma-client-rust = { git = "https://github.com/Brendonovich/prisma-client-rust.git", tag = "0.5.0" }
walkdir = "^2.3.2"
lazy_static = "1.4.0"
uuid = "0.8"
uuid = { version = "1.1.2", features = ["v4", "macro-diagnostics", "serde"]}
sysinfo = "0.23.9"
thiserror = "1.0.30"
core-derive = { path = "./derive" }
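
For reference, a minimal sketch of what the uuid 1.x upgrade with the `v4` and `serde` features enables; `serde_json` and the `Dotfile` struct are assumed here purely for illustration:

```rust
use serde::{Deserialize, Serialize};
use uuid::Uuid;

#[derive(Serialize, Deserialize, Debug)]
struct Dotfile {
    // serializes as a string thanks to uuid's serde feature
    location_uuid: Uuid,
}

fn main() {
    let d = Dotfile { location_uuid: Uuid::new_v4() };
    println!("{}", serde_json::to_string(&d).unwrap());
}
```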


@ -1,5 +1,4 @@
extern crate ffmpeg_next as ffmpeg;
use ffmpeg::format;
use ffmpeg_next::format;
#[derive(Default, Debug)]
pub struct MediaItem {
@ -125,10 +124,10 @@ pub struct AudioStream {
// }
// media_item.steams.push(stream_item);
// }
// println!("{:#?}", media_item);
// info!("{:#?}", media_item);
// }
// Err(error) => println!("error: {}", error),
// Err(error) => error!("error: {}", error),
// }
// Ok(())
// }


@ -1,22 +1,21 @@
use crate::job::JobReportUpdate;
use crate::node::get_nodestate;
use crate::{
job::{Job, WorkerContext},
job::{Job, JobReportUpdate, WorkerContext},
node::get_nodestate,
prisma::file_path,
CoreContext,
sys, CoreContext, CoreEvent,
};
use crate::{sys, CoreEvent};
use futures::executor::block_on;
use image::*;
use image::{self, imageops, DynamicImage, GenericImageView};
use log::{error, info};
use std::fs;
use std::error::Error;
use std::path::{Path, PathBuf};
use webp::*;
use tokio::fs;
use webp::Encoder;
#[derive(Debug, Clone)]
pub struct ThumbnailJob {
pub location_id: i32,
pub path: String,
pub path: PathBuf,
pub background: bool,
}
@ -29,30 +28,34 @@ impl Job for ThumbnailJob {
fn name(&self) -> &'static str {
"thumbnailer"
}
async fn run(&self, ctx: WorkerContext) -> Result<(), Box<dyn std::error::Error>> {
let config = get_nodestate();
let core_ctx = ctx.core_ctx.clone();
let location = sys::get_location(&core_ctx, self.location_id).await?;
async fn run(&self, ctx: WorkerContext) -> Result<(), Box<dyn Error>> {
let config = get_nodestate();
let location = sys::get_location(&ctx.core_ctx, self.location_id).await?;
info!(
"Searching for images in location {} at path {}",
"Searching for images in location {} at path {:#?}",
location.id, self.path
);
// create all necessary directories if they don't exist
fs::create_dir_all(
Path::new(&config.data_path)
config
.data_path
.as_ref()
.unwrap()
.join(THUMBNAIL_CACHE_DIR_NAME)
.join(format!("{}", self.location_id)),
)?;
)
.await?;
let root_path = location.path.unwrap();
// query database for all files in this location that need thumbnails
let image_files = get_images(&core_ctx, self.location_id, &self.path).await?;
let image_files = get_images(&ctx.core_ctx, self.location_id, &self.path).await?;
info!("Found {:?} files", image_files.len());
let is_background = self.background.clone();
let is_background = self.background;
tokio::task::spawn_blocking(move || {
ctx.progress(vec![
@ -89,7 +92,10 @@ impl Job for ThumbnailJob {
};
// Define and write the WebP-encoded file to a given path
let output_path = Path::new(&config.data_path)
let output_path = config
.data_path
.as_ref()
.unwrap()
.join(THUMBNAIL_CACHE_DIR_NAME)
.join(format!("{}", location.id))
.join(&cas_id)
@ -98,7 +104,7 @@ impl Job for ThumbnailJob {
// check if file exists at output path
if !output_path.exists() {
info!("Writing {:?} to {:?}", path, output_path);
generate_thumbnail(&path, &output_path)
block_on(generate_thumbnail(&path, &output_path))
.map_err(|e| {
info!("Error generating thumb {:?}", e);
})
@ -120,27 +126,27 @@ impl Job for ThumbnailJob {
}
}
pub fn generate_thumbnail(
file_path: &PathBuf,
output_path: &PathBuf,
) -> Result<(), Box<dyn std::error::Error>> {
pub async fn generate_thumbnail<P: AsRef<Path>>(
file_path: P,
output_path: P,
) -> Result<(), Box<dyn Error>> {
// Using the `image` crate, open the source image file
let img = image::open(file_path)?;
let (w, h) = img.dimensions();
// Optionally, resize the existing photo and convert back into DynamicImage
let img: DynamicImage = image::DynamicImage::ImageRgba8(imageops::resize(
let img = DynamicImage::ImageRgba8(imageops::resize(
&img,
(w as f32 * THUMBNAIL_SIZE_FACTOR) as u32,
(h as f32 * THUMBNAIL_SIZE_FACTOR) as u32,
imageops::FilterType::Triangle,
));
// Create the WebP encoder for the above image
let encoder: Encoder = Encoder::from_image(&img)?;
let encoder = Encoder::from_image(&img)?;
// Encode the image at a specified quality 0-100
let webp: WebPMemory = encoder.encode(THUMBNAIL_QUALITY);
let webp = encoder.encode(THUMBNAIL_QUALITY);
std::fs::write(&output_path, &*webp)?;
fs::write(output_path, &*webp).await?;
Ok(())
}
@ -148,7 +154,7 @@ pub fn generate_thumbnail(
pub async fn get_images(
ctx: &CoreContext,
location_id: i32,
path: &str,
path: impl AsRef<Path>,
) -> Result<Vec<file_path::Data>, std::io::Error> {
let mut params = vec![
file_path::location_id::equals(Some(location_id)),
@ -161,8 +167,10 @@ pub async fn get_images(
]),
];
if !path.is_empty() {
params.push(file_path::materialized_path::starts_with(path.to_string()))
let path_str = path.as_ref().to_string_lossy().to_string();
if !path_str.is_empty() {
params.push(file_path::materialized_path::starts_with(path_str))
}
let image_files = ctx


@ -1,36 +1,27 @@
use data_encoding::HEXLOWER;
use ring::digest::{Context, SHA256};
use std::convert::TryInto;
use std::fs::File;
use std::io;
use std::path::PathBuf;
use tokio::{
fs::File,
io::{self, AsyncReadExt, AsyncSeekExt, SeekFrom},
};
static SAMPLE_COUNT: u64 = 4;
static SAMPLE_SIZE: u64 = 10000;
fn read_at(file: &File, offset: u64, size: u64) -> Result<Vec<u8>, io::Error> {
async fn read_at(file: &mut File, offset: u64, size: u64) -> Result<Vec<u8>, io::Error> {
let mut buf = vec![0u8; size as usize];
#[cfg(target_family = "unix")]
{
use std::os::unix::prelude::*;
file.read_exact_at(&mut buf, offset)?;
}
#[cfg(target_family = "windows")]
{
use std::os::windows::prelude::*;
file.seek_read(&mut buf, offset)?;
}
file.seek(SeekFrom::Start(offset)).await?;
file.read_exact(&mut buf).await?;
Ok(buf)
}
pub fn generate_cas_id(path: PathBuf, size: u64) -> Result<String, io::Error> {
pub async fn generate_cas_id(path: PathBuf, size: u64) -> Result<String, io::Error> {
// open file reference
let file = File::open(path)?;
let mut file = File::open(path).await?;
let mut context = Context::new(&SHA256);
@ -39,20 +30,16 @@ pub fn generate_cas_id(path: PathBuf, size: u64) -> Result<String, io::Error> {
// if size is small enough, just read the whole thing
if SAMPLE_COUNT * SAMPLE_SIZE > size {
let buf = read_at(&file, 0, size.try_into().unwrap())?;
let buf = read_at(&mut file, 0, size).await?;
context.update(&buf);
} else {
// loop over samples
for i in 0..SAMPLE_COUNT {
let buf = read_at(
&file,
(size / SAMPLE_COUNT) * i,
SAMPLE_SIZE.try_into().unwrap(),
)?;
let buf = read_at(&mut file, (size / SAMPLE_COUNT) * i, SAMPLE_SIZE).await?;
context.update(&buf);
}
// sample end of file
let buf = read_at(&file, size - SAMPLE_SIZE, SAMPLE_SIZE.try_into().unwrap())?;
let buf = read_at(&mut file, size - SAMPLE_SIZE, SAMPLE_SIZE).await?;
context.update(&buf);
}
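
The scheme above hashes SAMPLE_COUNT evenly spaced windows of SAMPLE_SIZE bytes instead of the whole file. A self-contained sketch of the same sampling idea, assuming tokio and ring as this file does (windows only; the real code also hashes the file tail and special-cases small files):

```rust
use ring::digest::{Context, SHA256};
use tokio::{
    fs::File,
    io::{AsyncReadExt, AsyncSeekExt, SeekFrom},
};

const SAMPLE_COUNT: u64 = 4;
const SAMPLE_SIZE: u64 = 10_000;

// Hash a few fixed-size windows spread across the file rather than its
// full contents. Assumes size >= SAMPLE_COUNT * SAMPLE_SIZE.
async fn sampled_digest(path: &str, size: u64) -> std::io::Result<Vec<u8>> {
    let mut file = File::open(path).await?;
    let mut ctx = Context::new(&SHA256);
    let mut buf = vec![0u8; SAMPLE_SIZE as usize];
    for i in 0..SAMPLE_COUNT {
        file.seek(SeekFrom::Start((size / SAMPLE_COUNT) * i)).await?;
        file.read_exact(&mut buf).await?;
        ctx.update(&buf);
    }
    Ok(ctx.finish().as_ref().to_vec())
}
```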


@ -13,8 +13,9 @@ use log::info;
use prisma_client_rust::{prisma_models::PrismaValue, raw, raw::Raw, Direction};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::Path;
use std::{fs, io};
use std::error::Error;
use std::path::{Path, PathBuf};
use tokio::{fs, io};
// FileIdentifierJob takes file_paths without a file_id and uniquely identifies them
// first: generating the cas_id and extracting metadata
@ -22,7 +23,7 @@ use std::{fs, io};
#[derive(Debug)]
pub struct FileIdentifierJob {
pub location_id: i32,
pub path: String,
pub path: PathBuf,
}
// we break this job into chunks of 100 to improve performance
@ -33,11 +34,12 @@ impl Job for FileIdentifierJob {
fn name(&self) -> &'static str {
"file_identifier"
}
async fn run(&self, ctx: WorkerContext) -> Result<(), Box<dyn std::error::Error>> {
async fn run(&self, ctx: WorkerContext) -> Result<(), Box<dyn Error>> {
info!("Identifying orphan file paths...");
let location = get_location(&ctx.core_ctx, self.location_id).await?;
let location_path = location.path.unwrap_or("".to_string());
let location_path = location.path.unwrap_or_else(|| "".to_string());
let total_count = count_orphan_file_paths(&ctx.core_ctx, location.id.into()).await?;
info!("Found {} orphan file paths", total_count);
@ -63,7 +65,7 @@ impl Job for FileIdentifierJob {
let file_paths = match block_on(get_orphan_file_paths(&ctx.core_ctx, cursor)) {
Ok(file_paths) => file_paths,
Err(e) => {
info!("Error getting orphan file paths: {}", e);
info!("Error getting orphan file paths: {:#?}", e);
continue;
}
};
@ -77,7 +79,7 @@ impl Job for FileIdentifierJob {
// analyze each file_path
for file_path in file_paths.iter() {
// get the cas_id and extract metadata
match prepare_file(&location_path, file_path) {
match block_on(prepare_file(&location_path, file_path)) {
Ok(file) => {
let cas_id = file.cas_id.clone();
// create entry into chunks for created file data
@ -85,7 +87,7 @@ impl Job for FileIdentifierJob {
cas_lookup.insert(cas_id, file_path.id);
}
Err(e) => {
info!("Error processing file: {}", e);
info!("Error processing file: {:#?}", e);
continue;
}
};
@ -106,8 +108,8 @@ impl Job for FileIdentifierJob {
let file_path_id = cas_lookup.get(&file.cas_id).unwrap();
block_on(
db.file_path()
.find_unique(file_path::id::equals(file_path_id.clone()))
.update(vec![file_path::file_id::set(Some(file.id.clone()))])
.find_unique(file_path::id::equals(*file_path_id))
.update(vec![file_path::file_id::set(Some(file.id))])
.exec(),
)
.unwrap();
@ -125,8 +127,8 @@ impl Job for FileIdentifierJob {
for file in new_files.iter() {
values.extend([
PrismaValue::String(file.cas_id.clone()),
PrismaValue::Int(file.size_in_bytes.clone()),
PrismaValue::DateTime(file.date_created.clone()),
PrismaValue::Int(file.size_in_bytes),
PrismaValue::DateTime(file.date_created),
]);
}
@ -140,7 +142,7 @@ impl Job for FileIdentifierJob {
values,
)))
.unwrap_or_else(|e| {
info!("Error inserting files: {}", e);
info!("Error inserting files: {:#?}", e);
Vec::new()
});
@ -151,8 +153,8 @@ impl Job for FileIdentifierJob {
let file_path_id = cas_lookup.get(&file.cas_id).unwrap();
block_on(
db.file_path()
.find_unique(file_path::id::equals(file_path_id.clone()))
.update(vec![file_path::file_id::set(Some(file.id.clone()))])
.find_unique(file_path::id::equals(*file_path_id))
.update(vec![file_path::file_id::set(Some(file.id))])
.exec(),
)
.unwrap();
@ -241,13 +243,15 @@ pub struct FileCreated {
pub cas_id: String,
}
pub fn prepare_file(
location_path: &str,
pub async fn prepare_file(
location_path: impl AsRef<Path>,
file_path: &file_path::Data,
) -> Result<CreateFile, io::Error> {
let path = Path::new(&location_path).join(Path::new(file_path.materialized_path.as_str()));
let path = location_path
.as_ref()
.join(file_path.materialized_path.as_str());
let metadata = fs::metadata(&path)?;
let metadata = fs::metadata(&path).await?;
// let date_created: DateTime<Utc> = metadata.created().unwrap().into();
@ -255,7 +259,7 @@ pub fn prepare_file(
let cas_id = {
if !file_path.is_dir {
let mut ret = generate_cas_id(path.clone(), size.clone()).unwrap();
let mut ret = generate_cas_id(path, size).await?;
ret.truncate(16);
ret
} else {


@ -6,33 +6,37 @@ use crate::{
sys::get_location,
CoreContext,
};
use log::info;
use std::path::Path;
pub async fn open_dir(
ctx: &CoreContext,
location_id: &i32,
path: &str,
location_id: i32,
path: impl AsRef<Path>,
) -> Result<DirectoryWithContents, FileError> {
let db = &ctx.database;
let config = get_nodestate();
// get location
let location = get_location(ctx, location_id.clone()).await?;
let location = get_location(ctx, location_id).await?;
let directory = db
let path_str = path.as_ref().to_string_lossy().to_string();
let directory = ctx
.database
.file_path()
.find_first(vec![
file_path::location_id::equals(Some(location.id)),
file_path::materialized_path::equals(path.into()),
file_path::materialized_path::equals(path_str),
file_path::is_dir::equals(true),
])
.exec()
.await?
.ok_or(FileError::DirectoryNotFound(path.to_string()))?;
.ok_or_else(|| FileError::DirectoryNotFound(path.as_ref().to_path_buf()))?;
println!("DIRECTORY: {:?}", directory);
info!("DIRECTORY: {:?}", directory);
let mut file_paths: Vec<FilePath> = db
let mut file_paths: Vec<FilePath> = ctx
.database
.file_path()
.find_many(vec![
file_path::location_id::equals(Some(location.id)),
@ -45,15 +49,17 @@ pub async fn open_dir(
.map(Into::into)
.collect();
for file_path in &mut file_paths {
if let Some(file) = &mut file_path.file {
let thumb_path = Path::new(&config.data_path)
.join(THUMBNAIL_CACHE_DIR_NAME)
.join(format!("{}", location.id))
.join(file.cas_id.clone())
.with_extension("webp");
if let Some(ref data_path) = config.data_path {
for file_path in &mut file_paths {
if let Some(file) = &mut file_path.file {
let thumb_path = data_path
.join(THUMBNAIL_CACHE_DIR_NAME)
.join(location.id.to_string())
.join(file.cas_id.clone())
.with_extension("webp");
file.has_thumbnail = thumb_path.exists();
file.has_thumbnail = thumb_path.exists();
}
}
}


@ -1,15 +1,18 @@
use crate::job::{Job, JobReportUpdate, WorkerContext};
use std::error::Error;
use std::path::PathBuf;
use self::scan::ScanProgress;
mod scan;
// Re-exporting
pub use scan::*;
pub use scan::scan_path;
use scan::scan_path;
#[derive(Debug)]
pub struct IndexerJob {
pub path: String,
pub path: PathBuf,
}
#[async_trait::async_trait]
@ -17,9 +20,8 @@ impl Job for IndexerJob {
fn name(&self) -> &'static str {
"indexer"
}
async fn run(&self, ctx: WorkerContext) -> Result<(), Box<dyn std::error::Error>> {
let core_ctx = ctx.core_ctx.clone();
scan_path(&core_ctx, self.path.as_str(), move |p| {
async fn run(&self, ctx: WorkerContext) -> Result<(), Box<dyn Error>> {
scan_path(&ctx.core_ctx.clone(), &self.path, move |p| {
ctx.progress(
p.iter()
.map(|p| match p.clone() {


@ -1,13 +1,22 @@
use crate::sys::{create_location, LocationResource};
use crate::CoreContext;
use chrono::{DateTime, FixedOffset, Utc};
use crate::{
sys::{create_location, LocationResource},
CoreContext,
};
use chrono::{DateTime, Utc};
use log::{error, info};
use prisma_client_rust::prisma_models::PrismaValue;
use prisma_client_rust::raw;
use prisma_client_rust::raw::Raw;
use serde::{Deserialize, Serialize};
use std::ffi::OsStr;
use std::{collections::HashMap, fs, path::Path, path::PathBuf, time::Instant};
use std::fmt::Debug;
use std::{
collections::HashMap,
path::{Path, PathBuf},
time::Instant,
};
use tokio::fs;
use walkdir::{DirEntry, WalkDir};
#[derive(Clone)]
@ -22,13 +31,10 @@ static BATCH_SIZE: usize = 100;
// creates a vector of valid path buffers from a directory
pub async fn scan_path(
ctx: &CoreContext,
path: &str,
path: impl AsRef<Path> + Debug,
on_progress: impl Fn(Vec<ScanProgress>) + Send + Sync + 'static,
) -> Result<(), Box<dyn std::error::Error>> {
let db = &ctx.database;
let path = path.to_string();
let location = create_location(&ctx, &path).await?;
let location = create_location(ctx, &path).await?;
// query db for the highest id, so we can increment it for the new files indexed
#[derive(Deserialize, Serialize, Debug)]
@ -36,20 +42,22 @@ pub async fn scan_path(
id: Option<i32>,
}
// grab the next id so we can increment in memory for batch inserting
let first_file_id = match db
let first_file_id = match ctx
.database
._query_raw::<QueryRes>(raw!("SELECT MAX(id) id FROM file_paths"))
.await
{
Ok(rows) => rows[0].id.unwrap_or(0),
Err(e) => panic!("Error querying for next file id: {}", e),
Err(e) => panic!("Error querying for next file id: {:#?}", e),
};
// check if path is a directory
if !PathBuf::from(&path).is_dir() {
if !path.as_ref().is_dir() {
// return Err(anyhow::anyhow!("{} is not a directory", &path));
panic!("{} is not a directory", &path);
panic!("{:#?} is not a directory", path);
}
let dir_path = path.clone();
let path_buf = path.as_ref().to_path_buf();
// spawn a dedicated thread to scan the directory for performance
let (paths, scan_start, on_progress) = tokio::task::spawn_blocking(move || {
@ -66,10 +74,9 @@ pub async fn scan_path(
next_file_id
};
// walk through directory recursively
for entry in WalkDir::new(&dir_path).into_iter().filter_entry(|dir| {
let approved =
!is_hidden(dir) && !is_app_bundle(dir) && !is_node_modules(dir) && !is_library(dir);
approved
for entry in WalkDir::new(path_buf).into_iter().filter_entry(|dir| {
// check if entry is approved
!is_hidden(dir) && !is_app_bundle(dir) && !is_node_modules(dir) && !is_library(dir)
}) {
// extract directory entry or log and continue if failed
let entry = match entry {
@ -85,7 +92,7 @@ pub async fn scan_path(
let parent_path = path
.parent()
.unwrap_or(Path::new(""))
.unwrap_or_else(|| Path::new(""))
.to_str()
.unwrap_or("");
let parent_dir_id = dirs.get(&*parent_path);
@ -99,7 +106,7 @@ pub async fn scan_path(
};
on_progress(vec![
ScanProgress::Message(format!("{}", path_str)),
ScanProgress::Message(format!("Scanning {}", path_str)),
ScanProgress::ChunkCount(paths.len() / BATCH_SIZE),
]);
@ -121,8 +128,7 @@ pub async fn scan_path(
}
(paths, scan_start, on_progress)
})
.await
.unwrap();
.await?;
let db_write_start = Instant::now();
let scan_read_time = scan_start.elapsed();
@ -142,7 +148,7 @@ pub async fn scan_path(
for (file_path, file_id, parent_dir_id, is_dir) in chunk {
files.extend(
match prepare_values(&file_path, *file_id, &location, parent_dir_id, *is_dir) {
match prepare_values(file_path, *file_id, &location, parent_dir_id, *is_dir).await {
Ok(values) => values.to_vec(),
Err(e) => {
error!("Error creating file model from path {:?}: {}", file_path, e);
@ -162,7 +168,7 @@ pub async fn scan_path(
files
);
let count = db._execute_raw(raw).await;
let count = ctx.database._execute_raw(raw).await;
info!("Inserted {:?} records", count);
}
@ -177,14 +183,14 @@ pub async fn scan_path(
}
// reads a file's metadata at a path and builds the PrismaValue row for insertion
fn prepare_values(
async fn prepare_values(
file_path: &PathBuf,
id: i32,
location: &LocationResource,
parent_id: &Option<i32>,
is_dir: bool,
) -> Result<[PrismaValue; 8], std::io::Error> {
let metadata = fs::metadata(&file_path)?;
let metadata = fs::metadata(&file_path).await?;
let location_path = Path::new(location.path.as_ref().unwrap().as_str());
// let size = metadata.len();
let name;
@ -214,7 +220,6 @@ fn prepare_values(
PrismaValue::String(name),
PrismaValue::String(extension.to_lowercase()),
parent_id
.clone()
.map(|id| PrismaValue::Int(id as i64))
.unwrap_or(PrismaValue::Null),
PrismaValue::DateTime(date_created.into()),
@ -236,7 +241,7 @@ fn is_hidden(entry: &DirEntry) -> bool {
entry
.file_name()
.to_str()
.map(|s| s.starts_with("."))
.map(|s| s.starts_with('.'))
.unwrap_or(false)
}
@ -265,7 +270,7 @@ fn is_app_bundle(entry: &DirEntry) -> bool {
.map(|s| s.contains(".app") | s.contains(".bundle"))
.unwrap_or(false);
let is_app_bundle = is_dir && contains_dot;
// let is_app_bundle = is_dir && contains_dot;
// if is_app_bundle {
// let path_buff = entry.path();
// let path = path_buff.to_str().unwrap();
@ -273,5 +278,5 @@ fn is_app_bundle(entry: &DirEntry) -> bool {
// self::path(&path, );
// }
is_app_bundle
is_dir && contains_dot
}
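
A standalone sketch of the walkdir filtering used above, with only the dotfile predicate reproduced:

```rust
use walkdir::{DirEntry, WalkDir};

fn is_hidden(entry: &DirEntry) -> bool {
    entry
        .file_name()
        .to_str()
        .map(|s| s.starts_with('.'))
        .unwrap_or(false)
}

fn main() {
    // filter_entry prunes whole subtrees, so a hidden directory is never
    // descended into -- unlike a plain .filter() on the iterator.
    for entry in WalkDir::new(".")
        .into_iter()
        .filter_entry(|e| !is_hidden(e))
        .filter_map(Result::ok)
    {
        println!("{}", entry.path().display());
    }
}
```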


@ -1,3 +1,4 @@
use std::path::PathBuf;
use int_enum::IntEnum;
use serde::{Deserialize, Serialize};
use thiserror::Error;
@ -77,46 +78,52 @@ pub enum FileKind {
Alias = 8,
}
impl Into<File> for file::Data {
fn into(self) -> File {
File {
id: self.id,
cas_id: self.cas_id,
integrity_checksum: self.integrity_checksum,
kind: IntEnum::from_int(self.kind).unwrap(),
size_in_bytes: self.size_in_bytes.to_string(),
// encryption: EncryptionAlgorithm::from_int(self.encryption).unwrap(),
ipfs_id: self.ipfs_id,
hidden: self.hidden,
favorite: self.favorite,
important: self.important,
has_thumbnail: self.has_thumbnail,
has_thumbstrip: self.has_thumbstrip,
has_video_preview: self.has_video_preview,
note: self.note,
date_created: self.date_created.into(),
date_modified: self.date_modified.into(),
date_indexed: self.date_indexed.into(),
impl From<file::Data> for File {
fn from(data: file::Data) -> Self {
Self {
id: data.id,
cas_id: data.cas_id,
integrity_checksum: data.integrity_checksum,
kind: IntEnum::from_int(data.kind).unwrap(),
size_in_bytes: data.size_in_bytes.to_string(),
// encryption: EncryptionAlgorithm::from_int(data.encryption).unwrap(),
ipfs_id: data.ipfs_id,
hidden: data.hidden,
favorite: data.favorite,
important: data.important,
has_thumbnail: data.has_thumbnail,
has_thumbstrip: data.has_thumbstrip,
has_video_preview: data.has_video_preview,
note: data.note,
date_created: data.date_created.into(),
date_modified: data.date_modified.into(),
date_indexed: data.date_indexed.into(),
paths: vec![],
}
}
}
impl Into<FilePath> for file_path::Data {
fn into(mut self) -> FilePath {
FilePath {
id: self.id,
is_dir: self.is_dir,
materialized_path: self.materialized_path,
file_id: self.file_id,
parent_id: self.parent_id,
location_id: self.location_id.unwrap_or(0),
date_indexed: self.date_indexed.into(),
name: self.name,
extension: self.extension,
date_created: self.date_created.into(),
date_modified: self.date_modified.into(),
file: self.file.take().unwrap_or(None).map(|file| (*file).into()),
impl From<Box<file::Data>> for File {
fn from(data: Box<file::Data>) -> Self {
Self::from(*data)
}
}
impl From<file_path::Data> for FilePath {
fn from(data: file_path::Data) -> Self {
Self {
id: data.id,
is_dir: data.is_dir,
materialized_path: data.materialized_path,
file_id: data.file_id,
parent_id: data.parent_id,
location_id: data.location_id.unwrap_or(0),
date_indexed: data.date_indexed.into(),
name: data.name,
extension: data.extension,
date_created: data.date_created.into(),
date_modified: data.date_modified.into(),
file: data.file.unwrap_or(None).map(Into::into),
}
}
}
@ -131,9 +138,9 @@ pub struct DirectoryWithContents {
#[derive(Error, Debug)]
pub enum FileError {
#[error("Directory not found (path: {0:?})")]
DirectoryNotFound(String),
DirectoryNotFound(PathBuf),
#[error("File not found (path: {0:?})")]
FileNotFound(String),
FileNotFound(PathBuf),
#[error("Database error")]
DatabaseError(#[from] prisma::QueryError),
#[error("System error")]
@ -145,7 +152,7 @@ pub async fn set_note(
id: i32,
note: Option<String>,
) -> Result<CoreResponse, CoreError> {
let response = ctx
let _response = ctx
.database
.file()
.find_unique(file::id::equals(id))


@ -23,8 +23,8 @@ const MAX_WORKERS: usize = 1;
#[async_trait::async_trait]
pub trait Job: Send + Sync + Debug {
async fn run(&self, ctx: WorkerContext) -> Result<(), Box<dyn std::error::Error>>;
fn name(&self) -> &'static str;
async fn run(&self, ctx: WorkerContext) -> Result<(), Box<dyn std::error::Error>>;
}
// jobs struct is maintained by the core
@ -52,7 +52,7 @@ impl Jobs {
let wrapped_worker = Arc::new(Mutex::new(worker));
Worker::spawn(wrapped_worker.clone(), ctx).await;
Worker::spawn(Arc::clone(&wrapped_worker), ctx).await;
self.running_workers.insert(id, wrapped_worker);
} else {
@ -84,9 +84,8 @@ impl Jobs {
}
pub async fn queue_pending_job(ctx: &CoreContext) -> Result<(), JobError> {
let db = &ctx.database;
let next_job = db
let _next_job = ctx
.database
.job()
.find_first(vec![job::status::equals(JobStatus::Queued.int_value())])
.exec()
@ -96,14 +95,14 @@ impl Jobs {
}
pub async fn get_history(ctx: &CoreContext) -> Result<Vec<JobReport>, JobError> {
let db = &ctx.database;
let jobs = db
let jobs = ctx
.database
.job()
.find_many(vec![job::status::not(JobStatus::Running.int_value())])
.exec()
.await?;
Ok(jobs.into_iter().map(|j| j.into()).collect())
Ok(jobs.into_iter().map(Into::into).collect())
}
}
@ -138,20 +137,20 @@ pub struct JobReport {
}
// convert database struct into a resource struct
impl Into<JobReport> for job::Data {
fn into(self) -> JobReport {
impl From<job::Data> for JobReport {
fn from(data: job::Data) -> JobReport {
JobReport {
id: self.id,
name: self.name,
// client_id: self.client_id,
status: JobStatus::from_int(self.status).unwrap(),
task_count: self.task_count,
completed_task_count: self.completed_task_count,
date_created: self.date_created.into(),
date_modified: self.date_modified.into(),
data: self.data,
id: data.id,
name: data.name,
// client_id: data.client_id,
status: JobStatus::from_int(data.status).unwrap(),
task_count: data.task_count,
completed_task_count: data.completed_task_count,
date_created: data.date_created.into(),
date_modified: data.date_modified.into(),
data: data.data,
message: String::new(),
seconds_elapsed: self.seconds_elapsed,
seconds_elapsed: data.seconds_elapsed,
}
}
}
@ -177,7 +176,7 @@ impl JobReport {
let mut params = Vec::new();
if let Some(_) = &self.data {
if self.data.is_some() {
params.push(job::data::set(self.data.clone()))
}


@ -3,6 +3,7 @@ use super::{
Job,
};
use crate::{ClientQuery, CoreContext, CoreEvent, InternalEvent};
use log::error;
use std::{sync::Arc, time::Duration};
use tokio::{
sync::{
@ -11,6 +12,8 @@ use tokio::{
},
time::{sleep, Instant},
};
use uuid::Uuid;
// used to update the worker state from inside the worker thread
pub enum WorkerEvent {
Progressed(Vec<JobReportUpdate>),
@ -53,7 +56,7 @@ pub struct Worker {
impl Worker {
pub fn new(job: Box<dyn Job>) -> Self {
let (worker_sender, worker_receiver) = unbounded_channel();
let uuid = uuid::Uuid::new_v4().to_string();
let uuid = Uuid::new_v4().to_string();
let name = job.name();
Self {
@ -80,7 +83,7 @@ impl Worker {
worker_mut.job_report.status = JobStatus::Running;
worker_mut.job_report.create(&ctx).await.unwrap_or(());
worker_mut.job_report.create(ctx).await.unwrap_or(());
// spawn task to handle receiving events from the worker
tokio::spawn(Worker::track_progress(
@ -116,7 +119,7 @@ impl Worker {
let result = job.run(worker_ctx.clone()).await;
if let Err(e) = result {
println!("job failed {:?}", e);
error!("job failed {:?}", e);
worker_ctx.sender.send(WorkerEvent::Failed).unwrap_or(());
} else {
// handle completion


@ -3,13 +3,18 @@ use crate::{
prisma::file as prisma_file, prisma::location, util::db::create_connection,
};
use job::{Job, JobReport, Jobs};
use log::{error, info};
use prisma::PrismaClient;
use serde::{Deserialize, Serialize};
use std::{fs, sync::Arc};
use std::path::PathBuf;
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::{
mpsc::{self, unbounded_channel, UnboundedReceiver, UnboundedSender},
oneshot,
use tokio::{
fs,
sync::{
mpsc::{self, unbounded_channel, UnboundedReceiver, UnboundedSender},
oneshot,
},
};
use ts_rs::TS;
@ -83,19 +88,19 @@ impl CoreContext {
self.internal_sender
.send(InternalEvent::JobIngest(job))
.unwrap_or_else(|e| {
println!("Failed to spawn job. {:?}", e);
error!("Failed to spawn job. {:?}", e);
});
}
pub fn queue_job(&self, job: Box<dyn Job>) {
self.internal_sender
.send(InternalEvent::JobQueue(job))
.unwrap_or_else(|e| {
println!("Failed to queue job. {:?}", e);
error!("Failed to queue job. {:?}", e);
});
}
pub async fn emit(&self, event: CoreEvent) {
self.event_sender.send(event).await.unwrap_or_else(|e| {
println!("Failed to emit event. {:?}", e);
error!("Failed to emit event. {:?}", e);
});
}
}
@ -127,23 +132,23 @@ pub struct Node {
impl Node {
// create new instance of node, run startup tasks
pub async fn new(mut data_dir: std::path::PathBuf) -> (Node, mpsc::Receiver<CoreEvent>) {
pub async fn new(mut data_dir: PathBuf) -> (Node, mpsc::Receiver<CoreEvent>) {
let (event_sender, event_recv) = mpsc::channel(100);
data_dir = data_dir.join("spacedrive");
let data_dir = data_dir.to_str().unwrap();
data_dir.push("spacedrive");
// create data directory if it doesn't exist
fs::create_dir_all(&data_dir).unwrap();
fs::create_dir_all(&data_dir).await.unwrap();
// prepare basic client state
let mut state = NodeState::new(data_dir, "diamond-mastering-space-dragon").unwrap();
let mut state = NodeState::new(data_dir.clone(), "diamond-mastering-space-dragon").unwrap();
// load from disk
state
.read_disk()
.unwrap_or(println!("Error: No node state found, creating new one..."));
.await
.unwrap_or_else(|_| error!("Error: No node state found, creating new one..."));
state.save();
state.save().await;
println!("Node State: {:?}", state);
info!("Node State: {:?}", state);
// connect to default library
let database = Arc::new(
@ -213,32 +218,32 @@ impl Node {
}
// load library database + initialize client with db
pub async fn initializer(&self) {
println!("Initializing...");
info!("Initializing...");
let ctx = self.get_context();
if self.state.libraries.len() == 0 {
if self.state.libraries.is_empty() {
match library::create(&ctx, None).await {
Ok(library) => println!("Created new library: {:?}", library),
Err(e) => println!("Error creating library: {:?}", e),
Ok(library) => info!("Created new library: {:?}", library),
Err(e) => error!("Error creating library: {:?}", e),
}
} else {
for library in self.state.libraries.iter() {
// init database for library
match library::load(&ctx, &library.library_path, &library.library_uuid).await {
Ok(library) => println!("Loaded library: {:?}", library),
Err(e) => println!("Error loading library: {:?}", e),
Ok(library) => info!("Loaded library: {:?}", library),
Err(e) => error!("Error loading library: {:?}", e),
}
}
}
// init node data within library
match node::LibraryNode::create(&self).await {
Ok(_) => println!("Spacedrive online"),
Err(e) => println!("Error initializing node: {:?}", e),
match node::LibraryNode::create(self).await {
Ok(_) => info!("Spacedrive online"),
Err(e) => error!("Error initializing node: {:?}", e),
};
}
async fn exec_command(&mut self, cmd: ClientCommand) -> Result<CoreResponse, CoreError> {
println!("Core command: {:?}", cmd);
info!("Core command: {:?}", cmd);
let ctx = self.get_context();
Ok(match cmd {
// CRUD for locations
@ -299,7 +304,7 @@ impl Node {
CoreResponse::Success(())
}
// ClientCommand::PurgeDatabase => {
// println!("Purging database...");
// info!("Purging database...");
// fs::remove_file(Path::new(&self.state.data_path).join("library.db")).unwrap();
// CoreResponse::Success(())
// }
@ -334,7 +339,7 @@ impl Node {
location_id,
limit: _,
} => CoreResponse::LibGetExplorerDir(
file::explorer::open_dir(&ctx, &location_id, &path).await?,
file::explorer::open_dir(&ctx, location_id, &path).await?,
),
ClientQuery::LibGetTags => todo!(),
ClientQuery::JobGetRunning => {
@ -369,15 +374,15 @@ pub enum ClientCommand {
TagAssign { file_id: i32, tag_id: i32 },
TagDelete { id: i32 },
// Locations
LocCreate { path: String },
LocCreate { path: PathBuf },
LocUpdate { id: i32, name: Option<String> },
LocDelete { id: i32 },
LocRescan { id: i32 },
// System
SysVolumeUnmount { id: i32 },
GenerateThumbsForLocation { id: i32, path: String },
GenerateThumbsForLocation { id: i32, path: PathBuf },
// PurgeDatabase,
IdentifyUniqueFiles { id: i32, path: String },
IdentifyUniqueFiles { id: i32, path: PathBuf },
}
// represents an event this library can emit


@ -1,16 +1,20 @@
use log::info;
use std::fmt::Debug;
use std::path::{Path, PathBuf};
use uuid::Uuid;
use crate::node::{get_nodestate, LibraryState};
use crate::prisma::library;
use crate::util::db::{run_migrations, DatabaseError};
use crate::CoreContext;
use crate::{
node::{get_nodestate, LibraryState},
prisma::library,
util::db::{run_migrations, DatabaseError},
CoreContext,
};
pub static LIBRARY_DB_NAME: &str = "library.db";
pub static DEFAULT_NAME: &str = "My Library";
pub fn get_library_path(data_path: &str) -> String {
let path = data_path.to_owned();
format!("{}/{}", path, LIBRARY_DB_NAME)
pub fn get_library_path(data_path: impl AsRef<Path>) -> PathBuf {
data_path.as_ref().join(LIBRARY_DB_NAME)
}
// pub async fn get(core: &Node) -> Result<library::Data, LibraryError> {
@ -19,7 +23,7 @@ pub fn get_library_path(data_path: &str) -> String {
// let library_state = config.get_current_library();
// println!("{:?}", library_state);
// info!("{:?}", library_state);
// // get library from db
// let library = match db
@ -42,19 +46,19 @@ pub fn get_library_path(data_path: &str) -> String {
pub async fn load(
ctx: &CoreContext,
library_path: &str,
library_path: impl AsRef<Path> + Debug,
library_id: &str,
) -> Result<(), DatabaseError> {
let mut config = get_nodestate();
println!("Initializing library: {} {}", &library_id, library_path);
info!("Initializing library: {} {:#?}", &library_id, library_path);
if config.current_library_uuid != library_id {
config.current_library_uuid = library_id.to_string();
config.save();
config.save().await;
}
// create connection with library database & run migrations
run_migrations(&ctx).await?;
run_migrations(ctx).await?;
// if it doesn't exist, mark as offline
Ok(())
}
@ -64,36 +68,35 @@ pub async fn create(ctx: &CoreContext, name: Option<String>) -> Result<(), ()> {
let uuid = Uuid::new_v4().to_string();
println!("Creating library {:?}, UUID: {:?}", name, uuid);
info!("Creating library {:?}, UUID: {:?}", name, uuid);
let library_state = LibraryState {
library_uuid: uuid.clone(),
library_path: get_library_path(&config.data_path),
library_path: get_library_path(config.data_path.as_ref().unwrap()),
..LibraryState::default()
};
run_migrations(&ctx).await.unwrap();
run_migrations(ctx).await.unwrap();
config.libraries.push(library_state);
config.current_library_uuid = uuid;
config.save();
config.save().await;
let db = &ctx.database;
let library = db
let library = ctx
.database
.library()
.create(
library::pub_id::set(config.current_library_uuid),
library::name::set(name.unwrap_or(DEFAULT_NAME.into())),
library::name::set(name.unwrap_or_else(|| DEFAULT_NAME.into())),
vec![],
)
.exec()
.await
.unwrap();
println!("library created in database: {:?}", library);
info!("library created in database: {:?}", library);
Ok(())
}


@ -5,13 +5,14 @@ use crate::{
CoreContext,
};
use fs_extra::dir::get_size;
use log::info;
use serde::{Deserialize, Serialize};
use std::fs;
use tokio::fs;
use ts_rs::TS;
use super::LibraryError;
#[derive(Debug, Serialize, Deserialize, TS, Clone)]
#[derive(Debug, Serialize, Deserialize, TS, Clone, Default)]
#[ts(export)]
pub struct Statistics {
pub total_file_count: i32,
@ -23,29 +24,15 @@ pub struct Statistics {
pub library_db_size: String,
}
impl Into<Statistics> for Data {
fn into(self) -> Statistics {
Statistics {
total_file_count: self.total_file_count,
total_bytes_used: self.total_bytes_used,
total_bytes_capacity: self.total_bytes_capacity,
total_bytes_free: self.total_bytes_free,
total_unique_bytes: self.total_unique_bytes,
preview_media_bytes: self.preview_media_bytes,
library_db_size: String::new(),
}
}
}
impl Default for Statistics {
fn default() -> Self {
impl From<Data> for Statistics {
fn from(data: Data) -> Self {
Self {
total_file_count: 0,
total_bytes_used: String::new(),
total_bytes_capacity: String::new(),
total_bytes_free: String::new(),
total_unique_bytes: String::new(),
preview_media_bytes: String::new(),
total_file_count: data.total_file_count,
total_bytes_used: data.total_bytes_used,
total_bytes_capacity: data.total_bytes_capacity,
total_bytes_free: data.total_bytes_free,
total_unique_bytes: data.total_unique_bytes,
preview_media_bytes: data.preview_media_bytes,
library_db_size: String::new(),
}
}
@ -54,33 +41,29 @@ impl Default for Statistics {
impl Statistics {
pub async fn retrieve(ctx: &CoreContext) -> Result<Statistics, LibraryError> {
let config = get_nodestate();
let db = &ctx.database;
let library_data = config.get_current_library();
let library_statistics_db = match db
let library_statistics_db = ctx
.database
.library_statistics()
.find_unique(id::equals(library_data.library_id))
.exec()
.await?
{
Some(library_statistics_db) => library_statistics_db.into(),
// create the default values if database has no entry
None => Statistics::default(),
};
Ok(library_statistics_db.into())
.map_or_else(Default::default, Into::into);
Ok(library_statistics_db)
}
pub async fn calculate(ctx: &CoreContext) -> Result<Statistics, LibraryError> {
let config = get_nodestate();
let db = &ctx.database;
// get library from client state
let library_data = config.get_current_library();
println!(
info!(
"Calculating library statistics {:?}",
library_data.library_uuid
);
// get library from db
let library = db
let library = ctx
.database
.library()
.find_unique(library::pub_id::equals(
library_data.library_uuid.to_string(),
@ -92,7 +75,8 @@ impl Statistics {
return Err(LibraryError::LibraryNotFound);
}
let library_statistics = db
let library_statistics = ctx
.database
.library_statistics()
.find_unique(id::equals(library_data.library_id))
.exec()
@ -100,9 +84,9 @@ impl Statistics {
// TODO: get from database, not sys
let volumes = Volume::get_volumes();
Volume::save(&ctx).await?;
Volume::save(ctx).await?;
// println!("{:?}", volumes);
// info!("{:?}", volumes);
let mut available_capacity: u64 = 0;
let mut total_capacity: u64 = 0;
@ -113,14 +97,14 @@ impl Statistics {
}
}
let library_db_size = match fs::metadata(library_data.library_path.as_str()) {
let library_db_size = match fs::metadata(library_data.library_path).await {
Ok(metadata) => metadata.len(),
Err(_) => 0,
};
println!("{:?}", library_statistics);
info!("{:?}", library_statistics);
let thumbnail_folder_size = get_size(&format!("{}/{}", config.data_path, "thumbnails"));
let thumbnail_folder_size = get_size(config.data_path.unwrap().join("thumbnails"));
let statistics = Statistics {
library_db_size: library_db_size.to_string(),
@ -135,7 +119,8 @@ impl Statistics {
None => library_data.library_id,
};
db.library_statistics()
ctx.database
.library_statistics()
.upsert(
library_id::equals(library_local_id),
(
@ -143,7 +128,7 @@ impl Statistics {
vec![library_db_size::set(statistics.library_db_size.clone())],
),
vec![
total_file_count::set(statistics.total_file_count.clone()),
total_file_count::set(statistics.total_file_count),
total_bytes_used::set(statistics.total_bytes_used.clone()),
total_bytes_capacity::set(statistics.total_bytes_capacity.clone()),
total_bytes_free::set(statistics.total_bytes_free.clone()),


@ -4,6 +4,7 @@ use crate::{
};
use chrono::{DateTime, Utc};
use int_enum::IntEnum;
use log::info;
use serde::{Deserialize, Serialize};
use std::env;
use thiserror::Error;
@ -22,17 +23,24 @@ pub struct LibraryNode {
pub last_seen: DateTime<Utc>,
}
impl Into<LibraryNode> for node::Data {
fn into(self) -> LibraryNode {
LibraryNode {
uuid: self.pub_id,
name: self.name,
platform: IntEnum::from_int(self.platform).unwrap(),
last_seen: self.last_seen.into(),
impl From<node::Data> for LibraryNode {
fn from(data: node::Data) -> Self {
Self {
uuid: data.pub_id,
name: data.name,
platform: IntEnum::from_int(data.platform).unwrap(),
last_seen: data.last_seen.into(),
}
}
}
impl From<Box<node::Data>> for LibraryNode {
fn from(data: Box<node::Data>) -> Self {
Self::from(*data)
}
}
#[allow(clippy::upper_case_acronyms)]
#[repr(i32)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize, TS, Eq, PartialEq, IntEnum)]
#[ts(export)]
@ -47,11 +55,9 @@ pub enum Platform {
impl LibraryNode {
pub async fn create(node: &Node) -> Result<(), NodeError> {
println!("Creating node...");
info!("Creating node...");
let mut config = state::get_nodestate();
let db = &node.database;
let hostname = match hostname::get() {
Ok(hostname) => hostname.to_str().unwrap_or_default().to_owned(),
Err(_) => "unknown".to_owned(),
@ -64,30 +70,31 @@ impl LibraryNode {
_ => Platform::Unknown,
};
let _node = match db
let node = if let Some(node) = node
.database
.node()
.find_unique(node::pub_id::equals(config.node_pub_id.clone()))
.exec()
.await?
{
Some(node) => node,
None => {
db.node()
.create(
node::pub_id::set(config.node_pub_id.clone()),
node::name::set(hostname.clone()),
vec![node::platform::set(platform as i32)],
)
.exec()
.await?
}
node
} else {
node.database
.node()
.create(
node::pub_id::set(config.node_pub_id.clone()),
node::name::set(hostname.clone()),
vec![node::platform::set(platform as i32)],
)
.exec()
.await?
};
config.node_name = hostname;
config.node_id = _node.id;
config.save();
config.node_id = node.id;
config.save().await;
println!("node: {:?}", &_node);
info!("node: {:?}", node);
Ok(())
}


@ -1,8 +1,12 @@
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use std::fs;
use std::io::{BufReader, Write};
use std::path::PathBuf;
use std::sync::RwLock;
use tokio::io::AsyncReadExt;
use tokio::{
fs,
io::{AsyncWriteExt, BufReader},
};
use ts_rs::TS;
use uuid::Uuid;
@ -13,7 +17,7 @@ pub struct NodeState {
pub node_id: i32,
pub node_name: String,
// the config path is stored here because the struct can exist only in memory during startup and be written to disk later without supplying a path
pub data_path: String,
pub data_path: Option<PathBuf>,
// the port this node uses to listen for incoming connections
pub tcp_port: u32,
// all the libraries loaded by this node
@ -29,7 +33,7 @@ pub static NODE_STATE_CONFIG_NAME: &str = "node_state.json";
pub struct LibraryState {
pub library_uuid: String,
pub library_id: i32,
pub library_path: String,
pub library_path: PathBuf,
pub offline: bool,
}
@ -39,49 +43,50 @@ lazy_static! {
}
pub fn get_nodestate() -> NodeState {
match CONFIG.read() {
Ok(guard) => guard.clone().unwrap_or(NodeState::default()),
Err(_) => return NodeState::default(),
if let Ok(guard) = CONFIG.read() {
guard.clone().unwrap_or_default()
} else {
NodeState::default()
}
}
impl NodeState {
pub fn new(data_path: &str, node_name: &str) -> Result<Self, ()> {
pub fn new(data_path: PathBuf, node_name: &str) -> Result<Self, ()> {
let uuid = Uuid::new_v4().to_string();
// create struct and assign defaults
let config = Self {
node_pub_id: uuid,
data_path: data_path.to_string(),
data_path: Some(data_path),
node_name: node_name.to_string(),
..Default::default()
};
Ok(config)
}
pub fn save(&self) {
pub async fn save(&self) {
self.write_memory();
// only write to disk if config path is set
if !&self.data_path.is_empty() {
let config_path = format!("{}/{}", &self.data_path, NODE_STATE_CONFIG_NAME);
let mut file = fs::File::create(config_path).unwrap();
if let Some(ref data_path) = self.data_path {
let config_path = data_path.join(NODE_STATE_CONFIG_NAME);
let mut file = fs::File::create(config_path).await.unwrap();
let json = serde_json::to_string(&self).unwrap();
file.write_all(json.as_bytes()).unwrap();
file.write_all(json.as_bytes()).await.unwrap();
}
}
pub fn read_disk(&mut self) -> Result<(), ()> {
let config_path = format!("{}/{}", &self.data_path, NODE_STATE_CONFIG_NAME);
// open the file and parse json
match fs::File::open(config_path) {
Ok(file) => {
let reader = BufReader::new(file);
let data = serde_json::from_reader(reader).unwrap();
pub async fn read_disk(&mut self) -> Result<(), ()> {
if let Some(ref data_path) = self.data_path {
let config_path = data_path.join(NODE_STATE_CONFIG_NAME);
// open the file and parse json
if let Ok(file) = fs::File::open(config_path).await {
let mut buf = vec![];
let bytes = BufReader::new(file).read_to_end(&mut buf).await.unwrap();
let data = serde_json::from_slice(&buf[..bytes]).unwrap();
// assign to self
*self = data;
}
_ => {}
}
Ok(())
}
@ -91,17 +96,14 @@ impl NodeState {
}
pub fn get_current_library(&self) -> LibraryState {
match self
.libraries
self.libraries
.iter()
.find(|lib| lib.library_uuid == self.current_library_uuid)
{
Some(lib) => lib.clone(),
None => LibraryState::default(),
}
.cloned()
.unwrap_or_default()
}
pub fn get_current_library_db_path(&self) -> String {
format!("{}/library.db", &self.get_current_library().library_path)
pub fn get_current_library_db_path(&self) -> PathBuf {
self.get_current_library().library_path.join("library.db")
}
}
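
For context, a minimal sketch of the async JSON config persistence this file now uses; the Config struct and helpers are illustrative:

```rust
use serde::{Deserialize, Serialize};
use std::path::Path;
use tokio::{fs, io::AsyncWriteExt};

#[derive(Serialize, Deserialize, Default, Debug)]
struct Config {
    node_name: String,
}

// Serialize to JSON and write the file asynchronously.
async fn save(cfg: &Config, path: &Path) -> std::io::Result<()> {
    let json = serde_json::to_string(cfg).expect("config is serializable");
    let mut file = fs::File::create(path).await?;
    file.write_all(json.as_bytes()).await
}

// tokio::fs::read avoids the BufReader + read_to_end dance entirely.
async fn load(path: &Path) -> std::io::Result<Config> {
    let bytes = fs::read(path).await?;
    Ok(serde_json::from_slice(&bytes).unwrap_or_default())
}
```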


@ -1,15 +1,22 @@
use crate::{
encode::ThumbnailJob,
file::{cas::FileIdentifierJob, indexer::IndexerJob},
node::{get_nodestate, LibraryNode},
prisma::{file_path, location},
ClientQuery, CoreContext, CoreEvent,
};
use prisma_client_rust::{raw, PrismaValue};
use log::info;
use serde::{Deserialize, Serialize};
use std::{fs, io, io::Write, path::Path};
use std::fmt::Debug;
use std::path::{Path, PathBuf};
use thiserror::Error;
use tokio::io::AsyncWriteExt;
use tokio::{
fs::{metadata, File},
io,
};
use ts_rs::TS;
use uuid::Uuid;
use super::SysError;
@ -28,25 +35,25 @@ pub struct LocationResource {
pub date_created: chrono::DateTime<chrono::Utc>,
}
impl Into<LocationResource> for location::Data {
fn into(mut self) -> LocationResource {
impl From<location::Data> for LocationResource {
fn from(data: location::Data) -> Self {
LocationResource {
id: self.id,
name: self.name,
path: self.local_path,
total_capacity: self.total_capacity,
available_capacity: self.available_capacity,
is_removable: self.is_removable,
node: self.node.take().unwrap_or(None).map(|node| (*node).into()),
is_online: self.is_online,
date_created: self.date_created.into(),
id: data.id,
name: data.name,
path: data.local_path,
total_capacity: data.total_capacity,
available_capacity: data.available_capacity,
is_removable: data.is_removable,
node: data.node.unwrap_or(None).map(Into::into),
is_online: data.is_online,
date_created: data.date_created.into(),
}
}
}
#[derive(Serialize, Deserialize, Default)]
pub struct DotSpacedrive {
pub location_uuid: String,
pub location_uuid: Uuid,
pub library_uuid: String,
}
@ -69,24 +76,25 @@ pub async fn get_location(
ctx: &CoreContext,
location_id: i32,
) -> Result<LocationResource, SysError> {
let db = &ctx.database;
// get location by location_id from db and include location_paths
let location = match db
ctx.database
.location()
.find_unique(location::id::equals(location_id))
.exec()
.await?
{
Some(location) => location,
None => Err(LocationError::NotFound(location_id.to_string()))?,
};
Ok(location.into())
.map(Into::into)
.ok_or_else(|| LocationError::IdNotFound(location_id).into())
}
pub fn scan_location(ctx: &CoreContext, location_id: i32, path: String) {
ctx.spawn_job(Box::new(IndexerJob { path: path.clone() }));
ctx.queue_job(Box::new(FileIdentifierJob { location_id, path }));
pub fn scan_location(ctx: &CoreContext, location_id: i32, path: impl AsRef<Path>) {
let path_buf = path.as_ref().to_path_buf();
ctx.spawn_job(Box::new(IndexerJob {
path: path_buf.clone(),
}));
ctx.queue_job(Box::new(FileIdentifierJob {
location_id,
path: path_buf,
}));
// TODO: make a way to stop jobs so this can be canceled without rebooting app
// ctx.queue_job(Box::new(ThumbnailJob {
// location_id,
@ -97,19 +105,18 @@ pub fn scan_location(ctx: &CoreContext, location_id: i32, path: String) {
pub async fn new_location_and_scan(
ctx: &CoreContext,
path: &str,
path: impl AsRef<Path> + Debug,
) -> Result<LocationResource, SysError> {
let location = create_location(&ctx, path).await?;
let location = create_location(ctx, &path).await?;
scan_location(&ctx, location.id, path.to_string());
scan_location(ctx, location.id, path);
Ok(location)
}
pub async fn get_locations(ctx: &CoreContext) -> Result<Vec<LocationResource>, SysError> {
let db = &ctx.database;
let locations = db
let locations = ctx
.database
.location()
.find_many(vec![])
.with(location::node::fetch())
@ -117,119 +124,107 @@ pub async fn get_locations(ctx: &CoreContext) -> Result<Vec<LocationResource>, S
.await?;
// turn locations into LocationResource
let locations: Vec<LocationResource> = locations
.into_iter()
.map(|location| location.into())
.collect();
Ok(locations)
Ok(locations.into_iter().map(LocationResource::from).collect())
}
pub async fn create_location(ctx: &CoreContext, path: &str) -> Result<LocationResource, SysError> {
let db = &ctx.database;
let config = get_nodestate();
pub async fn create_location(
ctx: &CoreContext,
path: impl AsRef<Path> + Debug,
) -> Result<LocationResource, SysError> {
let path = path.as_ref();
// check if we have access to this location
if !Path::new(path).exists() {
Err(LocationError::NotFound(path.to_string()))?;
if !path.exists() {
return Err(LocationError::PathNotFound(path.to_owned()).into());
}
// if on windows
if cfg!(target_family = "windows") {
// try and create a dummy file to see if we can write to this location
match fs::File::create(format!("{}/{}", path.clone(), ".spacewrite")) {
Ok(file) => file,
Err(e) => Err(LocationError::DotfileWriteFailure(e, path.to_string()))?,
};
match fs::remove_file(format!("{}/{}", path.clone(), ".spacewrite")) {
Ok(_) => (),
Err(e) => Err(LocationError::DotfileWriteFailure(e, path.to_string()))?,
}
} else {
// unix allows us to test this more directly
match fs::File::open(&path) {
Ok(_) => println!("Path is valid, creating location for '{}'", &path),
Err(e) => Err(LocationError::FileReadError(e))?,
}
if metadata(path)
.await
.map_err(|e| LocationError::DotfileReadFailure(e, path.to_owned()))?
.permissions()
.readonly()
{
return Err(LocationError::ReadonlyDotFileLocationFailure(path.to_owned()).into());
}
let path_string = path.to_string_lossy().to_string();
// check if location already exists
let location = match db
let location_resource = if let Some(location) = ctx
.database
.location()
.find_first(vec![location::local_path::equals(Some(path.to_string()))])
.find_first(vec![location::local_path::equals(Some(
path_string.clone(),
))])
.exec()
.await?
{
Some(location) => location,
None => {
println!(
"Location does not exist, creating new location for '{}'",
&path
);
let uuid = uuid::Uuid::new_v4();
location.into()
} else {
info!(
"Location does not exist, creating new location for '{:#?}'",
path
);
let uuid = Uuid::new_v4();
let p = Path::new(&path);
let config = get_nodestate();
let location = db
.location()
.create(
location::pub_id::set(uuid.to_string()),
vec![
location::name::set(Some(
p.file_name().unwrap().to_string_lossy().to_string(),
)),
location::is_online::set(true),
location::local_path::set(Some(path.to_string())),
location::node_id::set(Some(config.node_id)),
],
)
.exec()
.await?;
let location = ctx
.database
.location()
.create(
location::pub_id::set(uuid.to_string()),
vec![
location::name::set(Some(
path.file_name().unwrap().to_string_lossy().to_string(),
)),
location::is_online::set(true),
location::local_path::set(Some(path_string)),
location::node_id::set(Some(config.node_id)),
],
)
.exec()
.await?;
println!("Created location: {:?}", location);
info!("Created location: {:?}", location);
// write a file called .spacedrive to path containing the location id in JSON format
let mut dotfile = match fs::File::create(format!("{}/{}", path.clone(), DOTFILE_NAME)) {
Ok(file) => file,
Err(e) => Err(LocationError::DotfileWriteFailure(e, path.to_string()))?,
};
// write a file called .spacedrive to path containing the location id in JSON format
let mut dotfile = File::create(path.with_file_name(DOTFILE_NAME))
.await
.map_err(|e| LocationError::DotfileWriteFailure(e, path.to_owned()))?;
let data = DotSpacedrive {
location_uuid: uuid.to_string(),
library_uuid: config.current_library_uuid,
};
let data = DotSpacedrive {
location_uuid: uuid,
library_uuid: config.current_library_uuid,
};
let json = match serde_json::to_string(&data) {
Ok(json) => json,
Err(e) => Err(LocationError::DotfileSerializeFailure(e, path.to_string()))?,
};
let json_bytes = serde_json::to_vec(&data)
.map_err(|e| LocationError::DotfileSerializeFailure(e, path.to_owned()))?;
match dotfile.write_all(json.as_bytes()) {
Ok(_) => (),
Err(e) => Err(LocationError::DotfileWriteFailure(e, path.to_string()))?,
}
dotfile
.write_all(&json_bytes)
.await
.map_err(|e| LocationError::DotfileWriteFailure(e, path.to_owned()))?;
ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::SysGetLocations))
.await;
ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::SysGetLocations))
.await;
location
}
location.into()
};
Ok(location.into())
Ok(location_resource)
}
pub async fn delete_location(ctx: &CoreContext, location_id: i32) -> Result<(), SysError> {
let db = &ctx.database;
db.file_path()
ctx.database
.file_path()
.find_many(vec![file_path::location_id::equals(Some(location_id))])
.delete()
.exec()
.await?;
db.location()
ctx.database
.location()
.find_unique(location::id::equals(location_id))
.delete()
.exec()
@ -238,7 +233,7 @@ pub async fn delete_location(ctx: &CoreContext, location_id: i32) -> Result<(),
ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::SysGetLocations))
.await;
println!("Location {} deleted", location_id);
info!("Location {} deleted", location_id);
Ok(())
}
@ -246,15 +241,21 @@ pub async fn delete_location(ctx: &CoreContext, location_id: i32) -> Result<(),
#[derive(Error, Debug)]
pub enum LocationError {
#[error("Failed to create location (uuid {uuid:?})")]
CreateFailure { uuid: String },
#[error("Failed to read location dotfile")]
DotfileReadFailure(io::Error),
CreateFailure { uuid: Uuid },
#[error("Failed to read location dotfile (path: {1:?})")]
DotfileReadFailure(io::Error, PathBuf),
#[error("Failed to serialize dotfile for location (at path: {1:?})")]
DotfileSerializeFailure(serde_json::Error, String),
#[error("Location not found (uuid: {1:?})")]
DotfileWriteFailure(io::Error, String),
#[error("Location not found (uuid: {0:?})")]
NotFound(String),
DotfileSerializeFailure(serde_json::Error, PathBuf),
#[error("Dotfile location is read only (at path: {0:?})")]
ReadonlyDotFileLocationFailure(PathBuf),
#[error("Failed to write dotfile (path: {1:?})")]
DotfileWriteFailure(io::Error, PathBuf),
#[error("Location not found (path: {0:?})")]
PathNotFound(PathBuf),
#[error("Location not found (uuid: {0})")]
UuidNotFound(Uuid),
#[error("Location not found (id: {0})")]
IdNotFound(i32),
#[error("Failed to open file from local os")]
FileReadError(io::Error),
#[error("Failed to read mounted volumes from local os")]


@ -11,11 +11,11 @@ use crate::{job, prisma};
#[derive(Error, Debug)]
pub enum SysError {
#[error("Location error")]
LocationError(#[from] LocationError),
Location(#[from] LocationError),
#[error("Error with system volumes")]
VolumeError(String),
Volume(String),
#[error("Error from job runner")]
JobError(#[from] job::JobError),
Job(#[from] job::JobError),
#[error("Database error")]
DatabaseError(#[from] prisma::QueryError),
Database(#[from] prisma::QueryError),
}
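
The renames above address Clippy's enum_variant_names lint (variants repeating the enum's name). A hedged sketch of the resulting thiserror pattern, with stand-in types:

```rust
use thiserror::Error;

// Illustrative stand-in for the real error type in this module.
#[derive(Error, Debug)]
#[error("location error")]
struct LocationError;

#[derive(Error, Debug)]
enum SysError {
    // `Location`, not `LocationError`: the enum name already says "error".
    #[error("Location error")]
    Location(#[from] LocationError),
}

fn demo(e: LocationError) -> SysError {
    e.into() // #[from] also derives the From impl that `?` relies on
}
```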


@ -1,5 +1,5 @@
// use crate::native;
use crate::{node::get_nodestate, prisma::volume::*};
use crate::{node::get_nodestate, prisma::volume::*, CoreContext};
use serde::{Deserialize, Serialize};
use ts_rs::TS;
// #[cfg(not(target_os = "macos"))]
@ -7,8 +7,6 @@ use std::process::Command;
// #[cfg(not(target_os = "macos"))]
use sysinfo::{DiskExt, System, SystemExt};
use crate::CoreContext;
use super::SysError;
#[derive(Serialize, Deserialize, Debug, Default, Clone, TS)]
@ -27,17 +25,17 @@ pub struct Volume {
impl Volume {
pub async fn save(ctx: &CoreContext) -> Result<(), SysError> {
let db = &ctx.database;
let config = get_nodestate();
let volumes = Self::get_volumes()?;
// enter all volumes associated with this client and add them to the db
for volume in volumes {
db.volume()
ctx.database
.volume()
.upsert(
node_id_mount_point_name(
config.node_id.clone(),
config.node_id,
volume.mount_point.to_string(),
volume.name.to_string(),
),
@ -67,7 +65,7 @@ impl Volume {
Ok(())
}
pub fn get_volumes() -> Result<Vec<Volume>, SysError> {
let all_volumes: Vec<Volume> = System::new_all()
Ok(System::new_all()
.disks()
.iter()
.map(|disk| {
@ -123,15 +121,8 @@ impl Volume {
is_root_filesystem: mount_point == "/",
}
})
.collect();
let volumes = all_volumes
.clone()
.into_iter()
.filter(|volume| !volume.mount_point.starts_with("/System"))
.collect();
Ok(volumes)
.collect())
}
}


@ -2,10 +2,13 @@ use crate::prisma::{self, migration, PrismaClient};
use crate::CoreContext;
use data_encoding::HEXLOWER;
use include_dir::{include_dir, Dir};
use log::{error, info};
use prisma_client_rust::raw;
use ring::digest::{Context, Digest, SHA256};
use std::ffi::OsStr;
use std::fmt::Debug;
use std::io::{self, BufReader, Read};
use std::path::Path;
use thiserror::Error;
const INIT_MIGRATION: &str = include_str!("../../prisma/migrations/migration_table/migration.sql");
@ -17,9 +20,12 @@ pub enum DatabaseError {
ClientError(#[from] prisma::NewClientError),
}
pub async fn create_connection(path: &str) -> Result<PrismaClient, DatabaseError> {
println!("Creating database connection: {:?}", path);
let client = prisma::new_client_with_url(&format!("file:{}", &path)).await?;
pub async fn create_connection(
path: impl AsRef<Path> + Debug,
) -> Result<PrismaClient, DatabaseError> {
info!("Creating database connection: {:?}", path);
let client =
prisma::new_client_with_url(&format!("file:{}", path.as_ref().to_string_lossy())).await?;
Ok(client)
}
@ -47,12 +53,12 @@ pub async fn run_migrations(ctx: &CoreContext) -> Result<(), DatabaseError> {
.await
{
Ok(data) => {
if data.len() == 0 {
if data.is_empty() {
// execute migration
match client._execute_raw(raw!(INIT_MIGRATION)).await {
Ok(_) => {}
Err(e) => {
println!("Failed to create migration table: {}", e);
info!("Failed to create migration table: {}", e);
}
};
@ -64,7 +70,7 @@ pub async fn run_migrations(ctx: &CoreContext) -> Result<(), DatabaseError> {
.unwrap();
#[cfg(debug_assertions)]
println!("Migration table created: {:?}", value);
info!("Migration table created: {:?}", value);
}
let mut migration_subdirs = MIGRATIONS_DIR
@ -89,7 +95,7 @@ pub async fn run_migrations(ctx: &CoreContext) -> Result<(), DatabaseError> {
});
for subdir in migration_subdirs {
println!("{:?}", subdir.path());
info!("{:?}", subdir.path());
let migration_file = subdir
.get_file(subdir.path().join("./migration.sql"))
.unwrap();
@ -110,9 +116,9 @@ pub async fn run_migrations(ctx: &CoreContext) -> Result<(), DatabaseError> {
if existing_migration.is_none() {
#[cfg(debug_assertions)]
println!("Running migration: {}", name);
info!("Running migration: {}", name);
let steps = migration_sql.split(";").collect::<Vec<&str>>();
let steps = migration_sql.split(';').collect::<Vec<&str>>();
let steps = &steps[0..steps.len() - 1];
client
@ -138,15 +144,15 @@ pub async fn run_migrations(ctx: &CoreContext) -> Result<(), DatabaseError> {
.unwrap();
}
Err(e) => {
println!("Error running migration: {}", name);
println!("{}", e);
error!("Error running migration: {}", name);
error!("{:?}", e);
break;
}
}
}
#[cfg(debug_assertions)]
println!("Migration {} recorded successfully", name);
info!("Migration {} recorded successfully", name);
}
}
}
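
The runner above splits each migration file on ';' and drops the empty trailing chunk before executing the steps one by one. A tiny sketch of that splitting, assuming no semicolons appear inside string literals:

```rust
// Naive SQL statement splitting, matching the migration runner:
// split on ';' and drop the empty tail after the final semicolon.
fn split_statements(sql: &str) -> Vec<&str> {
    let steps: Vec<&str> = sql.split(';').collect();
    steps[..steps.len() - 1].to_vec()
}

fn main() {
    let sql = "CREATE TABLE a (id INT); CREATE TABLE b (id INT);";
    assert_eq!(split_statements(sql).len(), 2);
}
```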