[ENG-1398] Make thumbnailer parallelism configurable (#1780)

* Removing migrator

* Adding serde_repr dep

* Generalizing version manager

* Updating library config to use version manager

* Updating node config to use version manager

* Updating stuff to use new configs

* More async

* Library manager errors

* Node and thumbnailer preferences

* Small warning on prep

* Upgrading batch construction for thumbnailer

* Updating thumb version file to new version manager

* Configurable parallelism for thumbnailer

* Integration with frontend and refresh batch

Co-authored-by: Vítor Vasconcellos <HeavenVolkoff@users.noreply.github.com>

---------

Co-authored-by: Vítor Vasconcellos <HeavenVolkoff@users.noreply.github.com>
Author: Ericson "Fogo" Soares, 2023-11-23 08:23:32 -03:00 (committed by GitHub)
parent 43b6453706
commit f662bf7594
35 changed files with 1739 additions and 1287 deletions

Cargo.lock (generated)
View file

@@ -6858,6 +6858,7 @@ dependencies = [
"serde",
"serde-hashkey",
"serde_json",
"serde_repr",
"serde_with",
"slotmap",
"specta",

View file

@@ -444,7 +444,7 @@ pub async fn reveal_items(
.location()
.find_many(vec![
// TODO(N): This will fall apart with removable media and is making an invalid assumption that the `Node` is fixed for an `Instance`.
location::instance_id::equals(Some(library.config().instance_id)),
location::instance_id::equals(Some(library.config().await.instance_id)),
location::id::in_vec(locations),
])
.select(location::select!({ path }))

View file

@@ -59,6 +59,7 @@ tokio = { workspace = true, features = [
serde = { version = "1.0", features = ["derive"] }
chrono = { version = "0.4.31", features = ["serde"] }
serde_json = { workspace = true }
serde_repr = "0.1"
futures = "0.3"
rmp-serde = "^1.1.2"
rmpv = "^1.0.1"

View file

@@ -83,7 +83,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
};
if node.config
.write(|mut c| c.auth_token = Some(token))
.write(|c| c.auth_token = Some(token))
.await.is_err() {
break Response::Error;
};
@@ -120,7 +120,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
"logout",
R.mutation(|node, _: ()| async move {
node.config
.write(|mut c| c.auth_token = None)
.write(|c| c.auth_token = None)
.await
.map(|_| ())
.map_err(|_| {

View file

@@ -1,27 +1,34 @@
use std::{
cmp,
fs::{self, File},
io::{self, BufReader, BufWriter, Read, Write},
path::PathBuf,
path::{Path, PathBuf},
sync::Arc,
time::{SystemTime, UNIX_EPOCH},
};
use flate2::{bufread::GzDecoder, write::GzEncoder, Compression};
use futures::executor::block_on;
use futures_concurrency::future::TryJoin;
use rspc::{alpha::AlphaRouter, ErrorCode};
use serde::{Serialize, Serializer};
use specta::Type;
use tar::Archive;
use tempfile::tempdir;
use thiserror::Error;
use tokio::task::spawn_blocking;
use tokio::{
fs::{self, File},
io::{
self, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader,
BufWriter,
},
spawn,
};
use tracing::{error, info};
use uuid::Uuid;
use crate::{
invalidate_query,
library::{Library, LibraryManagerError},
util::error::FileIOError,
Node,
};
@@ -34,66 +41,76 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
pub struct Backup {
#[serde(flatten)]
header: Header,
path: String,
path: PathBuf,
}
#[derive(Serialize, Type)]
pub struct GetAll {
backups: Vec<Backup>,
directory: String,
directory: PathBuf,
}
async fn process_backups(path: impl AsRef<Path>) -> Result<Vec<Backup>, BackupError> {
let path = path.as_ref();
let mut read_dir = fs::read_dir(path).await.map_err(|e| {
FileIOError::from((&path, e, "Failed to read backups directory"))
})?;
let mut backups = vec![];
while let Some(entry) = read_dir.next_entry().await.map_err(|e| {
FileIOError::from((path, e, "Failed to read next entry to backup"))
})? {
let entry_path = entry.path();
let metadata = entry.metadata().await.map_err(|e| {
FileIOError::from((
&entry_path,
e,
"Failed to read metadata from backup entry",
))
})?;
if metadata.is_file() {
backups.push(async move {
let mut file = File::open(&entry_path).await.map_err(|e| {
FileIOError::from((&entry_path, e, "Failed to open backup entry"))
})?;
Header::read(&mut file, &entry_path)
.await
.map(|header| Backup {
header,
path: entry_path,
})
});
}
}
backups.try_join().await
}
R.query(|node, _: ()| async move {
let directory = node.data_dir.join("backups");
Ok(GetAll {
backups: if !directory.exists() {
vec![]
} else {
spawn_blocking(move || {
fs::read_dir(node.data_dir.join("backups"))
.map(|dir| {
dir.filter_map(|entry| {
match entry.and_then(|e| Ok((e.metadata()?, e))) {
Ok((metadata, entry)) if metadata.is_file() => {
File::open(entry.path())
.ok()
.and_then(|mut file| {
Header::read(&mut file).ok()
})
.map(|header| Backup {
header,
// TODO: Lossy strings are bad
path: entry
.path()
.to_string_lossy()
.to_string(),
})
}
_ => None,
}
})
.collect::<Vec<_>>()
})
.map_err(|e| {
rspc::Error::with_cause(
ErrorCode::InternalServerError,
"Failed to fetch backups".to_string(),
e,
)
})
})
.await
.map_err(|e| {
rspc::Error::with_cause(
ErrorCode::InternalServerError,
"Failed to fetch backups".to_string(),
e,
)
})??
},
directory: directory.to_string_lossy().to_string(),
})
let backups = match fs::metadata(&directory).await {
Ok(_) => process_backups(directory.clone()).await.map_err(|e| {
rspc::Error::with_cause(
ErrorCode::InternalServerError,
"Failed to fetch backups".to_string(),
e,
)
})?,
Err(e) if e.kind() == io::ErrorKind::NotFound => vec![],
Err(e) => {
return Err(
FileIOError::from((&directory, e, "Failed to fetch backups")).into(),
)
}
};
Ok(GetAll { backups, directory })
})
})
.procedure("backup", {
@@ -109,7 +126,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
R
// TODO: Paths as strings is bad but here we want the flexibility of the frontend allowing any path
.mutation(|node, path: String| async move {
tokio::fs::remove_file(path)
fs::remove_file(path)
.await
.map(|_| {
invalidate_query!(node; node, "backups.getAll");
@@ -127,8 +144,8 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
async fn start_backup(node: Arc<Node>, library: Arc<Library>) -> Uuid {
let bkp_id = Uuid::new_v4();
spawn_blocking(move || {
match do_backup(bkp_id, &node, &library) {
spawn(async move {
match do_backup(bkp_id, &node, &library).await {
Ok(path) => {
info!(
"Backup '{bkp_id}' for library '{}' created at '{path:?}'!",
@@ -152,24 +169,22 @@ async fn start_backup(node: Arc<Node>, library: Arc<Library>) -> Uuid {
#[derive(Error, Debug)]
enum BackupError {
#[error("io error: {0}")]
Io(#[from] io::Error),
#[error("library manager error: {0}")]
LibraryManager(#[from] LibraryManagerError),
#[error("malformed header")]
MalformedHeader,
#[error("Library already exists, please remove it and try again!")]
LibraryAlreadyExists,
#[error(transparent)]
FileIO(#[from] FileIOError),
}
#[derive(Debug)]
pub struct MustRemoveLibraryErr;
// This is intended to be called in a `spawn_blocking` task.
// Async is pure overhead for an IO bound operation like this.
fn do_backup(id: Uuid, node: &Node, library: &Library) -> Result<PathBuf, BackupError> {
async fn do_backup(id: Uuid, node: &Node, library: &Library) -> Result<PathBuf, BackupError> {
let backups_dir = node.data_dir.join("backups");
fs::create_dir_all(&backups_dir)?;
fs::create_dir_all(&backups_dir)
.await
.map_err(|e| FileIOError::from((&backups_dir, e)))?;
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
@@ -177,97 +192,193 @@ fn do_backup(id: Uuid, node: &Node, library: &Library) -> Result<PathBuf, Backup
.as_millis();
let bkp_path = backups_dir.join(format!("{id}.bkp"));
let mut bkp_file = BufWriter::new(File::create(&bkp_path)?);
let mut bkp_file = BufWriter::new(
File::create(&bkp_path)
.await
.map_err(|e| FileIOError::from((&bkp_path, e, "Failed to create backup file")))?,
);
// Header. We do this so the file is self-sufficient.
Header {
id,
timestamp,
library_id: library.id,
library_name: library.config().name.to_string(),
library_name: library.config().await.name.to_string(),
}
.write(&mut bkp_file)
.await
.map_err(|e| FileIOError::from((&bkp_path, e, "Failed to create backup file")))?;
// Introducing this adapter here to bridge tokio stuff to std::io stuff
struct WriterAdapter(BufWriter<File>);
impl std::io::Write for WriterAdapter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
block_on(self.0.write(buf))
}
fn flush(&mut self) -> io::Result<()> {
block_on(self.0.flush())
}
}
.write(&mut bkp_file)?;
// Regular tar.gz encoded data
let mut tar = tar::Builder::new(GzEncoder::new(bkp_file, Compression::default()));
let mut tar = tar::Builder::new(GzEncoder::new(
WriterAdapter(bkp_file),
Compression::default(),
));
let library_config_path = node
.libraries
.libraries_dir
.join(format!("{}.sdlibrary", library.id));
tar.append_file(
"library.sdlibrary",
&mut File::open(
node.libraries
.libraries_dir
.join(format!("{}.sdlibrary", library.id)),
)?,
)?;
&mut std::fs::File::open(&library_config_path).map_err(|e| {
FileIOError::from((
library_config_path,
e,
"Failed to open library config file to do a backup",
))
})?,
)
.map_err(|e| {
FileIOError::from((
&bkp_path,
e,
"Failed to append library config file to out backup tar.gz file",
))
})?;
let library_db_path = node
.libraries
.libraries_dir
.join(format!("{}.db", library.id));
tar.append_file(
"library.db",
&mut File::open(
node.libraries
.libraries_dir
.join(format!("{}.db", library.id)),
)?,
)?;
&mut std::fs::File::open(&library_db_path).map_err(|e| {
FileIOError::from((
library_db_path,
e,
"Failed to open library database file to do a backup",
))
})?,
)
.map_err(|e| {
FileIOError::from((
&bkp_path,
e,
"Failed to append library database file to out backup tar.gz file",
))
})?;
Ok(bkp_path)
}
fn start_restore(node: Arc<Node>, path: PathBuf) {
spawn_blocking(move || {
match restore_backup(&node, path.clone()) {
Ok(header) => {
info!(
"Restored to '{}' for library '{}'!",
header.id, header.library_id
);
}
Err(e) => {
error!("Error restoring backup '{path:?}': {e:?}");
// TODO: Alert user something went wrong
}
async fn start_restore(node: Arc<Node>, path: PathBuf) {
match restore_backup(&node, &path).await {
Ok(Header { id, library_id, .. }) => {
info!("Restored to '{id}' for library '{library_id}'!",);
}
});
Err(e) => {
error!("Error restoring backup '{}': {e:#?}", path.display());
// TODO: Alert user something went wrong
}
}
}
fn restore_backup(node: &Arc<Node>, path: PathBuf) -> Result<Header, BackupError> {
let mut file = BufReader::new(fs::File::open(path)?);
let header = Header::read(&mut file)?;
async fn restore_backup(node: &Arc<Node>, path: impl AsRef<Path>) -> Result<Header, BackupError> {
let path = path.as_ref();
let mut file = BufReader::new(fs::File::open(path).await.map_err(|e| {
FileIOError::from((path, e, "Failed trying to open backup file to be restored"))
})?);
let header = Header::read(&mut file, path).await?;
// TODO: Actually handle restoring into a library that exists. For now it's easier to error out.
let None = block_on(node.libraries.get_library(&header.library_id)) else {
let None = node.libraries.get_library(&header.library_id).await else {
return Err(BackupError::LibraryAlreadyExists);
};
let temp_dir = tempdir()?;
let temp_dir = tempdir().map_err(|e| {
FileIOError::from((
"/tmp",
e,
"Failed to get a temporary directory to restore backup",
))
})?;
let mut archive = Archive::new(GzDecoder::new(file));
archive.unpack(&temp_dir)?;
// Introducing this adapter here to bridge tokio stuff to std::io stuff
struct ReaderAdapter(BufReader<File>);
let library_path = temp_dir.path().join("library.sdlibrary");
let db_path = temp_dir.path().join("library.db");
impl std::io::Read for ReaderAdapter {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
block_on(self.0.read(buf))
}
}
fs::copy(
library_path,
node.libraries
.libraries_dir
.join(format!("{}.sdlibrary", header.library_id)),
)?;
fs::copy(
db_path,
node.libraries
.libraries_dir
.join(format!("{}.db", header.library_id)),
)?;
impl std::io::BufRead for ReaderAdapter {
fn fill_buf(&mut self) -> io::Result<&[u8]> {
block_on(self.0.fill_buf())
}
let config_path = node
fn consume(&mut self, amt: usize) {
self.0.consume(amt)
}
}
let temp_dir_path = temp_dir.path();
let mut archive = Archive::new(GzDecoder::new(ReaderAdapter(file)));
archive.unpack(&temp_dir).map_err(|e| {
FileIOError::from((temp_dir_path, e, "Failed to unpack backup compressed data"))
})?;
let library_config_path = temp_dir_path.join("library.sdlibrary");
let library_config_restored_path = node
.libraries
.libraries_dir
.join(format!("{}.sdlibrary", header.library_id));
let db_path = config_path.with_extension("db");
block_on(
node.libraries
.load(header.library_id, &db_path, config_path, None, true, node),
)?;
fs::copy(library_config_path, &library_config_restored_path)
.await
.map_err(|e| {
FileIOError::from((
&library_config_restored_path,
e,
"Failed to restore library config file from backup",
))
})?;
let db_path = temp_dir_path.join("library.db");
let db_restored_path = node
.libraries
.libraries_dir
.join(format!("{}.db", header.library_id));
fs::copy(db_path, &db_restored_path).await.map_err(|e| {
FileIOError::from((
&db_restored_path,
e,
"Failed to restore library database file from backup",
))
})?;
node.libraries
.load(
header.library_id,
db_restored_path,
library_config_restored_path,
None,
true,
node,
)
.await?;
Ok(header)
}
@@ -294,25 +405,32 @@ where
}
impl Header {
fn write(&self, file: &mut impl Write) -> Result<(), io::Error> {
async fn write(&self, file: &mut (impl AsyncWrite + Unpin)) -> Result<(), io::Error> {
// For future versioning we can bump `1` to `2` and match on it in the decoder.
file.write_all(b"sdbkp1")?;
file.write_all(&self.id.to_bytes_le())?;
file.write_all(&self.timestamp.to_le_bytes())?;
file.write_all(&self.library_id.to_bytes_le())?;
file.write_all(b"sdbkp1").await?;
file.write_all(&self.id.to_bytes_le()).await?;
file.write_all(&self.timestamp.to_le_bytes()).await?;
file.write_all(&self.library_id.to_bytes_le()).await?;
{
let bytes = &self.library_name.as_bytes()
[..cmp::min(u32::MAX as usize, self.library_name.len())];
file.write_all(&(bytes.len() as u32).to_le_bytes())?;
file.write_all(bytes)?;
file.write_all(&(bytes.len() as u32).to_le_bytes()).await?;
file.write_all(bytes).await?;
}
Ok(())
}
fn read(file: &mut impl Read) -> Result<Self, BackupError> {
async fn read(
file: &mut (impl AsyncRead + Unpin),
path: impl AsRef<Path>,
) -> Result<Self, BackupError> {
let mut buf = vec![0u8; 6 + 16 + 16 + 16 + 4];
file.read_exact(&mut buf)?;
let path = path.as_ref();
file.read_exact(&mut buf)
.await
.map_err(|e| FileIOError::from((path, e)))?;
if &buf[..6] != b"sdbkp1" {
return Err(BackupError::MalformedHeader);
}
@@ -342,7 +460,10 @@ impl Header {
);
let mut name = vec![0; len as usize];
file.read_exact(&mut name)?;
file.read_exact(&mut name)
.await
.map_err(|e| FileIOError::from((path, e)))?;
String::from_utf8(name).map_err(|_| BackupError::MalformedHeader)?
},
})
@@ -353,8 +474,8 @@ impl Header {
mod tests {
use super::*;
#[test]
fn test_backup_header() {
#[tokio::test]
async fn test_backup_header() {
let original = Header {
id: Uuid::new_v4(),
timestamp: 1234567890,
@@ -363,9 +484,9 @@ mod tests {
};
let mut buf = Vec::new();
original.write(&mut buf).unwrap();
original.write(&mut buf).await.unwrap();
let decoded = Header::read(&mut buf.as_slice()).unwrap();
let decoded = Header::read(&mut buf.as_slice(), "").await.unwrap();
assert_eq!(original, decoded);
}
}
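The `WriterAdapter` and `ReaderAdapter` types introduced above exist because `tar` and `flate2` only speak the blocking `std::io` traits, while the backup file is now a `tokio::fs::File`. A self-contained sketch of the writer half of that bridge, under the same `block_on` approach as the diff:

```rust
// Sketch of the tokio -> std::io bridge used above: wrap an AsyncWrite
// so synchronous crates (tar, flate2) can drive it, blocking per call.
use futures::executor::block_on;
use std::io;
use tokio::io::{AsyncWrite, AsyncWriteExt};

struct WriterAdapter<W: AsyncWrite + Unpin>(W);

impl<W: AsyncWrite + Unpin> io::Write for WriterAdapter<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        block_on(self.0.write(buf))
    }

    fn flush(&mut self) -> io::Result<()> {
        block_on(self.0.flush())
    }
}

fn main() -> io::Result<()> {
    use std::io::Write;
    // tokio::io::sink() is a ready-made AsyncWrite for demonstration.
    let mut w = WriterAdapter(tokio::io::sink());
    w.write_all(b"hello")?;
    w.flush()
}
```

The trade-off is one blocking call per write; the diff accepts that to keep the rest of the backup path on async file handles.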

View file

@@ -43,13 +43,17 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
.get_all()
.await
.into_iter()
.map(|lib| LibraryConfigWrapped {
uuid: lib.id,
instance_id: lib.instance_uuid,
instance_public_key: lib.identity.to_remote_identity(),
config: lib.config(),
.map(|lib| async move {
LibraryConfigWrapped {
uuid: lib.id,
instance_id: lib.instance_uuid,
instance_public_key: lib.identity.to_remote_identity(),
config: lib.config().await,
}
})
.collect::<Vec<_>>()
.join()
.await
})
})
.procedure("statistics", {
@@ -268,7 +272,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
uuid: library.id,
instance_id: library.instance_uuid,
instance_public_key: library.identity.to_remote_identity(),
config: library.config(),
config: library.config().await,
})
},
)
@@ -281,12 +285,16 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
pub description: MaybeUndefined<String>,
}
R.mutation(|node, args: EditLibraryArgs| async move {
Ok(node
.libraries
.edit(args.id, args.name, args.description)
.await?)
})
R.mutation(
|node,
EditLibraryArgs {
id,
name,
description,
}: EditLibraryArgs| async move {
Ok(node.libraries.edit(id, name, description).await?)
},
)
})
.procedure(
"delete",

View file

@@ -1,12 +1,40 @@
use crate::{invalidate_query, job::JobProgressEvent, node::config::NodeConfig, Node};
use crate::{
invalidate_query,
job::JobProgressEvent,
node::config::{NodeConfig, NodePreferences},
Node,
};
use sd_p2p::P2PStatus;
use std::sync::{atomic::Ordering, Arc};
use itertools::Itertools;
use rspc::{alpha::Rspc, Config, ErrorCode};
use sd_p2p::P2PStatus;
use serde::{Deserialize, Serialize};
use specta::Type;
use std::sync::{atomic::Ordering, Arc};
use uuid::Uuid;
mod auth;
mod backups;
// mod categories;
mod ephemeral_files;
mod files;
mod jobs;
mod keys;
mod libraries;
pub mod locations;
mod nodes;
pub mod notifications;
mod p2p;
mod preferences;
pub(crate) mod search;
mod sync;
mod tags;
pub mod utils;
pub mod volumes;
mod web_api;
use utils::{InvalidRequests, InvalidateOperationEvent};
#[allow(non_upper_case_globals)]
@@ -48,26 +76,6 @@ impl BackendFeature {
}
}
mod auth;
mod backups;
// mod categories;
mod ephemeral_files;
mod files;
mod jobs;
mod keys;
mod libraries;
pub mod locations;
mod nodes;
pub mod notifications;
mod p2p;
mod preferences;
pub(crate) mod search;
mod sync;
mod tags;
pub mod utils;
pub mod volumes;
mod web_api;
// A version of [NodeConfig] that is safe to share with the frontend
#[derive(Debug, Serialize, Deserialize, Clone, Type)]
pub struct SanitisedNodeConfig {
@@ -78,6 +86,7 @@ pub struct SanitisedNodeConfig {
pub p2p_enabled: bool,
pub p2p_port: Option<u16>,
pub features: Vec<BackendFeature>,
pub preferences: NodePreferences,
}
impl From<NodeConfig> for SanitisedNodeConfig {
@@ -88,6 +97,7 @@ impl From<NodeConfig> for SanitisedNodeConfig {
p2p_enabled: value.p2p.enabled,
p2p_port: value.p2p.port,
features: value.features,
preferences: value.preferences,
}
}
}
@@ -136,14 +146,14 @@ pub(crate) fn mount() -> Arc<Router> {
let enabled = if config.features.iter().contains(&feature) {
node.config
.write(|mut cfg| {
.write(|cfg| {
cfg.features.retain(|f| *f != feature);
})
.await
.map(|_| false)
} else {
node.config
.write(|mut cfg| {
.write(|cfg| {
cfg.features.push(feature.clone());
})
.await

View file

@@ -1,7 +1,8 @@
use crate::{invalidate_query, prisma::location, util::MaybeUndefined};
use rspc::{alpha::AlphaRouter, ErrorCode};
use sd_prisma::prisma::instance;
use rspc::{alpha::AlphaRouter, ErrorCode};
use serde::Deserialize;
use specta::Type;
use tracing::error;
@@ -32,7 +33,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
args.p2p_enabled.is_some() || args.p2p_port.is_defined();
node.config
.write(|mut config| {
.write(|config| {
if let Some(name) = args.name {
config.name = name;
}
@@ -76,7 +77,9 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
.instance()
.find_many(vec![node_id
.map(|id| instance::node_id::equals(id.as_bytes().to_vec()))
.unwrap_or(instance::id::equals(library.config().instance_id))])
.unwrap_or(instance::id::equals(
library.config().await.instance_id,
))])
.exec()
.await?;
@@ -100,4 +103,34 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
.collect::<Vec<_>>())
})
})
.procedure("updateThumbnailerPreferences", {
#[derive(Deserialize, Type)]
pub struct UpdateThumbnailerPreferences {
pub background_processing_percentage: u8, // 0-100
}
R.mutation(
|node,
UpdateThumbnailerPreferences {
background_processing_percentage,
}: UpdateThumbnailerPreferences| async move {
node.config
.update_preferences(|preferences| {
preferences
.thumbnailer
.set_background_processing_percentage(
background_processing_percentage,
);
})
.await
.map_err(|e| {
error!("failed to update thumbnailer preferences: {e:#?}");
rspc::Error::with_cause(
ErrorCode::InternalServerError,
"Failed to update thumbnailer preferences".to_string(),
e,
)
})
},
)
})
}
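The new `updateThumbnailerPreferences` mutation above persists `background_processing_percentage` (0-100) and pushes it to the thumbnailer through the preferences watcher. How the thumbnailer maps that percentage to a worker count is not part of this diff; a plausible sketch of such a mapping, clamped so at least one worker remains:

```rust
// Sketch (assumption, not code from this PR): turn a 0-100 percentage
// of the available cores into a background worker count, minimum 1.
use std::{num::NonZeroUsize, thread};

fn background_worker_count(background_processing_percentage: u8) -> usize {
    let available = thread::available_parallelism()
        .map(NonZeroUsize::get)
        .unwrap_or(1);
    let percentage = usize::from(background_processing_percentage.min(100));
    std::cmp::max(1, available * percentage / 100)
}

fn main() {
    println!("background workers: {}", background_worker_count(50));
}
```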

View file

@@ -107,7 +107,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
}
NotificationId::Node(id) => {
node.config
.write(|mut cfg| {
.write(|cfg| {
cfg.notifications
.retain(|n| n.id != NotificationId::Node(id));
})
@@ -124,7 +124,7 @@
.procedure("dismissAll", {
R.query(|node, _: ()| async move {
node.config
.write(|mut cfg| {
.write(|cfg| {
cfg.notifications = vec![];
})
.await

View file

@@ -108,13 +108,14 @@ impl Node {
locations,
notifications: notifications::Notifications::new(),
p2p,
config,
thumbnailer: Thumbnailer::new(
data_dir.to_path_buf(),
libraries.clone(),
event_bus.0.clone(),
config.preferences_watcher(),
)
.await,
config,
event_bus,
libraries,
files_over_p2p_flag: Arc::new(AtomicBool::new(false)),
@@ -237,7 +238,7 @@ impl Node {
match self
.config
.write(|mut cfg| cfg.notifications.push(notification.clone()))
.write(|cfg| cfg.notifications.push(notification.clone()))
.await
{
Ok(_) => {
@@ -280,7 +281,7 @@
#[derive(Error, Debug)]
pub enum NodeError {
#[error("NodeError::FailedToInitializeConfig({0})")]
FailedToInitializeConfig(util::migrator::MigratorError),
FailedToInitializeConfig(config::NodeConfigError),
#[error("failed to initialize library manager: {0}")]
FailedToInitializeLibraryManager(#[from] library::LibraryManagerError),
#[error("failed to initialize location manager: {0}")]
@@ -290,7 +291,7 @@ pub enum NodeError {
#[error("invalid platform integer: {0}")]
InvalidPlatformInt(u8),
#[cfg(debug_assertions)]
#[error("Init config error: {0}")]
#[error("init config error: {0}")]
InitConfig(#[from] util::debug_initializer::InitConfigError),
#[error("logger error: {0}")]
Logger(#[from] FromEnvError),

View file

@@ -4,20 +4,25 @@ use crate::{
prisma::{file_path, indexer_rule, PrismaClient},
util::{
db::maybe_missing,
migrator::{Migrate, MigratorError},
error::FileIOError,
version_manager::{Kind, ManagedVersion, VersionManager, VersionManagerError},
},
};
use chrono::Utc;
use sd_p2p::spacetunnel::Identity;
use sd_prisma::prisma::{instance, location, node};
use std::{path::PathBuf, sync::Arc};
use std::path::Path;
use chrono::Utc;
use int_enum::IntEnum;
use prisma_client_rust::not;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use serde_json::{json, Map, Value};
use serde_repr::{Deserialize_repr, Serialize_repr};
use specta::Type;
use thiserror::Error;
use tokio::fs;
use tracing::error;
use uuid::Uuid;
@@ -32,218 +37,342 @@ pub struct LibraryConfig {
pub description: Option<String>,
/// id of the current instance so we know who this `.db` is. This can be looked up within the `Instance` table.
pub instance_id: i32,
version: LibraryConfigVersion,
}
#[async_trait::async_trait]
impl Migrate for LibraryConfig {
const CURRENT_VERSION: u32 = 9;
#[derive(
IntEnum,
Debug,
Clone,
Copy,
Eq,
PartialEq,
strum::Display,
Serialize_repr,
Deserialize_repr,
Type,
)]
#[repr(u64)]
pub enum LibraryConfigVersion {
V0 = 0,
V1 = 1,
V2 = 2,
V3 = 3,
V4 = 4,
V5 = 5,
V6 = 6,
V7 = 7,
V8 = 8,
V9 = 9,
}
type Ctx = (NodeConfig, Arc<PrismaClient>);
impl ManagedVersion<LibraryConfigVersion> for LibraryConfig {
const LATEST_VERSION: LibraryConfigVersion = LibraryConfigVersion::V9;
fn default(path: PathBuf) -> Result<Self, MigratorError> {
Err(MigratorError::ConfigFileMissing(path))
const KIND: Kind = Kind::Json("version");
type MigrationError = LibraryConfigError;
}
impl LibraryConfig {
pub(crate) async fn new(
name: LibraryName,
description: Option<String>,
instance_id: i32,
path: impl AsRef<Path>,
) -> Result<Self, LibraryConfigError> {
let this = Self {
name,
description,
instance_id,
version: Self::LATEST_VERSION,
};
this.save(path).await.map(|()| this)
}
async fn migrate(
to_version: u32,
config: &mut serde_json::Map<String, serde_json::Value>,
(node_config, db): &Self::Ctx,
) -> Result<(), MigratorError> {
match to_version {
0 => {}
1 => {
let rules = vec![
format!("No OS protected"),
format!("No Hidden"),
format!("No Git"),
format!("Only Images"),
];
pub(crate) async fn load(
path: impl AsRef<Path>,
node_config: &NodeConfig,
db: &PrismaClient,
) -> Result<Self, LibraryConfigError> {
let path = path.as_ref();
db._batch(
rules
.into_iter()
.enumerate()
.map(|(i, name)| {
db.indexer_rule().update_many(
vec![indexer_rule::name::equals(Some(name))],
vec![indexer_rule::pub_id::set(sd_utils::uuid_to_bytes(
Uuid::from_u128(i as u128),
))],
VersionManager::<Self, LibraryConfigVersion>::migrate_and_load(
path,
|current, next| async move {
match (current, next) {
(LibraryConfigVersion::V0, LibraryConfigVersion::V1) => {
let rules = vec![
String::from("No OS protected"),
String::from("No Hidden"),
String::from("No Git"),
String::from("Only Images"),
];
db._batch(
rules
.into_iter()
.enumerate()
.map(|(i, name)| {
db.indexer_rule().update_many(
vec![indexer_rule::name::equals(Some(name))],
vec![indexer_rule::pub_id::set(sd_utils::uuid_to_bytes(
Uuid::from_u128(i as u128),
))],
)
})
.collect::<Vec<_>>(),
)
.await?;
}
(LibraryConfigVersion::V1, LibraryConfigVersion::V2) => {
let mut config = serde_json::from_slice::<Map<String, Value>>(
&fs::read(path).await.map_err(|e| {
VersionManagerError::FileIO(FileIOError::from((path, e)))
})?,
)
.map_err(VersionManagerError::SerdeJson)?;
config.insert(
String::from("identity"),
Value::Array(
Identity::new()
.to_bytes()
.into_iter()
.map(Into::into)
.collect(),
),
);
fs::write(
path,
&serde_json::to_vec(&config).map_err(VersionManagerError::SerdeJson)?,
)
.await
.map_err(|e| VersionManagerError::FileIO(FileIOError::from((path, e))))?;
}
(LibraryConfigVersion::V2, LibraryConfigVersion::V3) => {
// The fact I have to migrate this hurts my soul
if db.node().count(vec![]).exec().await? != 1 {
return Err(LibraryConfigError::TooManyNodes);
}
db.node()
.update_many(
vec![],
vec![
node::pub_id::set(node_config.id.as_bytes().to_vec()),
node::node_peer_id::set(Some(
node_config.keypair.peer_id().to_string(),
)),
],
)
})
.collect::<Vec<_>>(),
)
.await?;
}
2 => {
config.insert(
"identity".into(),
Value::Array(
Identity::new()
.to_bytes()
.into_iter()
.map(|v| v.into())
.collect(),
),
);
}
// The fact I have to migrate this hurts my soul
3 => {
if db.node().count(vec![]).exec().await? != 1 {
return Err(MigratorError::Custom(
"Ummm, there are too many nodes in the database, this should not happen!"
.into(),
));
}
.exec()
.await?;
db.node()
.update_many(
vec![],
vec![
node::pub_id::set(node_config.id.as_bytes().to_vec()),
node::node_peer_id::set(Some(
node_config.keypair.peer_id().to_string(),
)),
],
)
.exec()
.await?;
let mut config = serde_json::from_slice::<Map<String, Value>>(
&fs::read(path).await.map_err(|e| {
VersionManagerError::FileIO(FileIOError::from((path, e)))
})?,
)
.map_err(VersionManagerError::SerdeJson)?;
config.insert("node_id".into(), Value::String(node_config.id.to_string()));
}
4 => {} // -_-
5 => loop {
let paths = db
.file_path()
.find_many(vec![not![file_path::size_in_bytes::equals(None)]])
.take(500)
.select(file_path::select!({ id size_in_bytes }))
.exec()
.await?;
config.insert(String::from("node_id"), json!(node_config.id.to_string()));
if paths.is_empty() {
break;
}
fs::write(
path,
&serde_json::to_vec(&config).map_err(VersionManagerError::SerdeJson)?,
)
.await
.map_err(|e| VersionManagerError::FileIO(FileIOError::from((path, e))))?;
}
db._batch(
paths
.into_iter()
.filter_map(|path| {
maybe_missing(path.size_in_bytes, "file_path.size_in_bytes")
.map_or_else(
|e| {
error!("{e:#?}");
None
},
Some,
)
.map(|size_in_bytes| {
let size = if let Ok(size) = size_in_bytes.parse::<u64>() {
Some(size.to_be_bytes().to_vec())
} else {
error!(
(LibraryConfigVersion::V3, LibraryConfigVersion::V4) => {
// -_-
}
(LibraryConfigVersion::V4, LibraryConfigVersion::V5) => loop {
let paths = db
.file_path()
.find_many(vec![not![file_path::size_in_bytes::equals(None)]])
.take(500)
.select(file_path::select!({ id size_in_bytes }))
.exec()
.await?;
if paths.is_empty() {
break;
}
db._batch(
paths
.into_iter()
.filter_map(|path| {
maybe_missing(path.size_in_bytes, "file_path.size_in_bytes")
.map_or_else(
|e| {
error!("{e:#?}");
None
},
Some,
)
.map(|size_in_bytes| {
let size =
if let Ok(size) = size_in_bytes.parse::<u64>() {
Some(size.to_be_bytes().to_vec())
} else {
error!(
"File path <id='{}'> had invalid size: '{}'",
path.id, size_in_bytes
);
None
};
None
};
db.file_path().update(
file_path::id::equals(path.id),
vec![
file_path::size_in_bytes_bytes::set(size),
file_path::size_in_bytes::set(None),
],
)
db.file_path().update(
file_path::id::equals(path.id),
vec![
file_path::size_in_bytes_bytes::set(size),
file_path::size_in_bytes::set(None),
],
)
})
})
})
.collect::<Vec<_>>(),
)
.await?;
},
6 => {
let nodes = db.node().find_many(vec![]).exec().await?;
.collect::<Vec<_>>(),
)
.await?;
},
if nodes.is_empty() {
println!("6 - No nodes found... How did you even get this far? but this is fine we can fix it.");
} else if nodes.len() > 1 {
return Err(MigratorError::Custom(
"6 - More than one node found in the DB... This can't be automatically reconciled!"
.into(),
));
}
(LibraryConfigVersion::V5, LibraryConfigVersion::V6) => {
let nodes = db.node().find_many(vec![]).exec().await?;
if nodes.is_empty() {
error!("6 - No nodes found... How did you even get this far? but this is fine we can fix it.");
} else if nodes.len() > 1 {
error!("6 - More than one node found in the DB... This can't be automatically reconciled!");
return Err(LibraryConfigError::TooManyNodes);
}
let node = nodes.first();
let now = Utc::now().fixed_offset();
let instance_id = Uuid::new_v4();
instance::Create {
pub_id: instance_id.as_bytes().to_vec(),
identity: node
.and_then(|n| n.identity.clone())
.unwrap_or_else(|| Identity::new().to_bytes()),
node_id: node_config.id.as_bytes().to_vec(),
node_name: node_config.name.clone(),
node_platform: Platform::current() as i32,
last_seen: now,
date_created: node.map(|n| n.date_created).unwrap_or_else(|| now),
_params: vec![],
}
.to_query(db)
.exec()
.await?;
let node = nodes.first();
let now = Utc::now().fixed_offset();
let instance_id = Uuid::new_v4();
config.remove("node_id");
config.remove("identity");
config.insert("instance_id".into(), Value::String(instance_id.to_string()));
}
7 => {
let instances = db.instance().find_many(vec![]).exec().await?;
if instances.len() > 1 {
return Err(MigratorError::Custom(
"7 - More than one instance found in the DB... This can't be automatically reconciled!"
.into(),
));
}
let Some(instance) = instances.first() else {
return Err(MigratorError::Custom(
"7 - No nodes found... How did you even get this far?!".into(),
));
};
config.remove("instance_id");
config.insert("instance_id".into(), Value::Number(instance.id.into()));
// We are relinking all locations to the current instance.
// If you have more than one node in your database and you're not @Oscar, something went horribly wrong so this is fine.
db.location()
.update_many(vec![], vec![location::instance_id::set(Some(instance.id))])
.exec()
.await?;
}
8 => {
let instances = db.instance().find_many(vec![]).exec().await?;
let Some(instance) = instances.first() else {
return Err(MigratorError::Custom(
"8 - No nodes found... How did you even get this far?!".into(),
));
};
// This should be in 7 but it's added to ensure to hell it runs.
config.remove("instance_id");
config.insert("instance_id".into(), Value::Number(instance.id.into()));
}
9 => {
db._batch(
db.instance()
.find_many(vec![])
instance::Create {
pub_id: instance_id.as_bytes().to_vec(),
identity: node
.and_then(|n| n.identity.clone())
.unwrap_or_else(|| Identity::new().to_bytes()),
node_id: node_config.id.as_bytes().to_vec(),
node_name: node_config.name.clone(),
node_platform: Platform::current() as i32,
last_seen: now,
date_created: node.map(|n| n.date_created).unwrap_or_else(|| now),
_params: vec![],
}
.to_query(db)
.exec()
.await?
.into_iter()
.map(|i| {
db.instance().update(
instance::id::equals(i.id),
vec![instance::identity::set(
.await?;
let mut config = serde_json::from_slice::<Map<String, Value>>(
&fs::read(path).await.map_err(|e| {
VersionManagerError::FileIO(FileIOError::from((path, e)))
})?,
)
.map_err(VersionManagerError::SerdeJson)?;
config.remove("node_id");
config.remove("identity");
config.insert(String::from("instance_id"), json!(instance_id.to_string()));
fs::write(
path,
&serde_json::to_vec(&config).map_err(VersionManagerError::SerdeJson)?,
)
.await
.map_err(|e| VersionManagerError::FileIO(FileIOError::from((path, e))))?;
}
(LibraryConfigVersion::V6, LibraryConfigVersion::V7) => {
let instances = db.instance().find_many(vec![]).exec().await?;
if instances.len() > 1 {
error!("7 - More than one instance found in the DB... This can't be automatically reconciled!");
return Err(LibraryConfigError::TooManyInstances);
}
let Some(instance) = instances.first() else {
error!("7 - No instance found... How did you even get this far?!");
return Err(LibraryConfigError::MissingInstance);
};
let mut config = serde_json::from_slice::<Map<String, Value>>(
&fs::read(path).await.map_err(|e| {
VersionManagerError::FileIO(FileIOError::from((path, e)))
})?,
)
.map_err(VersionManagerError::SerdeJson)?;
config.remove("instance_id");
config.insert(String::from("instance_id"), json!(instance.id));
fs::write(
path,
&serde_json::to_vec(&config).map_err(VersionManagerError::SerdeJson)?,
)
.await
.map_err(|e| VersionManagerError::FileIO(FileIOError::from((path, e))))?;
// We are relinking all locations to the current instance.
// If you have more than one node in your database and you're not @Oscar, something went horribly wrong so this is fine.
db.location()
.update_many(
vec![],
vec![location::instance_id::set(Some(instance.id))],
)
.exec()
.await?;
}
(LibraryConfigVersion::V7, LibraryConfigVersion::V8) => {
let instances = db.instance().find_many(vec![]).exec().await?;
let Some(instance) = instances.first() else {
error!("8 - No nodes found... How did you even get this far?!");
return Err(LibraryConfigError::MissingInstance);
};
// This should be in 7 but it's added to ensure to hell it runs.
let mut config = serde_json::from_slice::<Map<String, Value>>(
&fs::read(path).await.map_err(|e| {
VersionManagerError::FileIO(FileIOError::from((path, e)))
})?,
)
.map_err(VersionManagerError::SerdeJson)?;
config.remove("instance_id");
config.insert(String::from("instance_id"), json!(instance.id));
fs::write(
path,
&serde_json::to_vec(&config).map_err(VersionManagerError::SerdeJson)?,
)
.await
.map_err(|e| VersionManagerError::FileIO(FileIOError::from((path, e))))?;
}
(LibraryConfigVersion::V8, LibraryConfigVersion::V9) => {
db._batch(
db.instance()
.find_many(vec![])
.exec()
.await?
.into_iter()
.map(|i| {
db.instance().update(
instance::id::equals(i.id),
vec![instance::identity::set(
// This code is assuming you only have the current node.
// If you've paired your node with another node, reset your db.
IdentityOrRemoteIdentity::Identity(
@@ -253,15 +382,51 @@ impl Migrate for LibraryConfig {
)
.to_bytes(),
)],
)
})
.collect::<Vec<_>>(),
)
.await?;
}
v => unreachable!("Missing migration for library version {}", v),
}
)
})
.collect::<Vec<_>>(),
)
.await?;
}
Ok(())
_ => {
error!("Library config version is not handled: {:?}", current);
return Err(VersionManagerError::UnexpectedMigration {
current_version: current.int_value(),
next_version: next.int_value(),
}
.into());
}
}
Ok(())
},
)
.await
}
pub(crate) async fn save(&self, path: impl AsRef<Path>) -> Result<(), LibraryConfigError> {
let path = path.as_ref();
fs::write(path, &serde_json::to_vec(self)?)
.await
.map_err(|e| FileIOError::from((path, e)).into())
}
}
#[derive(Error, Debug)]
pub enum LibraryConfigError {
#[error("database error: {0}")]
Database(#[from] prisma_client_rust::QueryError),
#[error("there are too many nodes in the database, this should not happen!")]
TooManyNodes,
#[error("there are too many instances in the database, this should not happen!")]
TooManyInstances,
#[error("missing instances")]
MissingInstance,
#[error(transparent)]
SerdeJson(#[from] serde_json::Error),
#[error(transparent)]
VersionManager(#[from] VersionManagerError<LibraryConfigVersion>),
#[error(transparent)]
FileIO(#[from] FileIOError),
}
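`migrate_and_load` above drives the closure with successive `(current, next)` pairs until the config file reaches `LATEST_VERSION`, which is why every arm matches exactly one version step. A simplified synchronous sketch of that stepping loop (the real `VersionManager` is generic over the version enum and async):

```rust
// Sketch: advance one version at a time, running the pairwise migration
// for each (current, next) step. Types and signatures are illustrative.
#[derive(Clone, Copy, PartialEq, Debug)]
enum Version {
    V0,
    V1,
    V2,
}

impl Version {
    const LATEST: Version = Version::V2;

    fn next(self) -> Option<Version> {
        match self {
            Version::V0 => Some(Version::V1),
            Version::V1 => Some(Version::V2),
            Version::V2 => None,
        }
    }
}

fn migrate_to_latest(
    mut current: Version,
    mut migrate: impl FnMut(Version, Version) -> Result<(), String>,
) -> Result<(), String> {
    while current != Version::LATEST {
        let next = current
            .next()
            .expect("every version below LATEST has a successor");
        migrate(current, next)?;
        current = next;
    }
    Ok(())
}

fn main() {
    migrate_to_latest(Version::V0, |current, next| {
        println!("migrating {current:?} -> {next:?}");
        Ok(())
    })
    .expect("migrations succeed");
}
```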

View file

@@ -12,17 +12,18 @@ use crate::{
Node,
};
use sd_p2p::spacetunnel::Identity;
use sd_prisma::prisma::notification;
use std::{
collections::HashMap,
fmt::{Debug, Formatter},
path::{Path, PathBuf},
sync::{Arc, PoisonError, RwLock, RwLockWriteGuard},
sync::Arc,
};
use chrono::{DateTime, Utc};
use sd_p2p::spacetunnel::Identity;
use sd_prisma::prisma::notification;
use tokio::{fs, io, sync::broadcast};
use tokio::{fs, io, sync::broadcast, sync::RwLock};
use tracing::warn;
use uuid::Uuid;
@@ -97,19 +98,20 @@ impl Library {
})
}
pub fn config(&self) -> LibraryConfig {
// We use a `std::sync::RwLock` as we don't want users holding this over await points.
// We currently `.clone()` the value so that will never be a problem, however we could avoid cloning here but that makes for potentially confusing `!Send` errors.
// Tokio also recommend this as it's generally better for avoiding deadlocks and performance - https://tokio.rs/tokio/tutorial/shared-state#holding-a-mutexguard-across-an-await
// We do `PoisonError::into_inner` as that is effectively what `tokio::sync::RwLock` does internally, and if it's fine for them, it's fine for us!
self.config
.read()
.unwrap_or_else(PoisonError::into_inner)
.clone()
pub async fn config(&self) -> LibraryConfig {
self.config.read().await.clone()
}
pub fn config_mut(&self) -> RwLockWriteGuard<'_, LibraryConfig> {
self.config.write().unwrap_or_else(PoisonError::into_inner)
pub async fn update_config(
&self,
update_fn: impl FnOnce(&mut LibraryConfig),
config_path: impl AsRef<Path>,
) -> Result<(), LibraryManagerError> {
let mut config = self.config.write().await;
update_fn(&mut config);
config.save(config_path).await.map_err(Into::into)
}
// TODO: Remove this once we replace the old invalidation system
@@ -146,7 +148,7 @@ impl Library {
.find_many(vec![
// TODO(N): This isn't gonna work with removable media and this will likely permanently break if the DB is restored from a backup.
file_path::location::is(vec![location::instance_id::equals(Some(
self.config().instance_id,
self.config().await.instance_id,
))]),
file_path::id::in_vec(ids),
])
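The guard-returning `config_mut` is replaced above by `update_config`, which applies the mutation and saves to disk while still holding the write lock, so the in-memory config and the file cannot drift apart. A minimal sketch of that shape with illustrative types:

```rust
// Sketch: mutate-then-persist under one write lock acquisition.
use std::path::Path;
use tokio::{fs, sync::RwLock};

#[derive(Clone, serde::Serialize)]
struct Config {
    name: String,
}

struct Library {
    config: RwLock<Config>,
}

impl Library {
    async fn update_config(
        &self,
        update_fn: impl FnOnce(&mut Config),
        path: impl AsRef<Path>,
    ) -> std::io::Result<()> {
        let mut config = self.config.write().await;
        update_fn(&mut config);
        // Saving before the guard drops serializes concurrent updates
        // and keeps the file in sync with memory.
        fs::write(path, serde_json::to_vec(&*config)?).await
    }
}
```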

View file

@@ -1,10 +1,10 @@
use crate::{
library::LibraryConfigError,
location::{indexer, LocationManagerError},
p2p::IdentityOrRemoteIdentityErr,
util::{
db::{self, MissingFieldError},
error::{FileIOError, NonUtf8PathError},
migrator::MigratorError,
},
};
@@ -13,24 +13,18 @@ use tracing::error;
#[derive(Error, Debug)]
pub enum LibraryManagerError {
#[error(transparent)]
FileIO(#[from] FileIOError),
#[error("error serializing or deserializing the JSON in the config file: {0}")]
Json(#[from] serde_json::Error),
#[error("database error: {0}")]
Database(#[from] prisma_client_rust::QueryError),
#[error("library not found error")]
LibraryNotFound,
#[error("error migrating the config file: {0}")]
Migration(String),
#[error("failed to parse uuid: {0}")]
Uuid(#[from] uuid::Error),
#[error("failed to run indexer rules seeder: {0}")]
IndexerRulesSeeder(#[from] indexer::rules::seed::SeederError),
// #[error("failed to initialise the key manager: {0}")]
// KeyManager(#[from] sd_crypto::Error),
#[error("failed to run library migrations: {0}")]
MigratorError(#[from] MigratorError),
#[error("error migrating the library: {0}")]
MigrationError(#[from] db::MigrationError),
#[error("invalid library configuration: {0}")]
@@ -47,6 +41,11 @@ pub enum LibraryManagerError {
CurrentInstanceNotFound(String),
#[error("missing-field: {0}")]
MissingField(#[from] MissingFieldError),
#[error(transparent)]
FileIO(#[from] FileIOError),
#[error(transparent)]
LibraryConfig(#[from] LibraryConfigError),
}
impl From<LibraryManagerError> for rspc::Error {

View file

@@ -13,13 +13,16 @@ use crate::{
util::{
db,
error::{FileIOError, NonUtf8PathError},
migrator::Migrate,
mpscrr, MaybeUndefined,
},
volume::watcher::spawn_volume_watcher,
Node,
};
use sd_core_sync::SyncMessage;
use sd_p2p::spacetunnel::Identity;
use sd_prisma::prisma::instance;
use std::{
collections::HashMap,
path::{Path, PathBuf},
@@ -29,9 +32,6 @@ use std::{
use chrono::Utc;
use futures_concurrency::future::{Join, TryJoin};
use sd_core_sync::SyncMessage;
use sd_p2p::spacetunnel::Identity;
use sd_prisma::prisma::instance;
use tokio::{fs, io, sync::RwLock};
use tracing::{debug, error, info, warn};
use uuid::Uuid;
@@ -165,15 +165,16 @@ impl Libraries {
));
}
let config = LibraryConfig {
let config_path = self.libraries_dir.join(format!("{id}.sdlibrary"));
let config = LibraryConfig::new(
name,
description,
// First instance will be zero
instance_id: 0,
};
let config_path = self.libraries_dir.join(format!("{id}.sdlibrary"));
config.save(&config_path)?;
0,
&config_path,
)
.await?;
debug!(
"Created library '{}' config at '{}'",
@@ -238,29 +239,35 @@
) -> Result<(), LibraryManagerError> {
// check library is valid
let libraries = self.libraries.read().await;
let library = libraries
.get(&id)
.ok_or(LibraryManagerError::LibraryNotFound)?;
let library = Arc::clone(
libraries
.get(&id)
.ok_or(LibraryManagerError::LibraryNotFound)?,
);
{
let mut config = library.config_mut();
// update the library
if let Some(name) = name {
config.name = name;
}
match description {
MaybeUndefined::Undefined => {}
MaybeUndefined::Null => config.description = None,
MaybeUndefined::Value(description) => config.description = Some(description),
}
LibraryConfig::save(&config, &self.libraries_dir.join(format!("{id}.sdlibrary")))?;
}
library
.update_config(
|config| {
// update the library
if let Some(name) = name {
config.name = name;
}
match description {
MaybeUndefined::Undefined => {}
MaybeUndefined::Null => config.description = None,
MaybeUndefined::Value(description) => {
config.description = Some(description)
}
}
},
self.libraries_dir.join(format!("{id}.sdlibrary")),
)
.await?;
self.tx
.emit(LibraryManagerEvent::Edit(library.clone()))
.emit(LibraryManagerEvent::Edit(Arc::clone(&library)))
.await;
invalidate_query!(library, "library.list");
Ok(())
@@ -356,12 +363,14 @@ impl Libraries {
self: &Arc<Self>,
id: Uuid,
db_path: impl AsRef<Path>,
config_path: PathBuf,
config_path: impl AsRef<Path>,
create: Option<instance::Create>,
should_seed: bool,
node: &Arc<Node>,
) -> Result<Arc<Library>, LibraryManagerError> {
let db_path = db_path.as_ref();
let config_path = config_path.as_ref();
let db_url = format!(
"file:{}?socket_timeout=15&connection_limit=1",
db_path.as_os_str().to_str().ok_or_else(|| {
@ -375,9 +384,7 @@ impl Libraries {
}
let node_config = node.config.get().await;
let config =
LibraryConfig::load_and_migrate(&config_path, &(node_config.clone(), db.clone()))
.await?;
let config = LibraryConfig::load(config_path, &node_config, &db).await?;
let instance = db
.instance()

View file

@@ -32,7 +32,7 @@ pub(super) async fn check_online(
let location_path = maybe_missing(&location.path, "location.path").map(Path::new)?;
// TODO(N): This isn't gonna work with removable media and this will likely permanently break if the DB is restored from a backup.
if location.instance_id == Some(library.config().instance_id) {
if location.instance_id == Some(library.config().await.instance_id) {
match fs::metadata(&location_path).await {
Ok(_) => {
node.locations.add_online(pub_id).await;
@@ -147,7 +147,7 @@ pub(super) async fn handle_remove_location_request(
let key = (location_id, library.id);
if let Some(location) = get_location(location_id, &library).await {
// TODO(N): This isn't gonna work with removable media and this will likely permanently break if the DB is restored from a backup.
if location.instance_id == Some(library.config().instance_id) {
if location.instance_id == Some(library.config().await.instance_id) {
unwatch_location(location, library.id, locations_watched, locations_unwatched);
locations_unwatched.remove(&key);
forced_unwatch.remove(&key);

View file

@@ -527,7 +527,7 @@ impl Locations {
to_remove.remove(&key);
} else if let Some(location) = get_location(location_id, &library).await {
// TODO(N): This isn't gonna work with removable media and this will likely permanently break if the DB is restored from a backup.
if location.instance_id == Some(library.config().instance_id) {
if location.instance_id == Some(library.config().await.instance_id) {
let is_online = match check_online(&location, &node, &library).await {
Ok(is_online) => is_online,
Err(e) => {

View file

@@ -350,7 +350,7 @@ impl LocationUpdateArgs {
.await?;
// TODO(N): This will probs fall apart with removable media.
if location.instance_id == Some(library.config().instance_id) {
if location.instance_id == Some(library.config().await.instance_id) {
if let Some(path) = &location.path {
if let Some(mut metadata) =
SpacedriveLocationMetadataFile::try_load(path).await?
@@ -443,7 +443,7 @@ pub async fn scan_location(
location: location_with_indexer_rules::Data,
) -> Result<(), JobManagerError> {
// TODO(N): This isn't gonna work with removable media and this will likely permanently break if the DB is restored from a backup.
if location.instance_id != Some(library.config().instance_id) {
if location.instance_id != Some(library.config().await.instance_id) {
return Ok(());
}
@@ -479,7 +479,7 @@ pub async fn scan_location_sub_path(
let sub_path = sub_path.as_ref().to_path_buf();
// TODO(N): This isn't gonna work with removable media and this will likely permanently break if the DB is restored from a backup.
if location.instance_id != Some(library.config().instance_id) {
if location.instance_id != Some(library.config().await.instance_id) {
return Ok(());
}
@@ -518,7 +518,7 @@ pub async fn light_scan_location(
let sub_path = sub_path.as_ref().to_path_buf();
// TODO(N): This isn't gonna work with removable media and this will likely permanently break if the DB is restored from a backup.
if location.instance_id != Some(library.config().instance_id) {
if location.instance_id != Some(library.config().await.instance_id) {
return Ok(());
}
@@ -691,7 +691,7 @@ async fn create_location(
location::name::set(Some(name.clone())),
location::path::set(Some(path)),
location::date_created::set(Some(date_created.into())),
location::instance_id::set(Some(library.config().instance_id)),
location::instance_id::set(Some(library.config().await.instance_id)),
// location::instance::connect(instance::id::equals(
// library.config.instance_id.as_bytes().to_vec(),
// )),
@@ -754,7 +754,7 @@ pub async fn delete_location(
// TODO: This should really be queued to the proper node so it will always run
// TODO: Deal with whether a location is online or not
// TODO(N): This isn't gonna work with removable media and this will likely permanently break if the DB is restored from a backup.
if location.instance_id == Some(library.config().instance_id) {
if location.instance_id == Some(library.config().await.instance_id) {
if let Some(path) = &location.path {
if let Ok(Some(mut metadata)) = SpacedriveLocationMetadataFile::try_load(path).await {
metadata

View file

@@ -1,18 +1,32 @@
use crate::{
api::{notifications::Notification, BackendFeature},
auth::OAuthToken,
object::media::thumbnail::preferences::ThumbnailerPreferences,
util::{
error::FileIOError,
version_manager::{Kind, ManagedVersion, VersionManager, VersionManagerError},
},
};
use sd_p2p::{Keypair, ManagerConfig};
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use std::{
path::{Path, PathBuf},
sync::Arc,
};
use tokio::sync::{RwLock, RwLockWriteGuard};
use uuid::Uuid;
use crate::{
api::{notifications::Notification, BackendFeature},
auth::OAuthToken,
util::migrator::{Migrate, MigratorError},
use int_enum::IntEnum;
use serde::{Deserialize, Serialize};
use serde_json::{json, Map, Value};
use serde_repr::{Deserialize_repr, Serialize_repr};
use specta::Type;
use thiserror::Error;
use tokio::{
fs,
sync::{watch, RwLock},
};
use tracing::error;
use uuid::Uuid;
/// NODE_STATE_CONFIG_NAME is the name of the file which stores the NodeState
pub const NODE_STATE_CONFIG_NAME: &str = "node_state.sdconfig";
@@ -38,122 +52,233 @@ pub struct NodeConfig {
pub features: Vec<BackendFeature>,
/// Authentication for Spacedrive Accounts
pub auth_token: Option<OAuthToken>,
/// The aggregation of many different preferences for the node
pub preferences: NodePreferences,
version: NodeConfigVersion,
}
#[async_trait::async_trait]
impl Migrate for NodeConfig {
const CURRENT_VERSION: u32 = 1;
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq, Type)]
pub struct NodePreferences {
pub thumbnailer: ThumbnailerPreferences,
}
type Ctx = ();
#[derive(
IntEnum, Debug, Clone, Copy, Eq, PartialEq, strum::Display, Serialize_repr, Deserialize_repr,
)]
#[repr(u64)]
pub enum NodeConfigVersion {
V0 = 0,
V1 = 1,
V2 = 2,
}
fn default(_path: PathBuf) -> Result<Self, MigratorError> {
impl ManagedVersion<NodeConfigVersion> for NodeConfig {
const LATEST_VERSION: NodeConfigVersion = NodeConfigVersion::V2;
const KIND: Kind = Kind::Json("version");
type MigrationError = NodeConfigError;
fn from_latest_version() -> Option<Self> {
let mut name = match hostname::get() {
// SAFETY: This is just for display purposes so it doesn't matter if it's lossy
Ok(hostname) => hostname.to_string_lossy().into_owned(),
Err(err) => {
eprintln!("Falling back to default node name as an error occurred getting your systems hostname: '{err}'");
Err(e) => {
error!("Falling back to default node name as an error occurred getting your systems hostname: '{e:#?}'");
"my-spacedrive".into()
}
};
name.truncate(250);
Ok(Self {
Some(Self {
id: Uuid::new_v4(),
name,
keypair: Keypair::generate(),
p2p: Default::default(),
version: Self::LATEST_VERSION,
p2p: ManagerConfig::default(),
features: vec![],
notifications: vec![],
auth_token: None,
preferences: NodePreferences::default(),
})
}
}
async fn migrate(
from_version: u32,
config: &mut Map<String, Value>,
_ctx: &Self::Ctx,
) -> Result<(), MigratorError> {
match from_version {
0 => Ok(()),
1 => {
// All where never hooked up to the UI
config.remove("p2p_email");
config.remove("p2p_img_url");
config.remove("p2p_port");
impl NodeConfig {
pub async fn load(path: impl AsRef<Path>) -> Result<Self, NodeConfigError> {
let path = path.as_ref();
VersionManager::<Self, NodeConfigVersion>::migrate_and_load(
path,
|current, next| async move {
match (current, next) {
(NodeConfigVersion::V0, NodeConfigVersion::V1) => {
let mut config: Map<String, Value> =
serde_json::from_slice(&fs::read(path).await.map_err(|e| {
FileIOError::from((
path,
e,
"Failed to read node config file for migration",
))
})?)
.map_err(VersionManagerError::SerdeJson)?;
// In a recent PR I screwed up Serde `default` so P2P was disabled by default, prior it was always enabled.
// Given the config for it is behind a feature flag (so no one would have changed it) this fixes the default.
if let Some(Value::Object(obj)) = config.get_mut("p2p") {
obj.insert("enabled".into(), Value::Bool(true));
// All were never hooked up to the UI
config.remove("p2p_email");
config.remove("p2p_img_url");
config.remove("p2p_port");
// In a recent PR I screwed up Serde `default` so P2P was disabled by default, prior it was always enabled.
// Given the config for it is behind a feature flag (so no one would have changed it) this fixes the default.
if let Some(Value::Object(obj)) = config.get_mut("p2p") {
obj.insert("enabled".into(), Value::Bool(true));
}
fs::write(
path,
serde_json::to_vec(&config).map_err(VersionManagerError::SerdeJson)?,
)
.await
.map_err(|e| FileIOError::from((path, e)))?;
}
(NodeConfigVersion::V1, NodeConfigVersion::V2) => {
let mut config: Map<String, Value> =
serde_json::from_slice(&fs::read(path).await.map_err(|e| {
FileIOError::from((
path,
e,
"Failed to read node config file for migration",
))
})?)
.map_err(VersionManagerError::SerdeJson)?;
config.insert(
String::from("preferences"),
json!(NodePreferences::default()),
);
let a =
serde_json::to_vec(&config).map_err(VersionManagerError::SerdeJson)?;
fs::write(path, a)
.await
.map_err(|e| FileIOError::from((path, e)))?;
}
_ => {
error!("Node config version is not handled: {:?}", current);
return Err(VersionManagerError::UnexpectedMigration {
current_version: current.int_value(),
next_version: next.int_value(),
}
.into());
}
}
Ok(())
}
v => unreachable!("Missing migration for library version {}", v),
}
}
}
impl Default for NodeConfig {
fn default() -> Self {
NodeConfig {
id: Uuid::new_v4(),
name: match hostname::get() {
// SAFETY: This is just for display purposes so it doesn't matter if it's lossy
Ok(hostname) => hostname.to_string_lossy().into_owned(),
Err(err) => {
eprintln!("Falling back to default node name as an error occurred getting your systems hostname: '{err}'");
"my-spacedrive".into()
}
},
keypair: Keypair::generate(),
p2p: Default::default(),
features: vec![],
notifications: vec![],
auth_token: None,
}
)
.await
}
async fn save(&self, path: impl AsRef<Path>) -> Result<(), NodeConfigError> {
let path = path.as_ref();
fs::write(path, serde_json::to_vec(self)?)
.await
.map_err(|e| FileIOError::from((path, e)))?;
Ok(())
}
}
pub struct Manager(RwLock<NodeConfig>, PathBuf);
pub struct Manager {
config: RwLock<NodeConfig>,
data_directory_path: PathBuf,
config_file_path: PathBuf,
preferences_watcher_tx: watch::Sender<NodePreferences>,
}
impl Manager {
/// new will create a new NodeConfigManager with the given path to the config file.
pub(crate) async fn new(data_path: PathBuf) -> Result<Arc<Self>, MigratorError> {
Ok(Arc::new(Self(
RwLock::new(NodeConfig::load_and_migrate(&Self::path(&data_path), &()).await?),
data_path,
)))
}
pub(crate) async fn new(
data_directory_path: impl AsRef<Path>,
) -> Result<Arc<Self>, NodeConfigError> {
let data_directory_path = data_directory_path.as_ref().to_path_buf();
let config_file_path = data_directory_path.join(NODE_STATE_CONFIG_NAME);
fn path(base_path: &Path) -> PathBuf {
base_path.join(NODE_STATE_CONFIG_NAME)
let config = NodeConfig::load(&config_file_path).await?;
let (preferences_watcher_tx, _preferences_watcher_rx) =
watch::channel(config.preferences.clone());
Ok(Arc::new(Self {
config: RwLock::new(config),
data_directory_path,
config_file_path,
preferences_watcher_tx,
}))
}
/// get will return the current NodeConfig in a read only state.
pub(crate) async fn get(&self) -> NodeConfig {
self.0.read().await.clone()
self.config.read().await.clone()
}
/// get a node config preferences watcher receiver
pub(crate) fn preferences_watcher(&self) -> watch::Receiver<NodePreferences> {
self.preferences_watcher_tx.subscribe()
}
/// data_directory returns the path to the directory storing the configuration data.
pub(crate) fn data_directory(&self) -> PathBuf {
self.1.clone()
self.data_directory_path.clone()
}
/// write allows the user to update the configuration. This is done in a closure while a Mutex lock is held so that the user can't cause a race condition if the config were to be updated in multiple parts of the app at the same time.
pub(crate) async fn write<F: FnOnce(RwLockWriteGuard<NodeConfig>)>(
pub(crate) async fn write<F: FnOnce(&mut NodeConfig)>(
&self,
mutation_fn: F,
) -> Result<NodeConfig, MigratorError> {
mutation_fn(self.0.write().await);
let config = self.0.read().await;
Self::save(&self.1, &config)?;
Ok(config.clone())
) -> Result<NodeConfig, NodeConfigError> {
let mut config = self.config.write().await;
mutation_fn(&mut config);
self.preferences_watcher_tx.send_if_modified(|current| {
let modified = current != &config.preferences;
if modified {
*current = config.preferences.clone();
}
modified
});
config
.save(&self.config_file_path)
.await
.map(|()| config.clone())
}
/// save will write the configuration back to disk
fn save(base_path: &Path, config: &NodeConfig) -> Result<(), MigratorError> {
NodeConfig::save(config, &Self::path(base_path))?;
Ok(())
/// update_preferences allows the user to update the preferences of the node
pub(crate) async fn update_preferences(
&self,
update_fn: impl FnOnce(&mut NodePreferences),
) -> Result<(), NodeConfigError> {
let mut config = self.config.write().await;
update_fn(&mut config.preferences);
self.preferences_watcher_tx
.send_replace(config.preferences.clone());
config.save(&self.config_file_path).await
}
}
#[derive(Error, Debug)]
pub enum NodeConfigError {
#[error(transparent)]
SerdeJson(#[from] serde_json::Error),
#[error(transparent)]
VersionManager(#[from] VersionManagerError<NodeConfigVersion>),
#[error(transparent)]
FileIO(#[from] FileIOError),
}
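`Manager` now owns a `tokio::sync::watch` channel: `preferences_watcher()` hands the receiver to the thumbnailer, and `write` uses `send_if_modified` so subscribers only wake when the preferences actually change. A standalone sketch of that notify-on-change pattern with illustrative names:

```rust
// Sketch: broadcast preference changes to long-lived actors; the
// send_if_modified closure suppresses wake-ups when nothing changed.
use std::time::Duration;
use tokio::sync::watch;

#[derive(Clone, Default, PartialEq, Eq, Debug)]
struct Preferences {
    background_processing_percentage: u8,
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(Preferences::default());

    tokio::spawn(async move {
        // Actor side (e.g. the thumbnailer): wakes only on real changes.
        while rx.changed().await.is_ok() {
            println!("preferences changed: {:?}", *rx.borrow());
        }
    });

    // Config-manager side: mutate, notify only if the value differs.
    tx.send_if_modified(|prefs| {
        let modified = prefs.background_processing_percentage != 50;
        if modified {
            prefs.background_processing_percentage = 50;
        }
        modified
    });

    tokio::time::sleep(Duration::from_millis(50)).await;
}
```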

View file

@@ -10,10 +10,12 @@ use crate::{
file_path_for_media_processor, IsolatedFilePathData,
},
prisma::{location, PrismaClient},
util::db::{maybe_missing, MissingFieldError},
util::db::maybe_missing,
Node,
};
use sd_file_ext::extensions::Extension;
use std::{
hash::Hash,
path::{Path, PathBuf},
@ -25,7 +27,6 @@ use async_channel as chan;
use futures::StreamExt;
use itertools::Itertools;
use prisma_client_rust::{raw, PrismaValue};
use sd_file_ext::extensions::Extension;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tokio::time::sleep;
@ -253,7 +254,7 @@ impl StatefulJob for MediaProcessorJobInit {
if progress_rx.is_closed() && total_completed < *total_thumbs as u32 {
warn!(
"Thumbnailer progress reporter channel closed before all thumbnails were
"Thumbnailer progress reporter channel closed before all thumbnails were \
processed, the job will wait a bit for a shutdown signal from the manager"
);
sleep(Duration::from_secs(5)).await;
@ -299,7 +300,7 @@ async fn dispatch_thumbnails_for_processing(
let location_path = location_path.as_ref();
let file_paths = get_all_children_files_by_extensions(
let mut file_paths = get_all_children_files_by_extensions(
db,
parent_iso_file_path,
&thumbnail::ALL_THUMBNAILABLE_EXTENSIONS,
@ -310,83 +311,57 @@ async fn dispatch_thumbnails_for_processing(
return Ok(0);
}
let mut current_batch = Vec::with_capacity(16);
let first_materialized_path = file_paths[0].materialized_path.clone();
// PDF thumbnails are currently way slower so we process them last
let mut pdf_thumbs = Vec::with_capacity(16);
// Only the first materialized_path should be processed in foreground
let different_materialized_path_idx = file_paths
.iter()
.position(|file_path| file_path.materialized_path != first_materialized_path);
let mut current_materialized_path = None;
let background_thumbs_args = different_materialized_path_idx
.map(|idx| {
file_paths
.split_off(idx)
.into_iter()
.filter_map(|file_path| prepare_args(location_id, location_path, file_path))
.collect::<Vec<_>>()
})
.unwrap_or_default();
let mut in_background = false;
let foreground_thumbs_args = file_paths
.into_iter()
.filter_map(|file_path| prepare_args(location_id, location_path, file_path))
.collect::<Vec<_>>();
let mut thumbs_count = 0;
let thumbs_count = background_thumbs_args.len() + foreground_thumbs_args.len();
for file_path in file_paths {
// Initializing current_materialized_path with the first file_path materialized_path
if current_materialized_path.is_none() {
current_materialized_path = file_path.materialized_path.clone();
}
debug!(
"Dispatching {thumbs_count} thumbnails to be processed, {} in foreground and {} in background",
foreground_thumbs_args.len(),
background_thumbs_args.len()
);
if file_path.materialized_path != current_materialized_path
&& (!current_batch.is_empty() || !pdf_thumbs.is_empty())
{
// Now we found a different materialized_path so we dispatch the current batch and start a new one
thumbs_count += current_batch.len() as u32;
node.thumbnailer
.new_indexed_thumbnails_batch_with_ticket(
BatchToProcess::new(current_batch, should_regenerate, in_background),
library.id,
location_id,
)
.await;
// We moved our vec, so we need a new one
current_batch = Vec::with_capacity(16);
in_background = true; // Only the first batch should be processed in foreground
// Exchanging for the first different materialized_path
current_materialized_path = file_path.materialized_path.clone();
}
let file_path_id = file_path.id;
if let Err(e) = add_to_batch(
location_id,
location_path,
file_path,
&mut current_batch,
&mut pdf_thumbs,
) {
error!("Error adding file_path <id='{file_path_id}'> to thumbnail batch: {e:#?}");
}
}
// Dispatching the last batch
if !current_batch.is_empty() {
thumbs_count += current_batch.len() as u32;
if !foreground_thumbs_args.is_empty() {
node.thumbnailer
.new_indexed_thumbnails_batch_with_ticket(
BatchToProcess::new(current_batch, should_regenerate, in_background),
.new_indexed_thumbnails_tracked_batch(
BatchToProcess::new(foreground_thumbs_args, should_regenerate, false),
library.id,
location_id,
)
.await;
}
// We now put the pdf_thumbs to be processed last
if !pdf_thumbs.is_empty() {
thumbs_count += pdf_thumbs.len() as u32;
if !background_thumbs_args.is_empty() {
node.thumbnailer
.new_indexed_thumbnails_batch_with_ticket(
BatchToProcess::new(pdf_thumbs, should_regenerate, in_background),
.new_indexed_thumbnails_tracked_batch(
BatchToProcess::new(background_thumbs_args, should_regenerate, true),
library.id,
location_id,
)
.await;
}
Ok(thumbs_count)
Ok(thumbs_count as u32)
}
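The foreground/background partition above hinges on `Vec::split_off`: everything before the first file with a different `materialized_path` stays in the foreground batch, and the tail is dispatched in the background. A tiny self-contained sketch of that behavior (illustrative data only):

let mut file_paths = vec!["a/1.png", "a/2.png", "b/1.png", "c/1.png"];
let idx = file_paths
    .iter()
    .position(|path| !path.starts_with("a/"))
    .unwrap_or(file_paths.len());
// `split_off` keeps the head in place and returns the tail; both halves
// preserve their order and no elements are cloned.
let background = file_paths.split_off(idx);
assert_eq!(file_paths, ["a/1.png", "a/2.png"]); // processed in foreground
assert_eq!(background, ["b/1.png", "c/1.png"]); // processed in background
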
async fn get_files_for_media_data_extraction(
@ -440,26 +415,27 @@ async fn get_all_children_files_by_extensions(
.map_err(Into::into)
}
fn add_to_batch(
fn prepare_args(
location_id: location::id::Type,
location_path: &Path, // This function is only used internally once, so we can pass &Path as a parameter
file_path: file_path_for_media_processor::Data,
current_batch: &mut Vec<GenerateThumbnailArgs>,
pdf_thumbs: &mut Vec<GenerateThumbnailArgs>,
) -> Result<(), MissingFieldError> {
let cas_id = maybe_missing(&file_path.cas_id, "file_path.cas_id")?.clone();
) -> Option<GenerateThumbnailArgs> {
let file_path_id = file_path.id;
let iso_file_path = IsolatedFilePathData::try_from((location_id, file_path))?;
let full_path = location_path.join(&iso_file_path);
let Ok(cas_id) = maybe_missing(&file_path.cas_id, "file_path.cas_id").cloned() else {
error!("Missing cas_id for file_path <id='{file_path_id}'>");
return None;
};
let extension = iso_file_path.extension();
let args = GenerateThumbnailArgs::new(extension.to_string(), cas_id, full_path);
let Ok(iso_file_path) = IsolatedFilePathData::try_from((location_id, file_path)).map_err(|e| {
error!("Failed to extract isolated file path data from file path <id='{file_path_id}'>: {e:#?}");
}) else {
return None;
};
if extension != "pdf" {
current_batch.push(args);
} else {
pdf_thumbs.push(args);
}
Ok(())
Some(GenerateThumbnailArgs::new(
iso_file_path.extension().to_string(),
cas_id,
location_path.join(&iso_file_path),
))
}


@ -12,10 +12,7 @@ use crate::{
Node,
};
use std::{
cmp::Ordering,
path::{Path, PathBuf},
};
use std::path::{Path, PathBuf};
use itertools::Itertools;
use prisma_client_rust::{raw, PrismaValue};
@ -173,23 +170,8 @@ async fn dispatch_thumbnails_for_processing(
.map(|(cas_id, iso_file_path)| {
let full_path = location_path.join(&iso_file_path);
let extension = iso_file_path.extension().to_string();
(
GenerateThumbnailArgs::new(extension.clone(), cas_id, full_path),
extension,
)
GenerateThumbnailArgs::new(iso_file_path.extension().to_string(), cas_id, full_path)
})
.sorted_by(|(_, ext_a), (_, ext_b)|
// This will put PDF files last, as they're currently much slower to process
// FIXME(fogodev): Remove this sort when no longer needed
match (*ext_a == "pdf", *ext_b == "pdf") {
(true, true) => Ordering::Equal,
(false, true) => Ordering::Less,
(true, false) => Ordering::Greater,
(false, false) => Ordering::Equal,
})
.map(|(args, _)| args)
.collect::<Vec<_>>();
// Let's not send an empty batch lol


@ -28,6 +28,7 @@ pub fn media_data_image_to_query(
})
}
#[cfg(feature = "location-watcher")]
pub fn media_data_image_to_query_params(
mdi: ImageMetadata,
) -> Result<Vec<SetParam>, MediaDataError> {


@ -1,6 +1,7 @@
use crate::{
api::CoreEvent,
library::{Libraries, LibraryId, LibraryManagerEvent},
node::config::NodePreferences,
util::error::{FileIOError, NonUtf8PathError},
};
@ -16,10 +17,10 @@ use once_cell::sync::OnceCell;
use thiserror::Error;
use tokio::{
fs, spawn,
sync::{broadcast, oneshot, Mutex},
sync::{broadcast, oneshot, watch, Mutex},
time::{sleep, Instant},
};
use tracing::{debug, error};
use tracing::error;
use uuid::Uuid;
use super::{
@ -30,7 +31,7 @@ use super::{
BatchToProcess, ThumbnailKind, ThumbnailerError, ONE_SEC, THUMBNAIL_CACHE_DIR_NAME,
};
static BATCH_SIZE: OnceCell<usize> = OnceCell::new();
static AVAILABLE_PARALLELISM: OnceCell<usize> = OnceCell::new();
#[derive(Error, Debug)]
pub(super) enum ActorError {
@ -74,6 +75,7 @@ impl Thumbnailer {
data_dir: PathBuf,
libraries_manager: Arc<Libraries>,
reporter: broadcast::Sender<CoreEvent>,
node_preferences_rx: watch::Receiver<NodePreferences>,
) -> Self {
let thumbnails_directory = Arc::new(
init_thumbnail_dir(&data_dir, Arc::clone(&libraries_manager))
@ -93,17 +95,13 @@ impl Thumbnailer {
let (cas_ids_to_delete_tx, cas_ids_to_delete_rx) = chan::bounded(16);
let (cancel_tx, cancel_rx) = chan::bounded(1);
BATCH_SIZE
AVAILABLE_PARALLELISM
.set(std::thread::available_parallelism().map_or_else(
|e| {
error!("Failed to get available parallelism: {e:#?}");
4
},
|non_zero| {
let count = non_zero.get();
debug!("Thumbnailer will process batches of {count} thumbnails in parallel.");
count
},
|non_zero| non_zero.get(),
))
.ok();
@ -112,12 +110,14 @@ impl Thumbnailer {
let cancel_rx = cancel_rx.clone();
let thumbnails_directory = Arc::clone(&thumbnails_directory);
let reporter = reporter.clone();
let node_preferences = node_preferences_rx.clone();
async move {
while let Err(e) = spawn(worker(
*BATCH_SIZE
*AVAILABLE_PARALLELISM
.get()
.expect("BATCH_SIZE is set at thumbnailer new method"),
node_preferences.clone(),
reporter.clone(),
thumbnails_directory.clone(),
WorkerChannels {
@ -227,7 +227,7 @@ impl Thumbnailer {
}
#[inline]
pub async fn new_indexed_thumbnails_batch_with_ticket(
pub async fn new_indexed_thumbnails_tracked_batch(
&self,
mut batch: BatchToProcess,
library_id: LibraryId,


@ -3,11 +3,12 @@ use crate::{
object::media::thumbnail::ONE_SEC,
util::{
error::FileIOError,
version_manager::{ManagedVersion, VersionManager, VersionManagerError},
version_manager::{Kind, ManagedVersion, VersionManager, VersionManagerError},
},
};
use sd_prisma::prisma::{file_path, PrismaClient};
use serde_repr::{Deserialize_repr, Serialize_repr};
use std::{
collections::{HashMap, HashSet},
@ -28,18 +29,26 @@ use super::{
VERSION_FILE, WEBP_EXTENSION,
};
#[derive(IntEnum, Debug, Clone, Copy, Eq, PartialEq, strum::Display)]
#[repr(i32)]
enum ThumbnailVersion {
#[derive(
IntEnum, Debug, Clone, Copy, Eq, PartialEq, strum::Display, Serialize_repr, Deserialize_repr,
)]
#[repr(u64)]
pub enum ThumbnailVersion {
V1 = 1,
V2 = 2,
V3 = 3,
Unknown = 0,
}
impl ManagedVersion for ThumbnailVersion {
impl ManagedVersion<Self> for ThumbnailVersion {
const LATEST_VERSION: Self = Self::V3;
const KIND: Kind = Kind::PlainText;
type MigrationError = ThumbnailerError;
fn from_latest_version() -> Option<Self> {
Some(Self::LATEST_VERSION)
}
}
pub(super) async fn init_thumbnail_dir(
@ -96,9 +105,6 @@ async fn process_migration(
) -> Result<(), ThumbnailerError> {
let thumbnails_directory = thumbnails_directory.as_ref();
let version_manager =
VersionManager::<ThumbnailVersion>::new(thumbnails_directory.join(VERSION_FILE));
// create all other directories, for each library and for ephemeral thumbnails
databases
.keys()
@ -115,52 +121,33 @@ async fn process_migration(
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
let current_version = match version_manager.get_version().await {
Ok(version) => version,
Err(e) => {
debug!("Thumbnail version file does not exist, starting fresh: {e:#?}");
// Version file does not exist, start fresh
version_manager.set_version(ThumbnailVersion::V1).await?;
ThumbnailVersion::V1
}
};
VersionManager::<ThumbnailVersion, ThumbnailVersion>::migrate_and_load(
thumbnails_directory.join(VERSION_FILE),
|current, next| {
let databases = &databases;
async move {
match (current, next) {
(ThumbnailVersion::V1, ThumbnailVersion::V2) => {
move_to_shards(thumbnails_directory).await
}
(ThumbnailVersion::V2, ThumbnailVersion::V3) => {
segregate_thumbnails_by_library(thumbnails_directory, databases).await
}
if current_version != ThumbnailVersion::LATEST_VERSION {
info!(
"Migrating thumbnail directory from {:?} to V3",
current_version
);
// Taking a reference to databases so we can move it into the closure and comply with the borrowck
let databases = &databases;
version_manager
.migrate(current_version, |current, next| {
let thumbnail_dir = &thumbnails_directory;
async move {
match (current, next) {
(ThumbnailVersion::V1, ThumbnailVersion::V2) => {
move_to_shards(thumbnail_dir).await
}
(ThumbnailVersion::V2, ThumbnailVersion::V3) => {
segregate_thumbnails_by_library(thumbnail_dir, databases).await
}
_ => {
error!("Thumbnail version is not handled: {:?}", current);
Err(VersionManagerError::UnexpectedMigration {
current_version: current.int_value(),
next_version: next.int_value(),
}
.into())
_ => {
error!("Thumbnail version is not handled: {:?}", current);
Err(VersionManagerError::UnexpectedMigration {
current_version: current.int_value(),
next_version: next.int_value(),
}
.into())
}
}
})
.await?;
}
Ok(())
}
},
)
.await
.map(|_| ())
}
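The migration closure above is driven by `Itertools::tuple_windows` over the version range, so each adjacent pair of versions runs exactly once and in order. A small sketch of the pairs it produces (illustrative, mirroring the V1..=V3 range used here):

use itertools::Itertools;

// A store still on V1 runs the V1 -> V2 shard move, then the V2 -> V3
// library segregation; a store already on V3 runs nothing.
let steps: Vec<(u64, u64)> = (1u64..=3).tuple_windows().collect();
assert_eq!(steps, vec![(1, 2), (2, 3)]);
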
/// This function moves all webp files in the thumbnail directory to their respective shard folders.


@ -25,6 +25,7 @@ use tracing::error;
pub mod actor;
mod clean_up;
mod directory;
pub mod preferences;
mod process;
mod shard;
mod state;
@ -33,6 +34,8 @@ mod worker;
pub use process::{BatchToProcess, GenerateThumbnailArgs};
pub use shard::get_shard_hex;
use directory::ThumbnailVersion;
// Files names constants
const THUMBNAIL_CACHE_DIR_NAME: &str = "thumbnails";
const SAVE_STATE_FILE: &str = "thumbs_to_process.bin";
@ -148,7 +151,7 @@ pub enum ThumbnailerError {
#[error(transparent)]
FileIO(#[from] FileIOError),
#[error(transparent)]
VersionManager(#[from] VersionManagerError),
VersionManager(#[from] VersionManagerError<ThumbnailVersion>),
#[error("failed to encode webp")]
WebPEncoding { path: Box<Path>, reason: String },
#[error("error while converting the image")]


@ -0,0 +1,34 @@
use serde::{Deserialize, Serialize};
use specta::Type;
#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq, Type)]
pub struct ThumbnailerPreferences {
background_processing_percentage: u8, // 0-100
}
impl Default for ThumbnailerPreferences {
fn default() -> Self {
Self {
background_processing_percentage: 50, // 50% of CPU cores available
}
}
}
impl ThumbnailerPreferences {
pub fn background_processing_percentage(&self) -> u8 {
self.background_processing_percentage
}
pub fn set_background_processing_percentage(
&mut self,
mut background_processing_percentage: u8,
) -> &mut Self {
if background_processing_percentage > 100 {
background_processing_percentage = 100;
}
self.background_processing_percentage = background_processing_percentage;
self
}
}
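A quick usage sketch of the setter's clamping behavior (hypothetical call site, not from this commit):

let mut preferences = ThumbnailerPreferences::default();
assert_eq!(preferences.background_processing_percentage(), 50);

// Values above 100 are clamped rather than rejected.
preferences.set_background_processing_percentage(150);
assert_eq!(preferences.background_processing_percentage(), 100);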


@ -20,17 +20,18 @@ use image::{self, imageops, DynamicImage, GenericImageView};
use serde::{Deserialize, Serialize};
use tokio::{
fs, io,
sync::{broadcast, oneshot},
sync::{broadcast, oneshot, Semaphore},
task::{spawn, spawn_blocking},
time::timeout,
};
use tracing::{error, trace, warn};
use tokio_stream::StreamExt;
use tracing::{debug, error, trace, warn};
use webp::Encoder;
use super::{
can_generate_thumbnail_for_document, can_generate_thumbnail_for_image, get_thumb_key,
shard::get_shard_hex, ThumbnailKind, ThumbnailerError, EPHEMERAL_DIR, TARGET_PX,
TARGET_QUALITY, THIRTY_SECS, WEBP_EXTENSION,
preferences::ThumbnailerPreferences, shard::get_shard_hex, ThumbnailKind, ThumbnailerError,
EPHEMERAL_DIR, TARGET_PX, TARGET_QUALITY, THIRTY_SECS, WEBP_EXTENSION,
};
#[derive(Debug, Serialize, Deserialize)]
@ -98,10 +99,23 @@ pub(super) async fn batch_processor(
}: ProcessorControlChannels,
leftovers_tx: chan::Sender<(BatchToProcess, ThumbnailKind)>,
reporter: broadcast::Sender<CoreEvent>,
batch_size: usize,
(available_parallelism, thumbnailer_preferences): (usize, ThumbnailerPreferences),
) {
trace!(
"Processing thumbnails batch of kind {kind:?} with size {} in {}",
let in_parallel_count = if !in_background {
available_parallelism
} else {
usize::max(
// If the user sets the background processing percentage to 0, we still want to process at least one thumbnail at a time
thumbnailer_preferences.background_processing_percentage() as usize
* available_parallelism
/ 100,
1,
)
};
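// Worked example (illustrative numbers, not from this commit): with
// `available_parallelism` = 8 and `background_processing_percentage` = 50,
// a background batch gets usize::max(50 * 8 / 100, 1) = 4 permits, while a
// foreground batch uses all 8; at 0% the max(_, 1) guard still lets the
// batch drain one thumbnail at a time.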
debug!(
"Processing thumbnails batch of kind {kind:?} with size {} in {}, \
at most {in_parallel_count} thumbnails at a time",
batch.len(),
if in_background {
"background"
@ -110,6 +124,10 @@ pub(super) async fn batch_processor(
},
);
let semaphore = Arc::new(Semaphore::new(in_parallel_count));
let batch_size = batch.len();
// Transforming to `VecDeque` so we don't need to move anything as we consume from the beginning
// This `From` conversion is guaranteed to be O(1)
let mut queue = VecDeque::from(batch);
@ -119,20 +137,38 @@ pub(super) async fn batch_processor(
Stop(oneshot::Sender<()>),
}
while !queue.is_empty() {
let chunk = (0..batch_size)
.filter_map(|_| queue.pop_front())
.map(
|GenerateThumbnailArgs {
extension,
cas_id,
path,
}| {
let (maybe_cas_ids_tx, maybe_cas_ids_rx) = if kind == ThumbnailKind::Ephemeral {
let (tx, rx) = chan::bounded(batch_size);
(Some(tx), Some(rx))
} else {
(None, None)
};
let maybe_stopped_tx = if let RaceOutputs::Stop(stopped_tx) = (
async {
let mut join_handles = Vec::with_capacity(batch_size);
while !queue.is_empty() {
let permit = Arc::clone(&semaphore)
.acquire_owned()
.await
.expect("this semaphore never closes");
let GenerateThumbnailArgs {
extension,
cas_id,
path,
} = queue.pop_front().expect("queue is not empty");
// Once we get a permit, there is CPU available to process this thumbnail
join_handles.push(spawn({
let reporter = reporter.clone();
let thumbnails_directory = thumbnails_directory.as_ref().clone();
spawn(async move {
timeout(
THIRTY_SECS,
let report_progress_tx = batch_report_progress_tx.clone();
let maybe_cas_ids_tx = maybe_cas_ids_tx.clone();
async move {
let res = timeout(THIRTY_SECS, async {
generate_thumbnail(
thumbnails_directory,
ThumbData {
@ -144,116 +180,122 @@ pub(super) async fn batch_processor(
kind,
},
reporter,
),
)
.await
.unwrap_or_else(|_| Err(ThumbnailerError::TimedOut(path.into_boxed_path())))
})
},
)
.collect::<Vec<_>>();
let generated_ephemeral_thumbs_file_names_tx =
generated_ephemeral_thumbs_file_names_tx.clone();
let report_progress_tx = batch_report_progress_tx.clone();
let chunk_len = chunk.len() as u32;
if let RaceOutputs::Stop(stopped_tx) = (
async move {
if let Err(e) = spawn(async move {
let cas_ids = chunk
.join()
.await
.into_iter()
.filter_map(|join_result| {
join_result
.map_err(|e| {
error!("Failed to join thumbnail generation task: {e:#?}")
})
.ok()
})
.filter_map(|result| {
result
.map_err(|e| {
error!(
"Failed to generate thumbnail for {} location: {e:#?}",
if let ThumbnailKind::Ephemeral = kind {
"ephemeral"
} else {
"indexed"
}
)
})
.ok()
})
.map(|cas_id| OsString::from(format!("{}.webp", cas_id)))
.collect();
if kind == ThumbnailKind::Ephemeral
&& generated_ephemeral_thumbs_file_names_tx
.send(cas_ids)
)
.await
.is_err()
{
error!("Thumbnail actor is dead: Failed to send generated cas ids")
.map(|cas_id| {
// this send_blocking never blocks as we have a bounded channel with
// the same capacity as the batch size, so there is always space
// in the queue
if let Some(cas_ids_tx) = maybe_cas_ids_tx {
cas_ids_tx
.send_blocking(OsString::from(format!("{}.webp", cas_id)))
.expect("channel never closes");
}
})
})
.await
.unwrap_or_else(|_| {
Err(ThumbnailerError::TimedOut(path.into_boxed_path()))
});
if let Some(location_id) = location_id {
report_progress_tx.send((location_id, 1)).await.ok();
}
drop(permit);
res
}
if let Some(location_id) = location_id {
report_progress_tx.send((location_id, chunk_len)).await.ok();
}
})
.await
{
error!("Failed to join spawned task to process thumbnails chunk on a batch: {e:#?}");
}
trace!("Processed chunk with {chunk_len} thumbnails");
RaceOutputs::Processed
},
async {
let tx = stop_rx
.recv()
.await
.expect("Critical error on thumbnails actor");
trace!("Received a stop signal");
RaceOutputs::Stop(tx)
},
)
.race()
.await
{
// Our queue is always contiguous, so this `from` is free
let leftovers = Vec::from(queue);
trace!(
"Stopped with {} thumbnails left to process",
leftovers.len()
);
if !leftovers.is_empty()
&& leftovers_tx
.send((
BatchToProcess {
batch: leftovers,
should_regenerate,
in_background: true, // Leftovers should always be in background
location_id,
},
kind,
))
.await
.is_err()
{
error!("Thumbnail actor is dead: Failed to send leftovers")
}));
}
done_tx.send(()).ok();
stopped_tx.send(()).ok();
for res in join_handles.join().await {
match res {
Ok(Ok(())) => { /* Everything is awesome! */ }
Ok(Err(e)) => {
error!(
"Failed to generate thumbnail for {} location: {e:#?}",
if let ThumbnailKind::Ephemeral = kind {
"ephemeral"
} else {
"indexed"
}
)
}
Err(e) => {
error!("Failed to join thumbnail generation task: {e:#?}");
}
}
}
return;
if let Some(cas_ids_tx) = &maybe_cas_ids_tx {
cas_ids_tx.close();
}
trace!("Processed batch with {batch_size} thumbnails");
RaceOutputs::Processed
},
async {
let tx = stop_rx
.recv()
.await
.expect("Critical error on thumbnails actor");
trace!("Received a stop signal");
RaceOutputs::Stop(tx)
},
)
.race()
.await
{
// Our queue is always contiguous, so this `from` is free
let leftovers = Vec::from(queue);
trace!(
"Stopped with {} thumbnails left to process",
leftovers.len()
);
if !leftovers.is_empty()
&& leftovers_tx
.send((
BatchToProcess {
batch: leftovers,
should_regenerate,
in_background: true, // Leftovers should always be in background
location_id,
},
kind,
))
.await
.is_err()
{
error!("Thumbnail actor is dead: Failed to send leftovers")
}
if let Some(cas_ids_tx) = &maybe_cas_ids_tx {
cas_ids_tx.close();
}
Some(stopped_tx)
} else {
None
};
if let Some(cas_ids_rx) = maybe_cas_ids_rx {
if generated_ephemeral_thumbs_file_names_tx
.send(cas_ids_rx.collect().await)
.await
.is_err()
{
error!("Thumbnail actor is dead: Failed to send generated cas ids")
}
}
trace!("Finished batch!");
if let Some(stopped_tx) = maybe_stopped_tx {
stopped_tx.send(()).ok();
} else {
trace!("Finished batch!");
}
done_tx.send(()).ok();
}
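The batch processor above bounds concurrency with owned semaphore permits instead of fixed-size chunks. A self-contained sketch of that pattern (assumed names, written for this edit):

use std::sync::Arc;
use tokio::sync::Semaphore;

async fn run_bounded(jobs: Vec<u32>, max_in_parallel: usize) {
    let semaphore = Arc::new(Semaphore::new(max_in_parallel));
    let mut join_handles = Vec::with_capacity(jobs.len());

    for job in jobs {
        // Waits until one of the `max_in_parallel` permits frees up, so at
        // most that many tasks run at once.
        let permit = Arc::clone(&semaphore)
            .acquire_owned()
            .await
            .expect("this semaphore never closes");

        join_handles.push(tokio::spawn(async move {
            // ... do the actual work for `job` here ...
            drop(permit); // release the slot for the next queued task
        }));
    }

    for handle in join_handles {
        handle.await.ok();
    }
}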


@ -1,4 +1,4 @@
use crate::api::CoreEvent;
use crate::{api::CoreEvent, node::config::NodePreferences};
use std::{collections::HashMap, ffi::OsString, path::PathBuf, pin::pin, sync::Arc};
@ -8,15 +8,19 @@ use async_channel as chan;
use futures_concurrency::stream::Merge;
use tokio::{
spawn,
sync::{broadcast, oneshot},
sync::{broadcast, oneshot, watch},
time::{interval, interval_at, timeout, Instant, MissedTickBehavior},
};
use tokio_stream::{wrappers::IntervalStream, StreamExt};
use tokio_stream::{
wrappers::{IntervalStream, WatchStream},
StreamExt,
};
use tracing::{debug, error, trace};
use super::{
actor::DatabaseMessage,
clean_up::{process_ephemeral_clean_up, process_indexed_clean_up},
preferences::ThumbnailerPreferences,
process::{batch_processor, ProcessorControlChannels},
state::{remove_by_cas_ids, RegisterReporter, ThumbsProcessingSaveState},
BatchToProcess, ThumbnailKind, HALF_HOUR, ONE_SEC, THIRTY_SECS,
@ -32,7 +36,8 @@ pub(super) struct WorkerChannels {
}
pub(super) async fn worker(
batch_size: usize,
available_parallelism: usize,
node_preferences_rx: watch::Receiver<NodePreferences>,
reporter: broadcast::Sender<CoreEvent>,
thumbnails_directory: Arc<PathBuf>,
WorkerChannels {
@ -62,6 +67,7 @@ pub(super) async fn worker(
ProgressManagement(RegisterReporter),
BatchProgress((location::id::Type, u32)),
Shutdown(oneshot::Sender<()>),
UpdatedPreferences(ThumbnailerPreferences),
IdleTick,
}
@ -94,9 +100,14 @@ pub(super) async fn worker(
batch_report_progress_rx.map(StreamMessage::BatchProgress),
cancel_rx.map(StreamMessage::Shutdown),
IntervalStream::new(idle_interval).map(|_| StreamMessage::IdleTick),
WatchStream::new(node_preferences_rx).map(|node_preferences| {
StreamMessage::UpdatedPreferences(node_preferences.thumbnailer)
}),
)
.merge());
let mut thumbnailer_preferences = ThumbnailerPreferences::default();
while let Some(msg) = msg_stream.next().await {
match msg {
StreamMessage::IdleTick => {
@ -144,7 +155,7 @@ pub(super) async fn worker(
},
leftovers_tx.clone(),
reporter.clone(),
batch_size,
(available_parallelism, thumbnailer_preferences.clone()),
));
}
}
@ -204,29 +215,13 @@ pub(super) async fn worker(
}
// Only send a stop signal if there is a batch being processed
if !in_background && current_batch_processing_rx.is_some() {
trace!("Sending stop signal to older processing");
let (tx, rx) = oneshot::channel();
match stop_older_processing_tx.try_send(tx) {
Ok(()) => {
// We put a timeout here to avoid a deadlock in case the older processing already
// finished its batch
if timeout(ONE_SEC, rx).await.is_err() {
stop_older_processing_rx.recv().await.ok();
}
}
Err(e) if e.is_full() => {
// The last signal we sent happened after a batch was already processed
// So we clean the channel and we're good to go.
stop_older_processing_rx.recv().await.ok();
}
Err(_) => {
error!(
"Thumbnail remover actor died when trying to stop older processing"
);
}
}
if !in_background {
stop_batch(
&current_batch_processing_rx,
&stop_older_processing_tx,
&stop_older_processing_rx,
)
.await;
}
}
@ -260,27 +255,12 @@ pub(super) async fn worker(
debug!("Thumbnail actor is shutting down...");
let start = Instant::now();
// First stopping the current batch processing
if current_batch_processing_rx.is_some() {
let (tx, rx) = oneshot::channel();
match stop_older_processing_tx.try_send(tx) {
Ok(()) => {
// We put a timeout here to avoid a deadlock in case the older processing already
// finished its batch
if timeout(ONE_SEC, rx).await.is_err() {
stop_older_processing_rx.recv().await.ok();
}
}
Err(e) if e.is_full() => {
// The last signal we sent happened after a batch was already processed
// So we clean the channel and we're good to go.
stop_older_processing_rx.recv().await.ok();
}
Err(_) => {
error!("Thumbnail actor died when trying to stop older processing");
}
}
}
stop_batch(
&current_batch_processing_rx,
&stop_older_processing_tx,
&stop_older_processing_rx,
)
.await;
// Closing the leftovers channel to stop the batch processor as we already sent
// a stop signal
@ -323,6 +303,48 @@ pub(super) async fn worker(
StreamMessage::ProgressManagement((location_id, progress_tx)) => {
bookkeeper.register_reporter(location_id, progress_tx);
}
StreamMessage::UpdatedPreferences(preferences) => {
thumbnailer_preferences = preferences;
stop_batch(
&current_batch_processing_rx,
&stop_older_processing_tx,
&stop_older_processing_rx,
)
.await;
}
}
}
}
#[inline]
async fn stop_batch(
current_batch_processing_rx: &Option<oneshot::Receiver<()>>,
stop_older_processing_tx: &chan::Sender<oneshot::Sender<()>>,
stop_older_processing_rx: &chan::Receiver<oneshot::Sender<()>>,
) {
// First stopping the current batch processing
if current_batch_processing_rx.is_some() {
trace!("Sending stop signal to older processing");
let (tx, rx) = oneshot::channel();
match stop_older_processing_tx.try_send(tx) {
Ok(()) => {
// We put a timeout here to avoid a deadlock in case the older processing already
// finished its batch
if timeout(ONE_SEC, rx).await.is_err() {
stop_older_processing_rx.recv().await.ok();
}
}
Err(e) if e.is_full() => {
// The last signal we sent happened after a batch was already processed
// So we clean the channel and we're good to go.
stop_older_processing_rx.recv().await.ok();
}
Err(_) => {
error!("Thumbnail actor died when trying to stop older processing");
}
}
}
}
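The worker above reacts to settings changes by merging a `WatchStream` into its message stream, so an updated preference interrupts the current batch and the next one picks up the new parallelism. A minimal sketch of that merge (assumed message type, written for this edit):

use std::{pin::pin, time::Duration};
use futures_concurrency::stream::Merge;
use tokio::sync::watch;
use tokio_stream::{
    wrappers::{IntervalStream, WatchStream},
    StreamExt,
};

enum Msg {
    IdleTick,
    UpdatedPreferences(u8), // stand-in for ThumbnailerPreferences
}

async fn listen(preferences_rx: watch::Receiver<u8>) {
    let mut msg_stream = pin!((
        IntervalStream::new(tokio::time::interval(Duration::from_secs(1)))
            .map(|_| Msg::IdleTick),
        WatchStream::new(preferences_rx).map(Msg::UpdatedPreferences),
    )
        .merge());

    while let Some(msg) = msg_stream.next().await {
        match msg {
            Msg::IdleTick => { /* housekeeping */ }
            Msg::UpdatedPreferences(_pct) => { /* stop current batch, reconfigure */ }
        }
    }
}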


@ -289,12 +289,14 @@ impl PairingManager {
.unwrap();
library_manager.update_instances(library.clone()).await;
let library_config = library.config().await;
stream
.write_all(
&PairingResponse::Accepted {
library_id: library.id,
library_name: library.config().name.into(),
library_description: library.config().description,
library_name: library_config.name.into(),
library_description: library_config.description,
instances: library
.db
.instance()


@ -1,14 +1,8 @@
//! A system for loading a default set of data on startup. This is ONLY enabled in development builds.
use std::{
io,
path::{Path, PathBuf},
sync::Arc,
time::Duration,
};
use crate::{
job::JobManagerError,
library::Libraries,
library::{LibraryManagerError, LibraryName},
location::{
delete_location, scan_location, LocationCreateArgs, LocationError, LocationManagerError,
@ -17,6 +11,14 @@ use crate::{
util::AbortOnDrop,
Node,
};
use std::{
io,
path::{Path, PathBuf},
sync::Arc,
time::Duration,
};
use prisma_client_rust::QueryError;
use serde::Deserialize;
use thiserror::Error;
@ -27,7 +29,7 @@ use tokio::{
use tracing::{info, warn};
use uuid::Uuid;
use crate::library::Libraries;
use super::error::FileIOError;
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
@ -58,8 +60,6 @@ pub struct InitConfig {
#[derive(Error, Debug)]
pub enum InitConfigError {
#[error("error loading the init data: {0}")]
Io(#[from] io::Error),
#[error("error parsing the init data: {0}")]
Json(#[from] serde_json::Error),
#[error("job manager: {0}")]
@ -72,21 +72,33 @@ pub enum InitConfigError {
QueryError(#[from] QueryError),
#[error("location error: {0}")]
LocationError(#[from] LocationError),
#[error("failed to get current directory from environment: {0}")]
CurrentDir(io::Error),
#[error(transparent)]
FileIO(#[from] FileIOError),
}
impl InitConfig {
pub async fn load(data_dir: &Path) -> Result<Option<Self>, InitConfigError> {
let path = std::env::current_dir()?
let path = std::env::current_dir()
.map_err(InitConfigError::CurrentDir)?
.join(std::env::var("SD_INIT_DATA").unwrap_or("sd_init.json".to_string()));
if metadata(&path).await.is_ok() {
let config = fs::read_to_string(&path).await?;
let mut config: InitConfig = serde_json::from_str(&config)?;
let config = fs::read(&path)
.await
.map_err(|e| FileIOError::from((&path, e, "Failed to read init config file")))?;
let mut config = serde_json::from_slice::<InitConfig>(&config)?;
config.path = path;
if config.reset_on_startup && data_dir.exists() {
if config.reset_on_startup && metadata(data_dir).await.is_ok() {
warn!("previous 'SD_DATA_DIR' was removed on startup!");
fs::remove_dir_all(&data_dir).await?;
fs::remove_dir_all(data_dir).await.map_err(|e| {
FileIOError::from((data_dir, e, "Failed to remove data directory"))
})?;
}
return Ok(Some(config));
@ -111,24 +123,22 @@ impl InitConfig {
}
}));
let library = match library_manager.get_library(&lib.id).await {
Some(lib) => lib,
None => {
let library = library_manager
.create_with_uuid(lib.id, lib.name, lib.description, true, None, node)
.await?;
let library = if let Some(lib) = library_manager.get_library(&lib.id).await {
lib
} else {
let library = library_manager
.create_with_uuid(lib.id, lib.name, lib.description, true, None, node)
.await?;
match library_manager.get_library(&library.id).await {
Some(lib) => lib,
None => {
warn!(
"Debug init error: library '{}' was not found after being created!",
library.config().name.as_ref()
);
return Ok(());
}
}
}
let Some(lib) = library_manager.get_library(&library.id).await else {
warn!(
"Debug init error: library '{}' was not found after being created!",
library.config().await.name.as_ref()
);
return Ok(());
};
lib
};
if lib.reset_locations_on_startup {
@ -153,32 +163,33 @@ impl InitConfig {
}
let sd_file = PathBuf::from(&loc.path).join(".spacedrive");
if sd_file.exists() {
fs::remove_file(sd_file).await?;
if let Err(e) = fs::remove_file(sd_file).await {
if e.kind() != io::ErrorKind::NotFound {
warn!("failed to remove '.spacedrive' file: {:?}", e);
}
}
let location = LocationCreateArgs {
path: loc.path.clone().into(),
if let Some(location) = (LocationCreateArgs {
path: PathBuf::from(loc.path.clone()),
dry_run: false,
indexer_rules_ids: Vec::new(),
}
})
.create(node, &library)
.await?;
match location {
Some(location) => {
scan_location(node, &library, location).await?;
}
None => {
warn!(
"Debug init error: location '{}' was not found after being created!",
loc.path
);
}
.await?
{
scan_location(node, &library, location).await?;
} else {
warn!(
"Debug init error: location '{}' was not found after being created!",
loc.path
);
}
}
}
info!("Initialized app from file: {:?}", self.path);
info!("Initialized app from file: {}", self.path.display());
Ok(())
}
}
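The error-handling refactor above replaces bare `io::Error`s with `FileIOError`, which bundles the failing path and a static context string via the tuple `From` conversions exercised throughout this diff. A small usage sketch assuming those impls:

use std::path::Path;

async fn read_init_file(path: &Path) -> Result<Vec<u8>, FileIOError> {
    tokio::fs::read(path)
        .await
        .map_err(|e| FileIOError::from((path, e, "Failed to read init config file")))
}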


@ -1,257 +0,0 @@
use std::{
any::type_name,
fs::File,
io::{self, Seek, Write},
path::{Path, PathBuf},
};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde_json::{Map, Value};
use specta::Type;
use thiserror::Error;
use super::db::MissingFieldError;
/// Used to decode the configuration and work out what migrations need to be applied before the config can be properly loaded.
/// This allows us to migrate breaking changes to the config format between Spacedrive releases.
#[derive(Debug, Serialize, Deserialize, Clone, Type)]
pub struct BaseConfig {
/// version of Spacedrive. Determined from `CARGO_PKG_VERSION` environment variable.
pub version: u32,
// Collect all extra fields
#[serde(flatten)]
other: Map<String, Value>,
}
/// System for managing app level migrations on a config file so we can introduce breaking changes to the app without the user needing to reset their whole system.
#[async_trait::async_trait]
pub trait Migrate: Sized + DeserializeOwned + Serialize {
const CURRENT_VERSION: u32;
type Ctx: Sync;
fn default(path: PathBuf) -> Result<Self, MigratorError>;
async fn migrate(
from_version: u32,
config: &mut Map<String, Value>,
ctx: &Self::Ctx,
) -> Result<(), MigratorError>;
async fn load_and_migrate(path: &Path, ctx: &Self::Ctx) -> Result<Self, MigratorError> {
match path.try_exists()? {
true => {
let mut file = File::options().write(true).read(true).open(path)?;
let mut cfg: BaseConfig = match serde_json::from_reader(&mut file) {
Ok(cfg) => cfg,
Err(err) => {
// This is for backwards compatibility, because the super old system stored the version as a string.
file.rewind()?;
let mut cfg = serde_json::from_reader::<_, Value>(file)?;
if let Some(obj) = cfg.as_object_mut() {
if obj.contains_key("version") {
return Err(MigratorError::HasSuperLegacyConfig); // This is just to make the error nicer
} else {
return Err(err.into());
}
} else {
return Err(err.into());
}
}
};
file.rewind()?; // Fail early so we don't end up in an invalid state
if cfg.version > Self::CURRENT_VERSION {
return Err(MigratorError::YourAppIsOutdated);
}
let is_latest = cfg.version == Self::CURRENT_VERSION;
for v in (cfg.version + 1)..=Self::CURRENT_VERSION {
cfg.version = v;
match Self::migrate(v, &mut cfg.other, ctx).await {
Ok(()) => (),
Err(err) => {
file.set_len(0)?; // Truncate the file
file.write_all(serde_json::to_string(&cfg)?.as_bytes())?; // Writes updated version
return Err(err);
}
}
}
if !is_latest {
file.set_len(0)?; // Truncate the file
file.write_all(serde_json::to_string(&cfg)?.as_bytes())?; // Writes updated version
}
Ok(serde_json::from_value(Value::Object(cfg.other))?)
}
false => Ok(serde_json::from_value(Value::Object(
Self::default(path.into())?.save(path)?.other,
))?),
}
}
fn save(&self, path: &Path) -> Result<BaseConfig, MigratorError> {
let config = BaseConfig {
version: Self::CURRENT_VERSION,
other: match serde_json::to_value(self)? {
Value::Object(map) => map,
_ => {
return Err(MigratorError::InvalidType(type_name::<Self>()));
}
},
};
let mut file = File::create(path)?;
file.write_all(serde_json::to_string(&config)?.as_bytes())?;
Ok(config)
}
}
#[derive(Error, Debug)]
pub enum MigratorError {
#[error("Io - error saving or loading the config from the filesystem: {0}")]
Io(#[from] io::Error),
#[error("Json - error serializing or deserializing the JSON in the config file: {0}")]
Json(#[from] serde_json::Error),
#[error(
"YourAppIsOutdated - the config file is for a newer version of the app. Please update to the latest version to load it!"
)]
YourAppIsOutdated,
#[error("Type '{0}' as generic `Migrator::T` must be serialiable to a Serde object!")]
InvalidType(&'static str),
#[error("{0}")]
Database(#[from] prisma_client_rust::QueryError),
#[error("We detected a Spacedrive config from a super early version of the app!")]
HasSuperLegacyConfig,
#[error("file '{}' was not found by the migrator!", .0.display())]
ConfigFileMissing(PathBuf),
#[error("missing-field: {0}")]
MissingField(#[from] MissingFieldError),
#[error("custom migration error: {0}")]
Custom(String),
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::panic)]
mod test {
use std::{fs, io::Read, path::PathBuf};
use serde_json::json;
use super::*;
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
struct MyConfigType {
// For testing: add new fields without breaking the parsing.
#[serde(flatten)]
other: Map<String, Value>,
}
#[async_trait::async_trait]
impl Migrate for MyConfigType {
const CURRENT_VERSION: u32 = 3;
type Ctx = ();
fn default(_path: PathBuf) -> Result<Self, MigratorError> {
Ok(<Self as Default>::default())
}
async fn migrate(
to_version: u32,
config: &mut Map<String, Value>,
_ctx: &Self::Ctx,
) -> Result<(), MigratorError> {
match to_version {
0 => Ok(()),
1 => {
config.insert("a".into(), json!({}));
Ok(())
}
2 => {
config
.get_mut("a")
.and_then(|v| v.as_object_mut())
.map(|v| v.insert("b".into(), json!({})));
Ok(())
}
3 => {
config
.get_mut("a")
.and_then(|v| v.as_object_mut())
.and_then(|v| v.get_mut("b"))
.and_then(|v| v.as_object_mut())
.map(|v| v.insert("c".into(), json!("it works")));
Ok(())
}
v => unreachable!("Missing migration for library version {}", v),
}
}
}
fn path(file_name: &'static str) -> PathBuf {
let dir = PathBuf::from("./migration_test");
std::fs::create_dir(&dir).ok();
dir.join(file_name)
}
fn file_as_str(path: &Path) -> String {
let mut file = File::open(path).unwrap();
let mut contents = String::new();
file.read_to_string(&mut contents).unwrap();
contents
}
fn write_to_file(path: &Path, contents: &str) {
let mut file = File::create(path).unwrap();
file.write_all(contents.as_bytes()).unwrap();
}
#[tokio::test]
async fn test_migrator_happy_path() {
let p = path("test_migrator_happy_path.config");
// Check config is created when it's missing
assert!(!p.exists(), "config file should start out deleted");
std::fs::write(
&p,
serde_json::to_string(&json!({
"version": 0
}))
.unwrap(),
)
.unwrap();
assert!(p.exists(), "config file was not initialised");
assert_eq!(file_as_str(&p), r#"{"version":0}"#);
// Load + migrate config
let _config = MyConfigType::load_and_migrate(&p, &()).await.unwrap();
assert_eq!(
file_as_str(&p),
r#"{"version":3,"a":{"b":{"c":"it works"}}}"#
);
// Cleanup
fs::remove_file(&p).unwrap();
}
#[tokio::test]
pub async fn test_time_traveling_backwards() {
let p = path("test_time_traveling_backwards.config");
// You opened a new database in an older version of the app
write_to_file(&p, r#"{"version":5}"#);
match MyConfigType::load_and_migrate(&p, &()).await {
Err(MigratorError::YourAppIsOutdated) => (),
_ => panic!("Should have failed to load config from a super newer version of the app"),
}
// Cleanup
fs::remove_file(&p).unwrap();
}
}


@ -6,7 +6,6 @@ pub mod error;
pub mod http;
mod infallible_request;
mod maybe_undefined;
pub mod migrator;
pub mod mpscrr;
mod observable;
pub mod version_manager;


@ -1,114 +1,222 @@
use std::{
any::type_name,
fmt::Display,
future::Future,
num::ParseIntError,
path::{Path, PathBuf},
str::FromStr,
any::type_name, fmt::Display, future::Future, num::ParseIntError, path::Path, str::FromStr,
};
use int_enum::IntEnum;
use int_enum::{IntEnum, IntEnumError};
use itertools::Itertools;
use serde::{de::DeserializeOwned, Serialize};
use serde_json::{json, Map, Value};
use thiserror::Error;
use tokio::{fs, io};
use tracing::info;
use tracing::{debug, info, warn};
use super::error::FileIOError;
#[derive(Error, Debug)]
pub enum VersionManagerError {
#[error("invalid version")]
InvalidVersion,
pub enum VersionManagerError<Version: IntEnum<Int = u64>> {
#[error("version file does not exist")]
VersionFileDoesNotExist,
#[error("error while converting integer to enum")]
IntConversionError,
#[error("malformed version file")]
MalformedVersionFile,
#[error("malformed version file, reason: {reason}")]
MalformedVersionFile { reason: &'static str },
#[error("unexpected migration: {current_version} -> {next_version}")]
UnexpectedMigration {
current_version: i32,
next_version: i32,
current_version: u64,
next_version: u64,
},
#[error("failed to convert version to config file")]
ConvertToConfig,
#[error(transparent)]
FileIO(#[from] FileIOError),
#[error(transparent)]
ParseIntError(#[from] ParseIntError),
ParseInt(#[from] ParseIntError),
#[error(transparent)]
SerdeJson(#[from] serde_json::Error),
#[error(transparent)]
IntConversion(#[from] IntEnumError<Version>),
}
pub trait ManagedVersion: IntEnum<Int = i32> + Display + 'static {
const LATEST_VERSION: Self;
type MigrationError: std::error::Error + Display + From<VersionManagerError> + 'static;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Kind {
PlainText,
Json(&'static str), // Version field name!
}
pub trait ManagedVersion<Version: IntEnum<Int = u64> + Display + Eq + Serialize + DeserializeOwned>:
Serialize + DeserializeOwned + 'static
{
const LATEST_VERSION: Version;
const KIND: Kind;
type MigrationError: std::error::Error + Display + From<VersionManagerError<Version>> + 'static;
fn from_latest_version() -> Option<Self> {
None
}
}
/// An abstract system for saving a text file containing a version number.
/// The version number is an integer that can be converted to and from an enum.
/// The enum must implement the IntEnum trait.
pub struct VersionManager<T: ManagedVersion> {
version_file_path: PathBuf,
_marker: std::marker::PhantomData<T>,
pub struct VersionManager<
Config: ManagedVersion<Version>,
Version: IntEnum<Int = u64> + Display + Eq + Serialize + DeserializeOwned,
> {
_marker: std::marker::PhantomData<(Config, Version)>,
}
impl<T: ManagedVersion> VersionManager<T> {
pub fn new(version_file_path: impl AsRef<Path>) -> Self {
VersionManager {
version_file_path: version_file_path.as_ref().into(),
_marker: std::marker::PhantomData,
}
}
pub async fn get_version(&self) -> Result<T, VersionManagerError> {
match fs::read_to_string(&self.version_file_path).await {
Ok(contents) => {
let version = i32::from_str(contents.trim())?;
T::from_int(version).map_err(|_| VersionManagerError::IntConversionError)
}
Err(e) if e.kind() == io::ErrorKind::NotFound => {
Err(VersionManagerError::VersionFileDoesNotExist)
}
Err(e) => Err(FileIOError::from((&self.version_file_path, e)).into()),
}
}
pub async fn set_version(&self, version: T) -> Result<(), VersionManagerError> {
fs::write(
&self.version_file_path,
version.int_value().to_string().as_bytes(),
)
.await
.map_err(|e| FileIOError::from((&self.version_file_path, e)).into())
}
pub async fn migrate<Fut>(
impl<
Config: ManagedVersion<Version>,
Version: IntEnum<Int = u64> + Display + Eq + Serialize + DeserializeOwned,
> VersionManager<Config, Version>
{
async fn get_version(
&self,
current: T,
migrate_fn: impl Fn(T, T) -> Fut,
) -> Result<(), T::MigrationError>
version_file_path: impl AsRef<Path>,
) -> Result<Version, VersionManagerError<Version>> {
let version_file_path = version_file_path.as_ref();
match Config::KIND {
Kind::PlainText => match fs::read_to_string(version_file_path).await {
Ok(contents) => {
let version = u64::from_str(contents.trim())?;
Version::from_int(version).map_err(Into::into)
}
Err(e) if e.kind() == io::ErrorKind::NotFound => {
Err(VersionManagerError::VersionFileDoesNotExist)
}
Err(e) => Err(FileIOError::from((version_file_path, e)).into()),
},
Kind::Json(field) => match fs::read(version_file_path).await {
Ok(bytes) => {
let Some(version) = serde_json::from_slice::<Map<String, Value>>(&bytes)?
.get(field)
.and_then(|version| version.as_u64())
else {
return Err(VersionManagerError::MalformedVersionFile {
reason: "missing version field",
});
};
Version::from_int(version).map_err(Into::into)
}
Err(e) if e.kind() == io::ErrorKind::NotFound => {
Err(VersionManagerError::VersionFileDoesNotExist)
}
Err(e) => Err(FileIOError::from((version_file_path, e)).into()),
},
}
}
async fn set_version(
&self,
version_file_path: impl AsRef<Path>,
version: Version,
) -> Result<(), VersionManagerError<Version>> {
let version_file_path = version_file_path.as_ref();
match Config::KIND {
Kind::PlainText => fs::write(
version_file_path,
version.int_value().to_string().as_bytes(),
)
.await
.map_err(|e| FileIOError::from((version_file_path, e)).into()),
Kind::Json(field) => {
let mut data_value = serde_json::from_slice::<Map<String, Value>>(
&fs::read(version_file_path)
.await
.map_err(|e| FileIOError::from((version_file_path, e)))?,
)?;
data_value.insert(String::from(field), json!(version.int_value()));
fs::write(version_file_path, serde_json::to_vec(&data_value)?)
.await
.map_err(|e| FileIOError::from((version_file_path, e)).into())
}
}
}
pub async fn migrate_and_load<Fut>(
version_file_path: impl AsRef<Path>,
migrate_fn: impl Fn(Version, Version) -> Fut,
) -> Result<Config, Config::MigrationError>
where
Fut: Future<Output = Result<(), T::MigrationError>>,
Fut: Future<Output = Result<(), Config::MigrationError>>,
{
for (current_version, next_version) in
(current.int_value()..=T::LATEST_VERSION.int_value()).tuple_windows()
{
match (T::from_int(current_version), T::from_int(next_version)) {
(Ok(current), Ok(next)) => {
info!(
"Running {} migrator: {} -> {}",
type_name::<T>(),
current,
next
);
migrate_fn(current, next).await?
}
(Err(_), _) | (_, Err(_)) => {
return Err(VersionManagerError::IntConversionError.into())
}
};
let version_file_path = version_file_path.as_ref();
let this = VersionManager {
_marker: std::marker::PhantomData::<(Config, Version)>,
};
let current = match this.get_version(version_file_path).await {
Ok(version) => version,
Err(VersionManagerError::VersionFileDoesNotExist) => {
warn!(
"Config file for {} does not exist, trying to create a new one with version -> {}",
type_name::<Config>(),
Config::LATEST_VERSION
);
let Some(latest_config) = Config::from_latest_version() else {
return Err(VersionManagerError::VersionFileDoesNotExist.into());
};
fs::write(
version_file_path,
match Config::KIND {
Kind::PlainText => Config::LATEST_VERSION
.int_value()
.to_string()
.as_bytes()
.to_vec(),
Kind::Json(_) => serde_json::to_vec(&latest_config)
.map_err(|e| VersionManagerError::SerdeJson(e))?,
},
)
.await
.map_err(|e| {
VersionManagerError::FileIO(FileIOError::from((version_file_path, e)))
})?;
return Ok(latest_config);
}
Err(e) => return Err(e.into()),
};
if current != Config::LATEST_VERSION {
for (current_version, next_version) in
(current.int_value()..=Config::LATEST_VERSION.int_value()).tuple_windows()
{
let (current, next) = (
Version::from_int(current_version).map_err(VersionManagerError::from)?,
Version::from_int(next_version).map_err(VersionManagerError::from)?,
);
info!(
"Running {} migrator: {current} -> {next}",
type_name::<Config>()
);
migrate_fn(current, next).await?;
}
this.set_version(version_file_path, Config::LATEST_VERSION)
.await?;
} else {
debug!("No migration required for {}", type_name::<Config>());
}
self.set_version(T::LATEST_VERSION)
fs::read(version_file_path)
.await
.map_err(Into::into)
.map_err(|e| {
VersionManagerError::FileIO(FileIOError::from((version_file_path, e))).into()
})
.and_then(|bytes| {
serde_json::from_slice(&bytes).map_err(|e| VersionManagerError::SerdeJson(e).into())
})
}
}
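To see how the generalized version manager is meant to be consumed, here is a hedged sketch of wiring a JSON-versioned config into it (all names like `MyConfig`, `MyVersion`, and `MyError` are illustrative; the derives mirror the `ThumbnailVersion` example earlier in this diff):

#[derive(
    IntEnum, Debug, Clone, Copy, Eq, PartialEq, strum::Display, Serialize_repr, Deserialize_repr,
)]
#[repr(u64)]
enum MyVersion {
    V1 = 1,
    V2 = 2,
}

#[derive(Serialize, Deserialize)]
struct MyConfig {
    version: MyVersion,
    // ... other fields ...
}

#[derive(thiserror::Error, Debug)]
enum MyError {
    #[error(transparent)]
    VersionManager(#[from] VersionManagerError<MyVersion>),
}

impl ManagedVersion<MyVersion> for MyConfig {
    const LATEST_VERSION: MyVersion = MyVersion::V2;
    // The version lives in the JSON file under the "version" field.
    const KIND: Kind = Kind::Json("version");
    type MigrationError = MyError;
}

// Loading runs every pending adjacent migration, bumps the version field
// on disk, then deserializes the final file into `MyConfig`.
let config = VersionManager::<MyConfig, MyVersion>::migrate_and_load(
    "my_config.json",
    |current, next| async move {
        match (current, next) {
            (MyVersion::V1, MyVersion::V2) => Ok(()), // rewrite fields here
            _ => Err(VersionManagerError::UnexpectedMigration {
                current_version: current.int_value(),
                next_version: next.int_value(),
            }
            .into()),
        }
    },
)
.await?;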


@ -1,11 +1,12 @@
import { Info } from '@phosphor-icons/react';
import clsx from 'clsx';
import { PropsWithChildren } from 'react';
import { Tooltip } from '@sd/ui';
import { ErrorMessage, Tooltip } from '@sd/ui';
import { usePlatform } from '~/util/Platform';
interface Props {
title: string;
registerName?: string;
description?: string | JSX.Element;
mini?: boolean;
className?: string;
@ -13,30 +14,37 @@ interface Props {
infoUrl?: string;
}
export default ({ mini, ...props }: PropsWithChildren<Props>) => {
export default ({ mini, registerName, ...props }: PropsWithChildren<Props>) => {
const platform = usePlatform();
if (typeof props.description === 'string')
props.description = <p className="mb-2 text-sm text-gray-400">{props.description}</p>;
return (
<div className="relative flex flex-row">
<div className={clsx('flex w-full flex-col', !mini && 'pb-6', props.className)}>
<div className="mb-1 flex items-center gap-1">
<h3 className="text-sm font-medium text-ink">{props.title}</h3>
{props.toolTipLabel && (
<Tooltip label={props.toolTipLabel as string}>
<Info
onClick={() => props.infoUrl && platform.openLink(props.infoUrl)}
size={15}
/>
</Tooltip>
)}
<>
<div className="relative flex flex-row">
<div className={clsx('flex w-full flex-col', !mini && 'pb-6', props.className)}>
<div className="mb-1 flex items-center gap-1">
<h3 className="text-sm font-medium text-ink">{props.title}</h3>
{props.toolTipLabel && (
<Tooltip label={props.toolTipLabel as string}>
<Info
onClick={() =>
props.infoUrl && platform.openLink(props.infoUrl)
}
size={15}
/>
</Tooltip>
)}
</div>
<div className="w-[85%]">{props.description}</div>
{!mini && props.children}
</div>
<div className="w-[85%]">{props.description}</div>
{!mini && props.children}
{mini && props.children}
</div>
{mini && props.children}
</div>
{registerName ? (
<ErrorMessage name={registerName} className="mt-1 w-full text-xs" />
) : null}
</>
);
};


@ -1,6 +1,6 @@
import clsx from 'clsx';
import { useEffect } from 'react';
import { Controller } from 'react-hook-form';
import { Controller, FormProvider } from 'react-hook-form';
import {
getDebugState,
useBridgeMutation,
@ -9,7 +9,7 @@ import {
useDebugState,
useZodForm
} from '@sd/client';
import { Button, Card, Input, Select, SelectOption, Switch, tw, z } from '@sd/ui';
import { Button, Card, Input, Select, SelectOption, Slider, Switch, tw, z } from '@sd/ui';
import { Icon } from '~/components';
import { useDebouncedFormWatch } from '~/hooks';
import { usePlatform } from '~/util/Platform';
@ -29,32 +29,53 @@ export const Component = () => {
const debugState = useDebugState();
const editNode = useBridgeMutation('nodes.edit');
const connectedPeers = useConnectedPeers();
const updateThumbnailerPreferences = useBridgeMutation('nodes.updateThumbnailerPreferences');
const form = useZodForm({
schema: z.object({
name: z.string().min(1).max(250).optional(),
p2p_enabled: z.boolean().optional(),
p2p_port: u16,
customOrDefault: z.enum(['Custom', 'Default'])
}),
schema: z
.object({
name: z.string().min(1).max(250).optional(),
p2p_enabled: z.boolean().optional(),
p2p_port: u16,
customOrDefault: z.enum(['Custom', 'Default']),
background_processing_percentage: z.coerce
.number({
invalid_type_error: 'Must use numbers from 0 to 100'
})
.int()
.nonnegative()
.lte(100)
})
.strict(),
reValidateMode: 'onChange',
defaultValues: {
name: node.data?.name,
p2p_enabled: node.data?.p2p_enabled,
p2p_port: node.data?.p2p_port || 0,
customOrDefault: node.data?.p2p_port ? 'Custom' : 'Default'
customOrDefault: node.data?.p2p_port ? 'Custom' : 'Default',
background_processing_percentage:
node.data?.preferences.thumbnailer.background_processing_percentage || 50
}
});
const watchCustomOrDefault = form.watch('customOrDefault');
const watchP2pEnabled = form.watch('p2p_enabled');
const watchBackgroundProcessingPercentage = form.watch('background_processing_percentage');
useDebouncedFormWatch(form, async (value) => {
await editNode.mutateAsync({
name: value.name || null,
p2p_enabled: value.p2p_enabled === undefined ? null : value.p2p_enabled,
p2p_port: value.customOrDefault === 'Default' ? 0 : Number(value.p2p_port)
});
if (await form.trigger()) {
await editNode.mutateAsync({
name: value.name || null,
p2p_enabled: value.p2p_enabled === undefined ? null : value.p2p_enabled,
p2p_port: value.customOrDefault === 'Default' ? 0 : Number(value.p2p_port)
});
if (value.background_processing_percentage != undefined) {
await updateThumbnailerPreferences.mutateAsync({
background_processing_percentage: value.background_processing_percentage
});
}
}
node.refetch();
});
@ -68,7 +89,7 @@ export const Component = () => {
}, [form]);
return (
<>
<FormProvider {...form}>
<Heading
title="General Settings"
description="General settings related to this client."
@ -168,6 +189,37 @@ export const Component = () => {
onClick={() => (getDebugState().enabled = !debugState.enabled)}
/>
</Setting>
<Setting
mini
registerName="background_processing_percentage"
title="Thumbnailer CPU usage"
description="Limit how much CPU the thumbnailer can use for background processing."
>
<div className="flex w-80 gap-2">
<Slider
onValueChange={(value) => {
if (value.length > 0) {
form.setValue('background_processing_percentage', value[0] ?? 0);
}
}}
max={100}
step={25}
min={0}
value={[watchBackgroundProcessingPercentage]}
/>
<Input
className="after:h-initial relative w-[8ch] after:absolute after:right-[0.8em] after:top-1/2 after:inline-block after:-translate-y-2/4 after:content-['%']"
defaultValue={
node.data?.preferences.thumbnailer.background_processing_percentage ||
75
}
maxLength={3}
{...form.register('background_processing_percentage', {
valueAsNumber: true
})}
/>
</div>
</Setting>
<div className="flex flex-col gap-4">
<h1 className="mb-3 text-lg font-bold text-ink">Networking</h1>
@ -247,6 +299,6 @@ export const Component = () => {
</div>
</Setting>
</div>
</>
</FormProvider>
);
};


@ -85,6 +85,7 @@ export type Procedures = {
{ key: "locations.subPathRescan", input: LibraryArgs<RescanArgs>, result: null } |
{ key: "locations.update", input: LibraryArgs<LocationUpdateArgs>, result: null } |
{ key: "nodes.edit", input: ChangeNodeNameArgs, result: null } |
{ key: "nodes.updateThumbnailerPreferences", input: UpdateThumbnailerPreferences, result: null } |
{ key: "notifications.test", input: never, result: null } |
{ key: "notifications.testLibrary", input: LibraryArgs<null>, result: null } |
{ key: "p2p.acceptSpacedrop", input: [string, string | null], result: null } |
@ -262,7 +263,9 @@ export type LibraryArgs<T> = { library_id: string; arg: T }
/**
* LibraryConfig holds the configuration for a specific library. This is stored as a '{uuid}.sdlibrary' file.
*/
export type LibraryConfig = { name: LibraryName; description: string | null; instance_id: number }
export type LibraryConfig = { name: LibraryName; description: string | null; instance_id: number; version: LibraryConfigVersion }
export type LibraryConfigVersion = "V0" | "V1" | "V2" | "V3" | "V4" | "V5" | "V6" | "V7" | "V8" | "V9"
export type LibraryConfigWrapped = { uuid: string; instance_id: string; instance_public_key: string; config: LibraryConfig }
@ -318,7 +321,9 @@ export type MediaLocation = { latitude: number; longitude: number; pluscode: Plu
export type MediaMetadata = ({ type: "Image" } & ImageMetadata) | ({ type: "Video" } & VideoMetadata) | ({ type: "Audio" } & AudioMetadata)
export type NodeState = ({ id: string; name: string; p2p_enabled: boolean; p2p_port: number | null; features: BackendFeature[] }) & { data_path: string; p2p: P2PStatus }
export type NodePreferences = { thumbnailer: ThumbnailerPreferences }
export type NodeState = ({ id: string; name: string; p2p_enabled: boolean; p2p_port: number | null; features: BackendFeature[]; preferences: NodePreferences }) & { data_path: string; p2p: P2PStatus }
export type NonIndexedFileSystemEntries = { entries: ExplorerItem[]; errors: Error[] }
@ -406,7 +411,7 @@ export type Response = { Start: { user_code: string; verification_url: string; v
export type RuleKind = "AcceptFilesByGlob" | "RejectFilesByGlob" | "AcceptIfChildrenDirectoriesArePresent" | "RejectIfChildrenDirectoriesArePresent"
export type SanitisedNodeConfig = { id: string; name: string; p2p_enabled: boolean; p2p_port: number | null; features: BackendFeature[] }
export type SanitisedNodeConfig = { id: string; name: string; p2p_enabled: boolean; p2p_port: number | null; features: BackendFeature[]; preferences: NodePreferences }
export type SearchData<T> = { cursor: number[] | null; items: T[] }
@ -440,6 +445,10 @@ export type Target = { Object: number } | { FilePath: number }
export type TextMatch = { contains: string } | { startsWith: string } | { endsWith: string } | { equals: string }
export type ThumbnailerPreferences = { background_processing_percentage: number }
export type UpdateThumbnailerPreferences = { background_processing_percentage: number }
export type VideoMetadata = { duration: number | null; video_codec: string | null; audio_codec: string | null }
export type Volume = { name: string; mount_points: string[]; total_capacity: string; available_capacity: string; disk_type: DiskType; file_system: string | null; is_root_filesystem: boolean }