diff --git a/core/src/lib.rs b/core/src/lib.rs index e8c6abd8f..bb1b8beb8 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -3,7 +3,7 @@ use crate::{ api::{CoreEvent, Router}, location::LocationManagerError, - object::thumbnail_remover, + object::media::thumbnail::actor::Thumbnailer, p2p::sync::NetworkedLibraries, }; @@ -65,7 +65,7 @@ pub struct Node { pub event_bus: (broadcast::Sender, broadcast::Receiver), pub notifications: Notifications, pub nlm: Arc, - pub thumbnail_remover: thumbnail_remover::Actor, + pub thumbnailer: Thumbnailer, pub files_over_p2p_flag: Arc, pub env: env::Env, pub http: reqwest::Client, @@ -113,10 +113,7 @@ impl Node { p2p, config, event_bus, - thumbnail_remover: thumbnail_remover::Actor::new( - data_dir.to_path_buf(), - libraries.clone(), - ), + thumbnailer: Thumbnailer::new(data_dir.to_path_buf(), libraries.clone()), libraries, files_over_p2p_flag: Arc::new(AtomicBool::new(false)), http: reqwest::Client::new(), diff --git a/core/src/location/indexer/indexer_job.rs b/core/src/location/indexer/indexer_job.rs index fa5921620..9be3a9653 100644 --- a/core/src/location/indexer/indexer_job.rs +++ b/core/src/location/indexer/indexer_job.rs @@ -220,7 +220,7 @@ impl StatefulJob for IndexerJobInit { ); ctx.node - .thumbnail_remover + .thumbnailer .remove_cas_ids( to_remove .iter() diff --git a/core/src/location/indexer/shallow.rs b/core/src/location/indexer/shallow.rs index fa8ac1bf8..98ef8b14b 100644 --- a/core/src/location/indexer/shallow.rs +++ b/core/src/location/indexer/shallow.rs @@ -92,7 +92,7 @@ pub async fn shallow( debug!("Walker at shallow indexer found {to_remove_count} file_paths to be removed"); - node.thumbnail_remover + node.thumbnailer .remove_cas_ids( to_remove .iter() diff --git a/core/src/location/manager/watcher/utils.rs b/core/src/location/manager/watcher/utils.rs index f2606bdc6..7d84b120d 100644 --- a/core/src/location/manager/watcher/utils.rs +++ b/core/src/location/manager/watcher/utils.rs @@ -10,7 +10,7 @@ use crate::{ loose_find_existing_file_path_params, FilePathError, FilePathMetadata, IsolatedFilePathData, MetadataExt, }, - find_location, generate_thumbnail, + find_location, indexer::reverse_update_directories_sizes, location_with_indexer_rules, manager::LocationManagerError, @@ -21,7 +21,7 @@ use crate::{ media::{ media_data_extractor::{can_extract_media_data_for_image, extract_media_data}, media_data_image_to_query, - thumbnail::get_thumbnail_path, + thumbnail::{generate_thumbnail, get_thumbnail_path}, }, validation::hash::file_checksum, }, @@ -283,7 +283,11 @@ async fn inner_create_file( let inner_extension = extension.clone(); if let Some(cas_id) = cas_id { tokio::spawn(async move { - generate_thumbnail(&inner_extension, &cas_id, inner_path, &node).await; + if let Err(e) = + generate_thumbnail(&inner_extension, cas_id, inner_path, node, true).await + { + error!("Failed to generate thumbnail in the watcher: {e:#?}"); + } }); } @@ -499,7 +503,12 @@ async fn inner_update_file( let inner_node = node.clone(); if let Some(cas_id) = cas_id { tokio::spawn(async move { - generate_thumbnail(&ext, &cas_id, inner_path, &inner_node).await; + if let Err(e) = + generate_thumbnail(&ext, cas_id, inner_path, inner_node, true) + .await + { + error!("Failed to generate thumbnail in the watcher: {e:#?}"); + } }); } diff --git a/core/src/location/mod.rs b/core/src/location/mod.rs index 5ef71f68e..d926a953d 100644 --- a/core/src/location/mod.rs +++ b/core/src/location/mod.rs @@ -1,19 +1,11 @@ use crate::{ - api::CoreEvent, invalidate_query, job::{JobBuilder, JobError, JobManagerError}, library::Library, location::file_path_helper::filter_existing_file_path_params, object::{ file_identifier::{self, file_identifier_job::FileIdentifierJobInit}, - media::{ - media_processor, - thumbnail::{ - can_generate_thumbnail_for_document, can_generate_thumbnail_for_image, - generate_image_thumbnail, get_thumb_key, get_thumbnail_path, - }, - MediaProcessorJobInit, - }, + media::{media_processor, MediaProcessorJobInit}, }, prisma::{file_path, indexer_rules_in_location, location, PrismaClient}, util::{db::maybe_missing, error::FileIOError}, @@ -23,12 +15,9 @@ use crate::{ use std::{ collections::HashSet, path::{Component, Path, PathBuf}, - str::FromStr, sync::Arc, }; -use sd_file_ext::extensions::{DocumentExtension, ImageExtension}; - use chrono::Utc; use futures::future::TryFutureExt; use normpath::PathExt; @@ -40,7 +29,7 @@ use serde::Deserialize; use serde_json::json; use specta::Type; use tokio::{fs, io}; -use tracing::{debug, error, info, trace, warn}; +use tracing::{debug, info, warn}; use uuid::Uuid; mod error; @@ -898,66 +887,6 @@ async fn check_nested_location( Ok(parents_count > 0 || is_a_child_location) } -pub(super) async fn generate_thumbnail( - extension: &str, - cas_id: &str, - path: impl AsRef, - node: &Arc, -) { - let path = path.as_ref(); - let output_path = get_thumbnail_path(node, cas_id); - - if let Err(e) = fs::metadata(&output_path).await { - if e.kind() != io::ErrorKind::NotFound { - error!( - "Failed to check if thumbnail exists, but we will try to generate it anyway: {e}" - ); - } - // Otherwise we good, thumbnail doesn't exist so we can generate it - } else { - trace!( - "Skipping thumbnail generation for {} because it already exists", - path.display() - ); - return; - } - - if let Ok(extension) = ImageExtension::from_str(extension) { - if can_generate_thumbnail_for_image(&extension) { - if let Err(e) = generate_image_thumbnail(path, &output_path).await { - error!("Failed to image thumbnail on location manager: {e:#?}"); - } - } - } else if let Ok(extension) = DocumentExtension::from_str(extension) { - if can_generate_thumbnail_for_document(&extension) { - if let Err(e) = generate_image_thumbnail(path, &output_path).await { - error!("Failed to document thumbnail on location manager: {e:#?}"); - } - } - } - - #[cfg(feature = "ffmpeg")] - { - use crate::object::media::thumbnail::{ - can_generate_thumbnail_for_video, generate_video_thumbnail, - }; - use sd_file_ext::extensions::VideoExtension; - - if let Ok(extension) = VideoExtension::from_str(extension) { - if can_generate_thumbnail_for_video(&extension) { - if let Err(e) = generate_video_thumbnail(path, &output_path).await { - error!("Failed to video thumbnail on location manager: {e:#?}"); - } - } - } - } - - trace!("Emitting new thumbnail event"); - node.emit(CoreEvent::NewThumbnail { - thumb_key: get_thumb_key(cas_id), - }); -} - pub async fn update_location_size( location_id: location::id::Type, library: &Library, diff --git a/core/src/location/non_indexed.rs b/core/src/location/non_indexed.rs index 05f77b943..d990c634b 100644 --- a/core/src/location/non_indexed.rs +++ b/core/src/location/non_indexed.rs @@ -1,7 +1,10 @@ use crate::{ api::locations::ExplorerItem, library::Library, - object::{cas::generate_cas_id, media::thumbnail::get_thumb_key}, + object::{ + cas::generate_cas_id, + media::thumbnail::{get_thumb_key, GenerateThumbnailArgs}, + }, prisma::location, util::error::FileIOError, Node, @@ -26,7 +29,6 @@ use tracing::{error, warn}; use super::{ file_path_helper::{path_is_hidden, MetadataExt}, - generate_thumbnail, indexer::rules::{ seed::{no_hidden, no_os_protected}, IndexerRule, RuleKind, @@ -104,6 +106,8 @@ pub async fn walk( [(!with_hidden_files).then(|| IndexerRule::from(no_hidden()))], ); + let mut thumbnails_to_generate = vec![]; + while let Some(entry) = read_dir.next_entry().await.map_err(|e| (path, e))? { let Ok((entry_path, name)) = normalize_path(entry.path()) .map_err(|e| errors.push(NonIndexedLocationError::from((path, e)).into())) @@ -161,23 +165,18 @@ pub async fn walk( kind, ObjectKind::Image | ObjectKind::Video | ObjectKind::Document ) { - if let Ok(cas_id) = generate_cas_id(&entry_path, metadata.len()) + if let Ok(cas_id) = generate_cas_id(&path, metadata.len()) .await .map_err(|e| errors.push(NonIndexedLocationError::from((path, e)).into())) { - let thumbnail_key = get_thumb_key(&cas_id); - let entry_path = entry_path.clone(); - let extension = extension.clone(); - let inner_node = Arc::clone(&node); - let inner_cas_id = cas_id.clone(); - tokio::spawn(async move { - generate_thumbnail(&extension, &inner_cas_id, entry_path, &inner_node) - .await; - }); + thumbnails_to_generate.push(GenerateThumbnailArgs::new( + extension.clone(), + cas_id.clone(), + path.to_path_buf(), + Arc::clone(&node), + )); - node.thumbnail_remover - .new_non_indexed_thumbnail(cas_id) - .await; + let thumbnail_key = get_thumb_key(&cas_id); Some(thumbnail_key) } else { @@ -205,6 +204,10 @@ pub async fn walk( } } + node.thumbnailer + .new_non_indexed_thumbnails_batch(thumbnails_to_generate) + .await; + let mut locations = library .db .location() diff --git a/core/src/object/thumbnail_remover.rs b/core/src/object/media/thumbnail/actor.rs similarity index 50% rename from core/src/object/thumbnail_remover.rs rename to core/src/object/media/thumbnail/actor.rs index 22798811c..1ae88e472 100644 --- a/core/src/object/thumbnail_remover.rs +++ b/core/src/object/media/thumbnail/actor.rs @@ -1,35 +1,42 @@ use crate::{ library::{Libraries, LibraryManagerEvent}, + object::media::thumbnail::ThumbnailerError, prisma::{file_path, PrismaClient}, util::error::{FileIOError, NonUtf8PathError}, }; use std::{ - collections::{HashMap, HashSet}, + collections::{HashMap, HashSet, VecDeque}, ffi::OsStr, path::{Path, PathBuf}, pin::pin, sync::Arc, - time::Duration, + time::{Duration, SystemTime}, }; use async_channel as chan; use futures::{future::try_join_all, stream::FuturesUnordered, FutureExt}; -use futures_concurrency::stream::Merge; +use futures_concurrency::{ + future::{Join, Race}, + stream::Merge, +}; use thiserror::Error; use tokio::{ - fs, io, - time::{interval_at, Instant, MissedTickBehavior}, + fs, io, spawn, + sync::oneshot, + time::{interval, interval_at, timeout, Instant, MissedTickBehavior}, }; use tokio_stream::{wrappers::IntervalStream, StreamExt}; use tokio_util::sync::{CancellationToken, DropGuard}; use tracing::{debug, error, trace}; use uuid::Uuid; -use super::media::thumbnail::THUMBNAIL_CACHE_DIR_NAME; +use super::{generate_thumbnail, GenerateThumbnailArgs, THUMBNAIL_CACHE_DIR_NAME}; +const ONE_SEC: Duration = Duration::from_secs(1); const THIRTY_SECS: Duration = Duration::from_secs(30); const HALF_HOUR: Duration = Duration::from_secs(30 * 60); +const ONE_WEEK: Duration = Duration::from_secs(7 * 24 * 60 * 60); #[derive(Error, Debug)] enum Error { @@ -49,19 +56,24 @@ enum DatabaseMessage { Remove(Uuid), } -pub struct Actor { +// Thumbnails directory have the following structure: +// thumbnails/ +// ├── version.txt +//└── [0..2]/ # sharding +// └── .webp +pub struct Thumbnailer { cas_ids_to_delete_tx: chan::Sender>, - non_indexed_thumbnails_cas_ids_tx: chan::Sender, + ephemeral_thumbnails_to_generate_tx: chan::Sender>, _cancel_loop: DropGuard, } -impl Actor { +impl Thumbnailer { pub fn new(data_dir: PathBuf, lm: Arc) -> Self { let mut thumbnails_directory = data_dir; thumbnails_directory.push(THUMBNAIL_CACHE_DIR_NAME); let (databases_tx, databases_rx) = chan::bounded(4); - let (non_indexed_thumbnails_cas_ids_tx, non_indexed_thumbnails_cas_ids_rx) = + let (ephemeral_thumbnails_to_generate_tx, ephemeral_thumbnails_to_generate_rx) = chan::unbounded(); let (cas_ids_to_delete_tx, cas_ids_to_delete_rx) = chan::bounded(16); let cancel_token = CancellationToken::new(); @@ -73,7 +85,7 @@ impl Actor { thumbnails_directory.clone(), databases_rx.clone(), cas_ids_to_delete_rx.clone(), - non_indexed_thumbnails_cas_ids_rx.clone(), + ephemeral_thumbnails_to_generate_rx.clone(), inner_cancel_token.child_token(), )) .await @@ -131,7 +143,7 @@ impl Actor { Self { cas_ids_to_delete_tx, - non_indexed_thumbnails_cas_ids_tx, + ephemeral_thumbnails_to_generate_tx, _cancel_loop: cancel_token.drop_guard(), } } @@ -140,44 +152,107 @@ impl Actor { thumbnails_directory: PathBuf, databases_rx: chan::Receiver, cas_ids_to_delete_rx: chan::Receiver>, - non_indexed_thumbnails_cas_ids_rx: chan::Receiver, + ephemeral_thumbnails_to_generate_rx: chan::Receiver>, cancel_token: CancellationToken, ) { - let mut check_interval = interval_at(Instant::now() + THIRTY_SECS, HALF_HOUR); - check_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + let mut to_remove_interval = interval_at(Instant::now() + THIRTY_SECS, HALF_HOUR); + to_remove_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + + let mut idle_interval = interval(ONE_SEC); + idle_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); let mut databases = HashMap::new(); - let mut non_indexed_thumbnails_cas_ids = HashSet::new(); + let mut ephemeral_thumbnails_cas_ids = HashSet::new(); #[derive(Debug)] enum StreamMessage { - Run, + RemovalTick, ToDelete(Vec), Database(DatabaseMessage), - NonIndexedThumbnail(String), + EphemeralThumbnailNewBatch(Vec), + Leftovers(Vec), + NewEphemeralThumbnailCasIds(Vec), Stop, + IdleTick, } let cancel = pin!(cancel_token.cancelled()); + // This is a LIFO queue, so we can process the most recent thumbnails first + let mut ephemeral_thumbnails_queue = Vec::with_capacity(8); + + // This one is a FIFO queue, so we can process leftovers from the previous batch first + let mut ephemeral_thumbnails_leftovers_queue = VecDeque::with_capacity(8); + + let (ephemeral_thumbnails_cas_ids_tx, ephemeral_thumbnails_cas_ids_rx) = chan::bounded(32); + let (leftovers_tx, leftovers_rx) = chan::bounded(8); + + let (stop_older_processing_tx, stop_older_processing_rx) = chan::bounded(1); + + let mut current_batch_processing_rx: Option> = None; + let mut msg_stream = ( databases_rx.map(StreamMessage::Database), cas_ids_to_delete_rx.map(StreamMessage::ToDelete), - non_indexed_thumbnails_cas_ids_rx.map(StreamMessage::NonIndexedThumbnail), - IntervalStream::new(check_interval).map(|_| StreamMessage::Run), + ephemeral_thumbnails_to_generate_rx.map(StreamMessage::EphemeralThumbnailNewBatch), + leftovers_rx.map(StreamMessage::Leftovers), + ephemeral_thumbnails_cas_ids_rx.map(StreamMessage::NewEphemeralThumbnailCasIds), + IntervalStream::new(to_remove_interval).map(|_| StreamMessage::RemovalTick), + IntervalStream::new(idle_interval).map(|_| StreamMessage::IdleTick), cancel.into_stream().map(|()| StreamMessage::Stop), ) .merge(); while let Some(msg) = msg_stream.next().await { match msg { - StreamMessage::Run => { + StreamMessage::IdleTick => { + if let Some(done_rx) = current_batch_processing_rx.as_mut() { + // Checking if the previous run finished or was aborted to clean state + if matches!( + done_rx.try_recv(), + Ok(()) | Err(oneshot::error::TryRecvError::Closed) + ) { + current_batch_processing_rx = None; + } + } + + if current_batch_processing_rx.is_none() + && (!ephemeral_thumbnails_queue.is_empty() + || !ephemeral_thumbnails_leftovers_queue.is_empty()) + { + let (done_tx, done_rx) = oneshot::channel(); + current_batch_processing_rx = Some(done_rx); + + if let Some(batch) = ephemeral_thumbnails_queue.pop() { + spawn(batch_processor( + batch, + ephemeral_thumbnails_cas_ids_tx.clone(), + stop_older_processing_rx.clone(), + done_tx, + leftovers_tx.clone(), + false, + )); + } else if let Some(batch) = ephemeral_thumbnails_leftovers_queue.pop_front() + { + spawn(batch_processor( + batch, + ephemeral_thumbnails_cas_ids_tx.clone(), + stop_older_processing_rx.clone(), + done_tx, + leftovers_tx.clone(), + true, + )); + } + } + } + + StreamMessage::RemovalTick => { // For any of them we process a clean up if a time since the last one already passed if !databases.is_empty() { if let Err(e) = Self::process_clean_up( &thumbnails_directory, databases.values(), - &non_indexed_thumbnails_cas_ids, + &ephemeral_thumbnails_cas_ids, ) .await { @@ -195,14 +270,27 @@ impl Actor { } } + StreamMessage::EphemeralThumbnailNewBatch(batch) => { + ephemeral_thumbnails_queue.push(batch); + if current_batch_processing_rx.is_some() // Only sends stop signal if there is a batch being processed + && stop_older_processing_tx.send(()).await.is_err() + { + error!("Thumbnail remover actor died when trying to stop older processing"); + } + } + + StreamMessage::Leftovers(batch) => { + ephemeral_thumbnails_leftovers_queue.push_back(batch); + } + StreamMessage::Database(DatabaseMessage::Add(id, db)) => { databases.insert(id, db); } StreamMessage::Database(DatabaseMessage::Remove(id)) => { databases.remove(&id); } - StreamMessage::NonIndexedThumbnail(cas_id) => { - non_indexed_thumbnails_cas_ids.insert(cas_id); + StreamMessage::NewEphemeralThumbnailCasIds(cas_ids) => { + ephemeral_thumbnails_cas_ids.extend(cas_ids); } StreamMessage::Stop => { debug!("Thumbnail remover actor is stopping"); @@ -218,7 +306,7 @@ impl Actor { ) -> Result<(), Error> { try_join_all(cas_ids.into_iter().map(|cas_id| async move { let thumbnail_path = - thumbnails_directory.join(format!("{}/{}.webp", &cas_id[0..2], &cas_id[2..])); + thumbnails_directory.join(format!("{}/{cas_id}.webp", &cas_id[0..2])); trace!("Removing thumbnail: {}", thumbnail_path.display()); @@ -240,12 +328,6 @@ impl Actor { ) -> Result<(), Error> { let databases = databases.collect::>(); - // Thumbnails directory have the following structure: - // thumbnails/ - // ├── version.txt - //└── [0..2]/ # sharding - // └── .webp - fs::create_dir_all(&thumbnails_directory) .await .map_err(|e| FileIOError::from((thumbnails_directory, e)))?; @@ -336,22 +418,40 @@ impl Actor { thumbnails_paths_by_cas_id .retain(|cas_id, _| !non_indexed_thumbnails_cas_ids.contains(cas_id)); - let thumbs_to_remove = thumbnails_paths_by_cas_id.len(); + let now = SystemTime::now(); - try_join_all( - thumbnails_paths_by_cas_id - .into_values() - .map(|path| async move { - trace!("Removing stale thumbnail: {}", path.display()); - fs::remove_file(&path) - .await - .map_err(|e| FileIOError::from((path, e))) - }), - ) - .await?; + let removed_count = try_join_all(thumbnails_paths_by_cas_id.into_values().map( + |path| async move { + if let Ok(metadata) = fs::metadata(&path).await { + if metadata + .accessed() + .map(|when| { + now.duration_since(when) + .map(|duration| duration < ONE_WEEK) + .unwrap_or(false) + }) + .unwrap_or(false) + { + // If the thumbnail was accessed in the last week, we don't remove it yet + // as the file is probably still in use + return Ok(false); + } + } - if thumbs_to_remove == thumbs_found { - // if we removed all the thumnails we foumd, it means that the directory is empty + tracing::warn!("Removing stale thumbnail: {}", path.display()); + fs::remove_file(&path) + .await + .map(|()| true) + .map_err(|e| FileIOError::from((path, e))) + }, + )) + .await? + .into_iter() + .filter(|r| *r) + .count(); + + if thumbs_found == removed_count { + // if we removed all the thumnails we found, it means that the directory is empty // and can be removed... trace!( "Removing empty thumbnails sharding directory: {}", @@ -366,20 +466,121 @@ impl Actor { Ok(()) } - pub async fn new_non_indexed_thumbnail(&self, cas_id: String) { + pub async fn new_non_indexed_thumbnails_batch(&self, batch: Vec) { if self - .non_indexed_thumbnails_cas_ids_tx - .send(cas_id) + .ephemeral_thumbnails_to_generate_tx + .send(batch) .await .is_err() { - error!("Thumbnail remover actor is dead"); + error!("Thumbnail remover actor is dead: Failed to send new batch"); } } pub async fn remove_cas_ids(&self, cas_ids: Vec) { if self.cas_ids_to_delete_tx.send(cas_ids).await.is_err() { - error!("Thumbnail remover actor is dead"); + error!("Thumbnail remover actor is dead: Failed to send cas ids to delete"); } } } + +async fn batch_processor( + batch: Vec, + generated_cas_ids_tx: chan::Sender>, + stop_rx: chan::Receiver<()>, + done_tx: oneshot::Sender<()>, + leftovers_tx: chan::Sender>, + in_background: bool, +) { + let mut queue = VecDeque::from(batch); + + enum RaceOutputs { + Processed, + Stop, + } + + // Need this borrow here to satisfy the async move below + let generated_cas_ids_tx = &generated_cas_ids_tx; + + while !queue.is_empty() { + let chunk = (0..4) + .filter_map(|_| queue.pop_front()) + .map( + |GenerateThumbnailArgs { + extension, + cas_id, + path, + node, + }| { + spawn(async move { + timeout( + THIRTY_SECS, + generate_thumbnail(&extension, cas_id, &path, node, in_background), + ) + .await + .unwrap_or_else(|_| Err(ThumbnailerError::TimedOut(path.into_boxed_path()))) + }) + }, + ) + .collect::>(); + + if let RaceOutputs::Stop = ( + async move { + let cas_ids = chunk + .join() + .await + .into_iter() + .filter_map(|join_result| { + join_result + .map_err(|e| error!("Failed to join thumbnail generation task: {e:#?}")) + .ok() + }) + .filter_map(|result| { + result + .map_err(|e| { + error!( + "Failed to generate thumbnail for ephemeral location: {e:#?}" + ) + }) + .ok() + }) + .collect(); + + if generated_cas_ids_tx.send(cas_ids).await.is_err() { + error!("Thumbnail remover actor is dead: Failed to send generated cas ids") + } + + trace!("Processed chunk of thumbnails"); + RaceOutputs::Processed + }, + async { + stop_rx + .recv() + .await + .expect("Critical error on thumbnails actor"); + trace!("Received a stop signal"); + RaceOutputs::Stop + }, + ) + .race() + .await + { + // Our queue is always contiguous, so this `from`` is free + let leftovers = Vec::from(queue); + + trace!( + "Stopped with {} thumbnails left to process", + leftovers.len() + ); + if !leftovers.is_empty() && leftovers_tx.send(leftovers).await.is_err() { + error!("Thumbnail remover actor is dead: Failed to send leftovers") + } + + done_tx.send(()).ok(); + + return; + } + } + + done_tx.send(()).ok(); +} diff --git a/core/src/object/media/thumbnail/mod.rs b/core/src/object/media/thumbnail/mod.rs index 841282e8b..627d0f39b 100644 --- a/core/src/object/media/thumbnail/mod.rs +++ b/core/src/object/media/thumbnail/mod.rs @@ -21,6 +21,8 @@ use std::{ collections::HashMap, ops::Deref, path::{Path, PathBuf}, + str::FromStr, + sync::Arc, }; use futures::future::{join_all, try_join_all}; @@ -32,6 +34,7 @@ use tokio::{fs, io, task}; use tracing::{error, trace, warn}; use webp::Encoder; +pub mod actor; mod directory; mod shard; @@ -99,11 +102,15 @@ pub enum ThumbnailerError { SdImages(#[from] sd_images::Error), #[error("failed to execute converting task: {0}")] Task(#[from] task::JoinError), + #[error(transparent)] + FFmpeg(#[from] sd_ffmpeg::ThumbnailerError), + #[error("thumbnail generation timed out for {}", .0.display())] + TimedOut(Box), } /// This is the target pixel count for all thumbnails to be resized to, and it is eventually downscaled /// to [`TARGET_QUALITY`]. -const TAGRET_PX: f32 = 262144_f32; +const TARGET_PX: f32 = 262144_f32; /// This is the target quality that we render thumbnails at, it is a float between 0-100 /// and is treated as a percentage (so 30% in this case, or it's the same as multiplying by `0.3`). @@ -122,9 +129,9 @@ pub struct ThumbnailerMetadata { pub skipped: u32, } -pub async fn generate_image_thumbnail>( - file_path: P, - output_path: P, +pub async fn generate_image_thumbnail( + file_path: impl AsRef, + output_path: impl AsRef, ) -> Result<(), ThumbnailerError> { let file_path = file_path.as_ref().to_path_buf(); @@ -132,7 +139,7 @@ pub async fn generate_image_thumbnail>( let img = format_image(&file_path).map_err(|_| ThumbnailerError::Encoding)?; let (w, h) = img.dimensions(); - let (w_scaled, h_scaled) = scale_dimensions(w as f32, h as f32, TAGRET_PX); + let (w_scaled, h_scaled) = scale_dimensions(w as f32, h as f32, TARGET_PX); // Optionally, resize the existing photo and convert back into DynamicImage let mut img = DynamicImage::ImageRgba8(imageops::resize( @@ -177,15 +184,15 @@ pub async fn generate_image_thumbnail>( } #[cfg(feature = "ffmpeg")] -pub async fn generate_video_thumbnail + Send>( - file_path: P, - output_path: P, -) -> Result<(), Box> { +pub async fn generate_video_thumbnail( + file_path: impl AsRef, + output_path: impl AsRef, +) -> Result<(), ThumbnailerError> { use sd_ffmpeg::to_thumbnail; - to_thumbnail(file_path, output_path, 256, TARGET_QUALITY).await?; - - Ok(()) + to_thumbnail(file_path, output_path, 256, TARGET_QUALITY) + .await + .map_err(Into::into) } #[cfg(feature = "ffmpeg")] @@ -416,3 +423,81 @@ async fn process_single_thumbnail( }); run_metadata.created += 1; } + +// TODO(fogodev): Unify how we generate thumbnails + +#[derive(Debug)] +pub struct GenerateThumbnailArgs { + pub extension: String, + pub cas_id: String, + pub path: PathBuf, + pub node: Arc, +} + +impl GenerateThumbnailArgs { + pub fn new(extension: String, cas_id: String, path: PathBuf, node: Arc) -> Self { + Self { + extension, + cas_id, + path, + node, + } + } +} + +pub async fn generate_thumbnail( + extension: &str, + cas_id: String, + path: impl AsRef, + node: Arc, + in_background: bool, +) -> Result { + let path = path.as_ref(); + trace!("Generating thumbnail for {}", path.display()); + let output_path = get_thumbnail_path(&node, &cas_id); + + if let Err(e) = fs::metadata(&output_path).await { + if e.kind() != io::ErrorKind::NotFound { + error!( + "Failed to check if thumbnail exists, but we will try to generate it anyway: {e}" + ); + } + // Otherwise we good, thumbnail doesn't exist so we can generate it + } else { + trace!( + "Skipping thumbnail generation for {} because it already exists", + path.display() + ); + return Ok(cas_id); + } + + if let Ok(extension) = ImageExtension::from_str(extension) { + if can_generate_thumbnail_for_image(&extension) { + generate_image_thumbnail(&path, &output_path).await?; + } + } else if let Ok(extension) = DocumentExtension::from_str(extension) { + if can_generate_thumbnail_for_document(&extension) { + generate_image_thumbnail(&path, &output_path).await?; + } + } + + #[cfg(feature = "ffmpeg")] + { + if let Ok(extension) = VideoExtension::from_str(extension) { + if can_generate_thumbnail_for_video(&extension) { + generate_video_thumbnail(&path, &output_path).await?; + } + } + } + + if !in_background { + trace!("Emitting new thumbnail event"); + node.emit(CoreEvent::NewThumbnail { + thumb_key: get_thumb_key(&cas_id), + }); + } + + trace!("Generated thumbnail for {}", path.display()); + + Ok(cas_id) +} diff --git a/core/src/object/mod.rs b/core/src/object/mod.rs index 156e30842..c77eaf987 100644 --- a/core/src/object/mod.rs +++ b/core/src/object/mod.rs @@ -9,7 +9,6 @@ pub mod fs; pub mod media; pub mod orphan_remover; pub mod tag; -pub mod thumbnail_remover; pub mod validation; // Objects are primarily created by the identifier from Paths diff --git a/core/src/volume/watcher.rs b/core/src/volume/watcher.rs index 7c721f242..3074f22a6 100644 --- a/core/src/volume/watcher.rs +++ b/core/src/volume/watcher.rs @@ -7,7 +7,7 @@ use tokio::{ time::{interval, Duration}, }; -use super::{get_volumes, Volume}; +use super::get_volumes; pub fn spawn_volume_watcher(library: Arc) { spawn(async move { @@ -21,11 +21,7 @@ pub fn spawn_volume_watcher(library: Arc) { if existing_volumes != current_volumes { existing_volumes = current_volumes; - invalidate_query!( - &library, - "volumes.list": (), - () - ); + invalidate_query!(&library, "volumes.list"); } } }); diff --git a/crates/ffmpeg/src/lib.rs b/crates/ffmpeg/src/lib.rs index dacb4b93d..e1b6db295 100644 --- a/crates/ffmpeg/src/lib.rs +++ b/crates/ffmpeg/src/lib.rs @@ -18,8 +18,8 @@ pub use thumbnailer::{Thumbnailer, ThumbnailerBuilder}; /// Helper function to generate a thumbnail file from a video file with reasonable defaults pub async fn to_thumbnail( - video_file_path: impl AsRef + Send, - output_thumbnail_path: impl AsRef + Send, + video_file_path: impl AsRef, + output_thumbnail_path: impl AsRef, size: u32, quality: f32, ) -> Result<(), ThumbnailerError> { @@ -34,7 +34,7 @@ pub async fn to_thumbnail( /// Helper function to generate a thumbnail bytes from a video file with reasonable defaults pub async fn to_webp_bytes( - video_file_path: impl AsRef + Send, + video_file_path: impl AsRef, size: u32, quality: f32, ) -> Result, ThumbnailerError> { diff --git a/crates/ffmpeg/src/thumbnailer.rs b/crates/ffmpeg/src/thumbnailer.rs index 1dc4ac98a..aa54a625c 100644 --- a/crates/ffmpeg/src/thumbnailer.rs +++ b/crates/ffmpeg/src/thumbnailer.rs @@ -16,8 +16,8 @@ impl Thumbnailer { /// Processes an video input file and write to file system a thumbnail with webp format pub async fn process( &self, - video_file_path: impl AsRef + Send, - output_thumbnail_path: impl AsRef + Send, + video_file_path: impl AsRef, + output_thumbnail_path: impl AsRef, ) -> Result<(), ThumbnailerError> { let path = output_thumbnail_path.as_ref().parent().ok_or_else(|| { io::Error::new( @@ -39,7 +39,7 @@ impl Thumbnailer { /// Processes an video input file and returns a webp encoded thumbnail as bytes pub async fn process_to_webp_bytes( &self, - video_file_path: impl AsRef + Send, + video_file_path: impl AsRef, ) -> Result, ThumbnailerError> { let video_file_path = video_file_path.as_ref().to_path_buf(); let prefer_embedded_metadata = self.builder.prefer_embedded_metadata;