mirror of
https://github.com/spacedriveapp/spacedrive
synced 2024-07-04 13:23:28 +00:00
[ENG-1225] Put brakes on thumbnail generation for ephemeral locations (#1523)
* Done * Small nitpick
This commit is contained in:
parent
e8a30da85e
commit
20fde557d4
|
@ -3,7 +3,7 @@
|
|||
use crate::{
|
||||
api::{CoreEvent, Router},
|
||||
location::LocationManagerError,
|
||||
object::thumbnail_remover,
|
||||
object::media::thumbnail::actor::Thumbnailer,
|
||||
p2p::sync::NetworkedLibraries,
|
||||
};
|
||||
|
||||
|
@ -65,7 +65,7 @@ pub struct Node {
|
|||
pub event_bus: (broadcast::Sender<CoreEvent>, broadcast::Receiver<CoreEvent>),
|
||||
pub notifications: Notifications,
|
||||
pub nlm: Arc<NetworkedLibraries>,
|
||||
pub thumbnail_remover: thumbnail_remover::Actor,
|
||||
pub thumbnailer: Thumbnailer,
|
||||
pub files_over_p2p_flag: Arc<AtomicBool>,
|
||||
pub env: env::Env,
|
||||
pub http: reqwest::Client,
|
||||
|
@ -113,10 +113,7 @@ impl Node {
|
|||
p2p,
|
||||
config,
|
||||
event_bus,
|
||||
thumbnail_remover: thumbnail_remover::Actor::new(
|
||||
data_dir.to_path_buf(),
|
||||
libraries.clone(),
|
||||
),
|
||||
thumbnailer: Thumbnailer::new(data_dir.to_path_buf(), libraries.clone()),
|
||||
libraries,
|
||||
files_over_p2p_flag: Arc::new(AtomicBool::new(false)),
|
||||
http: reqwest::Client::new(),
|
||||
|
|
|
@ -220,7 +220,7 @@ impl StatefulJob for IndexerJobInit {
|
|||
);
|
||||
|
||||
ctx.node
|
||||
.thumbnail_remover
|
||||
.thumbnailer
|
||||
.remove_cas_ids(
|
||||
to_remove
|
||||
.iter()
|
||||
|
|
|
@ -92,7 +92,7 @@ pub async fn shallow(
|
|||
|
||||
debug!("Walker at shallow indexer found {to_remove_count} file_paths to be removed");
|
||||
|
||||
node.thumbnail_remover
|
||||
node.thumbnailer
|
||||
.remove_cas_ids(
|
||||
to_remove
|
||||
.iter()
|
||||
|
|
|
@ -10,7 +10,7 @@ use crate::{
|
|||
loose_find_existing_file_path_params, FilePathError, FilePathMetadata,
|
||||
IsolatedFilePathData, MetadataExt,
|
||||
},
|
||||
find_location, generate_thumbnail,
|
||||
find_location,
|
||||
indexer::reverse_update_directories_sizes,
|
||||
location_with_indexer_rules,
|
||||
manager::LocationManagerError,
|
||||
|
@ -21,7 +21,7 @@ use crate::{
|
|||
media::{
|
||||
media_data_extractor::{can_extract_media_data_for_image, extract_media_data},
|
||||
media_data_image_to_query,
|
||||
thumbnail::get_thumbnail_path,
|
||||
thumbnail::{generate_thumbnail, get_thumbnail_path},
|
||||
},
|
||||
validation::hash::file_checksum,
|
||||
},
|
||||
|
@ -283,7 +283,11 @@ async fn inner_create_file(
|
|||
let inner_extension = extension.clone();
|
||||
if let Some(cas_id) = cas_id {
|
||||
tokio::spawn(async move {
|
||||
generate_thumbnail(&inner_extension, &cas_id, inner_path, &node).await;
|
||||
if let Err(e) =
|
||||
generate_thumbnail(&inner_extension, cas_id, inner_path, node, true).await
|
||||
{
|
||||
error!("Failed to generate thumbnail in the watcher: {e:#?}");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -499,7 +503,12 @@ async fn inner_update_file(
|
|||
let inner_node = node.clone();
|
||||
if let Some(cas_id) = cas_id {
|
||||
tokio::spawn(async move {
|
||||
generate_thumbnail(&ext, &cas_id, inner_path, &inner_node).await;
|
||||
if let Err(e) =
|
||||
generate_thumbnail(&ext, cas_id, inner_path, inner_node, true)
|
||||
.await
|
||||
{
|
||||
error!("Failed to generate thumbnail in the watcher: {e:#?}");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
@ -1,19 +1,11 @@
|
|||
use crate::{
|
||||
api::CoreEvent,
|
||||
invalidate_query,
|
||||
job::{JobBuilder, JobError, JobManagerError},
|
||||
library::Library,
|
||||
location::file_path_helper::filter_existing_file_path_params,
|
||||
object::{
|
||||
file_identifier::{self, file_identifier_job::FileIdentifierJobInit},
|
||||
media::{
|
||||
media_processor,
|
||||
thumbnail::{
|
||||
can_generate_thumbnail_for_document, can_generate_thumbnail_for_image,
|
||||
generate_image_thumbnail, get_thumb_key, get_thumbnail_path,
|
||||
},
|
||||
MediaProcessorJobInit,
|
||||
},
|
||||
media::{media_processor, MediaProcessorJobInit},
|
||||
},
|
||||
prisma::{file_path, indexer_rules_in_location, location, PrismaClient},
|
||||
util::{db::maybe_missing, error::FileIOError},
|
||||
|
@ -23,12 +15,9 @@ use crate::{
|
|||
use std::{
|
||||
collections::HashSet,
|
||||
path::{Component, Path, PathBuf},
|
||||
str::FromStr,
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use sd_file_ext::extensions::{DocumentExtension, ImageExtension};
|
||||
|
||||
use chrono::Utc;
|
||||
use futures::future::TryFutureExt;
|
||||
use normpath::PathExt;
|
||||
|
@ -40,7 +29,7 @@ use serde::Deserialize;
|
|||
use serde_json::json;
|
||||
use specta::Type;
|
||||
use tokio::{fs, io};
|
||||
use tracing::{debug, error, info, trace, warn};
|
||||
use tracing::{debug, info, warn};
|
||||
use uuid::Uuid;
|
||||
|
||||
mod error;
|
||||
|
@ -898,66 +887,6 @@ async fn check_nested_location(
|
|||
Ok(parents_count > 0 || is_a_child_location)
|
||||
}
|
||||
|
||||
pub(super) async fn generate_thumbnail(
|
||||
extension: &str,
|
||||
cas_id: &str,
|
||||
path: impl AsRef<Path>,
|
||||
node: &Arc<Node>,
|
||||
) {
|
||||
let path = path.as_ref();
|
||||
let output_path = get_thumbnail_path(node, cas_id);
|
||||
|
||||
if let Err(e) = fs::metadata(&output_path).await {
|
||||
if e.kind() != io::ErrorKind::NotFound {
|
||||
error!(
|
||||
"Failed to check if thumbnail exists, but we will try to generate it anyway: {e}"
|
||||
);
|
||||
}
|
||||
// Otherwise we good, thumbnail doesn't exist so we can generate it
|
||||
} else {
|
||||
trace!(
|
||||
"Skipping thumbnail generation for {} because it already exists",
|
||||
path.display()
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
if let Ok(extension) = ImageExtension::from_str(extension) {
|
||||
if can_generate_thumbnail_for_image(&extension) {
|
||||
if let Err(e) = generate_image_thumbnail(path, &output_path).await {
|
||||
error!("Failed to image thumbnail on location manager: {e:#?}");
|
||||
}
|
||||
}
|
||||
} else if let Ok(extension) = DocumentExtension::from_str(extension) {
|
||||
if can_generate_thumbnail_for_document(&extension) {
|
||||
if let Err(e) = generate_image_thumbnail(path, &output_path).await {
|
||||
error!("Failed to document thumbnail on location manager: {e:#?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "ffmpeg")]
|
||||
{
|
||||
use crate::object::media::thumbnail::{
|
||||
can_generate_thumbnail_for_video, generate_video_thumbnail,
|
||||
};
|
||||
use sd_file_ext::extensions::VideoExtension;
|
||||
|
||||
if let Ok(extension) = VideoExtension::from_str(extension) {
|
||||
if can_generate_thumbnail_for_video(&extension) {
|
||||
if let Err(e) = generate_video_thumbnail(path, &output_path).await {
|
||||
error!("Failed to video thumbnail on location manager: {e:#?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
trace!("Emitting new thumbnail event");
|
||||
node.emit(CoreEvent::NewThumbnail {
|
||||
thumb_key: get_thumb_key(cas_id),
|
||||
});
|
||||
}
|
||||
|
||||
pub async fn update_location_size(
|
||||
location_id: location::id::Type,
|
||||
library: &Library,
|
||||
|
|
|
@ -1,7 +1,10 @@
|
|||
use crate::{
|
||||
api::locations::ExplorerItem,
|
||||
library::Library,
|
||||
object::{cas::generate_cas_id, media::thumbnail::get_thumb_key},
|
||||
object::{
|
||||
cas::generate_cas_id,
|
||||
media::thumbnail::{get_thumb_key, GenerateThumbnailArgs},
|
||||
},
|
||||
prisma::location,
|
||||
util::error::FileIOError,
|
||||
Node,
|
||||
|
@ -26,7 +29,6 @@ use tracing::{error, warn};
|
|||
|
||||
use super::{
|
||||
file_path_helper::{path_is_hidden, MetadataExt},
|
||||
generate_thumbnail,
|
||||
indexer::rules::{
|
||||
seed::{no_hidden, no_os_protected},
|
||||
IndexerRule, RuleKind,
|
||||
|
@ -104,6 +106,8 @@ pub async fn walk(
|
|||
[(!with_hidden_files).then(|| IndexerRule::from(no_hidden()))],
|
||||
);
|
||||
|
||||
let mut thumbnails_to_generate = vec![];
|
||||
|
||||
while let Some(entry) = read_dir.next_entry().await.map_err(|e| (path, e))? {
|
||||
let Ok((entry_path, name)) = normalize_path(entry.path())
|
||||
.map_err(|e| errors.push(NonIndexedLocationError::from((path, e)).into()))
|
||||
|
@ -161,23 +165,18 @@ pub async fn walk(
|
|||
kind,
|
||||
ObjectKind::Image | ObjectKind::Video | ObjectKind::Document
|
||||
) {
|
||||
if let Ok(cas_id) = generate_cas_id(&entry_path, metadata.len())
|
||||
if let Ok(cas_id) = generate_cas_id(&path, metadata.len())
|
||||
.await
|
||||
.map_err(|e| errors.push(NonIndexedLocationError::from((path, e)).into()))
|
||||
{
|
||||
let thumbnail_key = get_thumb_key(&cas_id);
|
||||
let entry_path = entry_path.clone();
|
||||
let extension = extension.clone();
|
||||
let inner_node = Arc::clone(&node);
|
||||
let inner_cas_id = cas_id.clone();
|
||||
tokio::spawn(async move {
|
||||
generate_thumbnail(&extension, &inner_cas_id, entry_path, &inner_node)
|
||||
.await;
|
||||
});
|
||||
thumbnails_to_generate.push(GenerateThumbnailArgs::new(
|
||||
extension.clone(),
|
||||
cas_id.clone(),
|
||||
path.to_path_buf(),
|
||||
Arc::clone(&node),
|
||||
));
|
||||
|
||||
node.thumbnail_remover
|
||||
.new_non_indexed_thumbnail(cas_id)
|
||||
.await;
|
||||
let thumbnail_key = get_thumb_key(&cas_id);
|
||||
|
||||
Some(thumbnail_key)
|
||||
} else {
|
||||
|
@ -205,6 +204,10 @@ pub async fn walk(
|
|||
}
|
||||
}
|
||||
|
||||
node.thumbnailer
|
||||
.new_non_indexed_thumbnails_batch(thumbnails_to_generate)
|
||||
.await;
|
||||
|
||||
let mut locations = library
|
||||
.db
|
||||
.location()
|
||||
|
|
|
@ -1,35 +1,42 @@
|
|||
use crate::{
|
||||
library::{Libraries, LibraryManagerEvent},
|
||||
object::media::thumbnail::ThumbnailerError,
|
||||
prisma::{file_path, PrismaClient},
|
||||
util::error::{FileIOError, NonUtf8PathError},
|
||||
};
|
||||
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
collections::{HashMap, HashSet, VecDeque},
|
||||
ffi::OsStr,
|
||||
path::{Path, PathBuf},
|
||||
pin::pin,
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
time::{Duration, SystemTime},
|
||||
};
|
||||
|
||||
use async_channel as chan;
|
||||
use futures::{future::try_join_all, stream::FuturesUnordered, FutureExt};
|
||||
use futures_concurrency::stream::Merge;
|
||||
use futures_concurrency::{
|
||||
future::{Join, Race},
|
||||
stream::Merge,
|
||||
};
|
||||
use thiserror::Error;
|
||||
use tokio::{
|
||||
fs, io,
|
||||
time::{interval_at, Instant, MissedTickBehavior},
|
||||
fs, io, spawn,
|
||||
sync::oneshot,
|
||||
time::{interval, interval_at, timeout, Instant, MissedTickBehavior},
|
||||
};
|
||||
use tokio_stream::{wrappers::IntervalStream, StreamExt};
|
||||
use tokio_util::sync::{CancellationToken, DropGuard};
|
||||
use tracing::{debug, error, trace};
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::media::thumbnail::THUMBNAIL_CACHE_DIR_NAME;
|
||||
use super::{generate_thumbnail, GenerateThumbnailArgs, THUMBNAIL_CACHE_DIR_NAME};
|
||||
|
||||
const ONE_SEC: Duration = Duration::from_secs(1);
|
||||
const THIRTY_SECS: Duration = Duration::from_secs(30);
|
||||
const HALF_HOUR: Duration = Duration::from_secs(30 * 60);
|
||||
const ONE_WEEK: Duration = Duration::from_secs(7 * 24 * 60 * 60);
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
enum Error {
|
||||
|
@ -49,19 +56,24 @@ enum DatabaseMessage {
|
|||
Remove(Uuid),
|
||||
}
|
||||
|
||||
pub struct Actor {
|
||||
// Thumbnails directory have the following structure:
|
||||
// thumbnails/
|
||||
// ├── version.txt
|
||||
//└── <cas_id>[0..2]/ # sharding
|
||||
// └── <cas_id>.webp
|
||||
pub struct Thumbnailer {
|
||||
cas_ids_to_delete_tx: chan::Sender<Vec<String>>,
|
||||
non_indexed_thumbnails_cas_ids_tx: chan::Sender<String>,
|
||||
ephemeral_thumbnails_to_generate_tx: chan::Sender<Vec<GenerateThumbnailArgs>>,
|
||||
_cancel_loop: DropGuard,
|
||||
}
|
||||
|
||||
impl Actor {
|
||||
impl Thumbnailer {
|
||||
pub fn new(data_dir: PathBuf, lm: Arc<Libraries>) -> Self {
|
||||
let mut thumbnails_directory = data_dir;
|
||||
thumbnails_directory.push(THUMBNAIL_CACHE_DIR_NAME);
|
||||
|
||||
let (databases_tx, databases_rx) = chan::bounded(4);
|
||||
let (non_indexed_thumbnails_cas_ids_tx, non_indexed_thumbnails_cas_ids_rx) =
|
||||
let (ephemeral_thumbnails_to_generate_tx, ephemeral_thumbnails_to_generate_rx) =
|
||||
chan::unbounded();
|
||||
let (cas_ids_to_delete_tx, cas_ids_to_delete_rx) = chan::bounded(16);
|
||||
let cancel_token = CancellationToken::new();
|
||||
|
@ -73,7 +85,7 @@ impl Actor {
|
|||
thumbnails_directory.clone(),
|
||||
databases_rx.clone(),
|
||||
cas_ids_to_delete_rx.clone(),
|
||||
non_indexed_thumbnails_cas_ids_rx.clone(),
|
||||
ephemeral_thumbnails_to_generate_rx.clone(),
|
||||
inner_cancel_token.child_token(),
|
||||
))
|
||||
.await
|
||||
|
@ -131,7 +143,7 @@ impl Actor {
|
|||
|
||||
Self {
|
||||
cas_ids_to_delete_tx,
|
||||
non_indexed_thumbnails_cas_ids_tx,
|
||||
ephemeral_thumbnails_to_generate_tx,
|
||||
_cancel_loop: cancel_token.drop_guard(),
|
||||
}
|
||||
}
|
||||
|
@ -140,44 +152,107 @@ impl Actor {
|
|||
thumbnails_directory: PathBuf,
|
||||
databases_rx: chan::Receiver<DatabaseMessage>,
|
||||
cas_ids_to_delete_rx: chan::Receiver<Vec<String>>,
|
||||
non_indexed_thumbnails_cas_ids_rx: chan::Receiver<String>,
|
||||
ephemeral_thumbnails_to_generate_rx: chan::Receiver<Vec<GenerateThumbnailArgs>>,
|
||||
cancel_token: CancellationToken,
|
||||
) {
|
||||
let mut check_interval = interval_at(Instant::now() + THIRTY_SECS, HALF_HOUR);
|
||||
check_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
|
||||
let mut to_remove_interval = interval_at(Instant::now() + THIRTY_SECS, HALF_HOUR);
|
||||
to_remove_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
|
||||
|
||||
let mut idle_interval = interval(ONE_SEC);
|
||||
idle_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
|
||||
|
||||
let mut databases = HashMap::new();
|
||||
let mut non_indexed_thumbnails_cas_ids = HashSet::new();
|
||||
let mut ephemeral_thumbnails_cas_ids = HashSet::new();
|
||||
|
||||
#[derive(Debug)]
|
||||
enum StreamMessage {
|
||||
Run,
|
||||
RemovalTick,
|
||||
ToDelete(Vec<String>),
|
||||
Database(DatabaseMessage),
|
||||
NonIndexedThumbnail(String),
|
||||
EphemeralThumbnailNewBatch(Vec<GenerateThumbnailArgs>),
|
||||
Leftovers(Vec<GenerateThumbnailArgs>),
|
||||
NewEphemeralThumbnailCasIds(Vec<String>),
|
||||
Stop,
|
||||
IdleTick,
|
||||
}
|
||||
|
||||
let cancel = pin!(cancel_token.cancelled());
|
||||
|
||||
// This is a LIFO queue, so we can process the most recent thumbnails first
|
||||
let mut ephemeral_thumbnails_queue = Vec::with_capacity(8);
|
||||
|
||||
// This one is a FIFO queue, so we can process leftovers from the previous batch first
|
||||
let mut ephemeral_thumbnails_leftovers_queue = VecDeque::with_capacity(8);
|
||||
|
||||
let (ephemeral_thumbnails_cas_ids_tx, ephemeral_thumbnails_cas_ids_rx) = chan::bounded(32);
|
||||
let (leftovers_tx, leftovers_rx) = chan::bounded(8);
|
||||
|
||||
let (stop_older_processing_tx, stop_older_processing_rx) = chan::bounded(1);
|
||||
|
||||
let mut current_batch_processing_rx: Option<oneshot::Receiver<()>> = None;
|
||||
|
||||
let mut msg_stream = (
|
||||
databases_rx.map(StreamMessage::Database),
|
||||
cas_ids_to_delete_rx.map(StreamMessage::ToDelete),
|
||||
non_indexed_thumbnails_cas_ids_rx.map(StreamMessage::NonIndexedThumbnail),
|
||||
IntervalStream::new(check_interval).map(|_| StreamMessage::Run),
|
||||
ephemeral_thumbnails_to_generate_rx.map(StreamMessage::EphemeralThumbnailNewBatch),
|
||||
leftovers_rx.map(StreamMessage::Leftovers),
|
||||
ephemeral_thumbnails_cas_ids_rx.map(StreamMessage::NewEphemeralThumbnailCasIds),
|
||||
IntervalStream::new(to_remove_interval).map(|_| StreamMessage::RemovalTick),
|
||||
IntervalStream::new(idle_interval).map(|_| StreamMessage::IdleTick),
|
||||
cancel.into_stream().map(|()| StreamMessage::Stop),
|
||||
)
|
||||
.merge();
|
||||
|
||||
while let Some(msg) = msg_stream.next().await {
|
||||
match msg {
|
||||
StreamMessage::Run => {
|
||||
StreamMessage::IdleTick => {
|
||||
if let Some(done_rx) = current_batch_processing_rx.as_mut() {
|
||||
// Checking if the previous run finished or was aborted to clean state
|
||||
if matches!(
|
||||
done_rx.try_recv(),
|
||||
Ok(()) | Err(oneshot::error::TryRecvError::Closed)
|
||||
) {
|
||||
current_batch_processing_rx = None;
|
||||
}
|
||||
}
|
||||
|
||||
if current_batch_processing_rx.is_none()
|
||||
&& (!ephemeral_thumbnails_queue.is_empty()
|
||||
|| !ephemeral_thumbnails_leftovers_queue.is_empty())
|
||||
{
|
||||
let (done_tx, done_rx) = oneshot::channel();
|
||||
current_batch_processing_rx = Some(done_rx);
|
||||
|
||||
if let Some(batch) = ephemeral_thumbnails_queue.pop() {
|
||||
spawn(batch_processor(
|
||||
batch,
|
||||
ephemeral_thumbnails_cas_ids_tx.clone(),
|
||||
stop_older_processing_rx.clone(),
|
||||
done_tx,
|
||||
leftovers_tx.clone(),
|
||||
false,
|
||||
));
|
||||
} else if let Some(batch) = ephemeral_thumbnails_leftovers_queue.pop_front()
|
||||
{
|
||||
spawn(batch_processor(
|
||||
batch,
|
||||
ephemeral_thumbnails_cas_ids_tx.clone(),
|
||||
stop_older_processing_rx.clone(),
|
||||
done_tx,
|
||||
leftovers_tx.clone(),
|
||||
true,
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
StreamMessage::RemovalTick => {
|
||||
// For any of them we process a clean up if a time since the last one already passed
|
||||
if !databases.is_empty() {
|
||||
if let Err(e) = Self::process_clean_up(
|
||||
&thumbnails_directory,
|
||||
databases.values(),
|
||||
&non_indexed_thumbnails_cas_ids,
|
||||
&ephemeral_thumbnails_cas_ids,
|
||||
)
|
||||
.await
|
||||
{
|
||||
|
@ -195,14 +270,27 @@ impl Actor {
|
|||
}
|
||||
}
|
||||
|
||||
StreamMessage::EphemeralThumbnailNewBatch(batch) => {
|
||||
ephemeral_thumbnails_queue.push(batch);
|
||||
if current_batch_processing_rx.is_some() // Only sends stop signal if there is a batch being processed
|
||||
&& stop_older_processing_tx.send(()).await.is_err()
|
||||
{
|
||||
error!("Thumbnail remover actor died when trying to stop older processing");
|
||||
}
|
||||
}
|
||||
|
||||
StreamMessage::Leftovers(batch) => {
|
||||
ephemeral_thumbnails_leftovers_queue.push_back(batch);
|
||||
}
|
||||
|
||||
StreamMessage::Database(DatabaseMessage::Add(id, db)) => {
|
||||
databases.insert(id, db);
|
||||
}
|
||||
StreamMessage::Database(DatabaseMessage::Remove(id)) => {
|
||||
databases.remove(&id);
|
||||
}
|
||||
StreamMessage::NonIndexedThumbnail(cas_id) => {
|
||||
non_indexed_thumbnails_cas_ids.insert(cas_id);
|
||||
StreamMessage::NewEphemeralThumbnailCasIds(cas_ids) => {
|
||||
ephemeral_thumbnails_cas_ids.extend(cas_ids);
|
||||
}
|
||||
StreamMessage::Stop => {
|
||||
debug!("Thumbnail remover actor is stopping");
|
||||
|
@ -218,7 +306,7 @@ impl Actor {
|
|||
) -> Result<(), Error> {
|
||||
try_join_all(cas_ids.into_iter().map(|cas_id| async move {
|
||||
let thumbnail_path =
|
||||
thumbnails_directory.join(format!("{}/{}.webp", &cas_id[0..2], &cas_id[2..]));
|
||||
thumbnails_directory.join(format!("{}/{cas_id}.webp", &cas_id[0..2]));
|
||||
|
||||
trace!("Removing thumbnail: {}", thumbnail_path.display());
|
||||
|
||||
|
@ -240,12 +328,6 @@ impl Actor {
|
|||
) -> Result<(), Error> {
|
||||
let databases = databases.collect::<Vec<_>>();
|
||||
|
||||
// Thumbnails directory have the following structure:
|
||||
// thumbnails/
|
||||
// ├── version.txt
|
||||
//└── <cas_id>[0..2]/ # sharding
|
||||
// └── <cas_id>.webp
|
||||
|
||||
fs::create_dir_all(&thumbnails_directory)
|
||||
.await
|
||||
.map_err(|e| FileIOError::from((thumbnails_directory, e)))?;
|
||||
|
@ -336,22 +418,40 @@ impl Actor {
|
|||
thumbnails_paths_by_cas_id
|
||||
.retain(|cas_id, _| !non_indexed_thumbnails_cas_ids.contains(cas_id));
|
||||
|
||||
let thumbs_to_remove = thumbnails_paths_by_cas_id.len();
|
||||
let now = SystemTime::now();
|
||||
|
||||
try_join_all(
|
||||
thumbnails_paths_by_cas_id
|
||||
.into_values()
|
||||
.map(|path| async move {
|
||||
trace!("Removing stale thumbnail: {}", path.display());
|
||||
let removed_count = try_join_all(thumbnails_paths_by_cas_id.into_values().map(
|
||||
|path| async move {
|
||||
if let Ok(metadata) = fs::metadata(&path).await {
|
||||
if metadata
|
||||
.accessed()
|
||||
.map(|when| {
|
||||
now.duration_since(when)
|
||||
.map(|duration| duration < ONE_WEEK)
|
||||
.unwrap_or(false)
|
||||
})
|
||||
.unwrap_or(false)
|
||||
{
|
||||
// If the thumbnail was accessed in the last week, we don't remove it yet
|
||||
// as the file is probably still in use
|
||||
return Ok(false);
|
||||
}
|
||||
}
|
||||
|
||||
tracing::warn!("Removing stale thumbnail: {}", path.display());
|
||||
fs::remove_file(&path)
|
||||
.await
|
||||
.map(|()| true)
|
||||
.map_err(|e| FileIOError::from((path, e)))
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
},
|
||||
))
|
||||
.await?
|
||||
.into_iter()
|
||||
.filter(|r| *r)
|
||||
.count();
|
||||
|
||||
if thumbs_to_remove == thumbs_found {
|
||||
// if we removed all the thumnails we foumd, it means that the directory is empty
|
||||
if thumbs_found == removed_count {
|
||||
// if we removed all the thumnails we found, it means that the directory is empty
|
||||
// and can be removed...
|
||||
trace!(
|
||||
"Removing empty thumbnails sharding directory: {}",
|
||||
|
@ -366,20 +466,121 @@ impl Actor {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn new_non_indexed_thumbnail(&self, cas_id: String) {
|
||||
pub async fn new_non_indexed_thumbnails_batch(&self, batch: Vec<GenerateThumbnailArgs>) {
|
||||
if self
|
||||
.non_indexed_thumbnails_cas_ids_tx
|
||||
.send(cas_id)
|
||||
.ephemeral_thumbnails_to_generate_tx
|
||||
.send(batch)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
error!("Thumbnail remover actor is dead");
|
||||
error!("Thumbnail remover actor is dead: Failed to send new batch");
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn remove_cas_ids(&self, cas_ids: Vec<String>) {
|
||||
if self.cas_ids_to_delete_tx.send(cas_ids).await.is_err() {
|
||||
error!("Thumbnail remover actor is dead");
|
||||
error!("Thumbnail remover actor is dead: Failed to send cas ids to delete");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn batch_processor(
|
||||
batch: Vec<GenerateThumbnailArgs>,
|
||||
generated_cas_ids_tx: chan::Sender<Vec<String>>,
|
||||
stop_rx: chan::Receiver<()>,
|
||||
done_tx: oneshot::Sender<()>,
|
||||
leftovers_tx: chan::Sender<Vec<GenerateThumbnailArgs>>,
|
||||
in_background: bool,
|
||||
) {
|
||||
let mut queue = VecDeque::from(batch);
|
||||
|
||||
enum RaceOutputs {
|
||||
Processed,
|
||||
Stop,
|
||||
}
|
||||
|
||||
// Need this borrow here to satisfy the async move below
|
||||
let generated_cas_ids_tx = &generated_cas_ids_tx;
|
||||
|
||||
while !queue.is_empty() {
|
||||
let chunk = (0..4)
|
||||
.filter_map(|_| queue.pop_front())
|
||||
.map(
|
||||
|GenerateThumbnailArgs {
|
||||
extension,
|
||||
cas_id,
|
||||
path,
|
||||
node,
|
||||
}| {
|
||||
spawn(async move {
|
||||
timeout(
|
||||
THIRTY_SECS,
|
||||
generate_thumbnail(&extension, cas_id, &path, node, in_background),
|
||||
)
|
||||
.await
|
||||
.unwrap_or_else(|_| Err(ThumbnailerError::TimedOut(path.into_boxed_path())))
|
||||
})
|
||||
},
|
||||
)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
if let RaceOutputs::Stop = (
|
||||
async move {
|
||||
let cas_ids = chunk
|
||||
.join()
|
||||
.await
|
||||
.into_iter()
|
||||
.filter_map(|join_result| {
|
||||
join_result
|
||||
.map_err(|e| error!("Failed to join thumbnail generation task: {e:#?}"))
|
||||
.ok()
|
||||
})
|
||||
.filter_map(|result| {
|
||||
result
|
||||
.map_err(|e| {
|
||||
error!(
|
||||
"Failed to generate thumbnail for ephemeral location: {e:#?}"
|
||||
)
|
||||
})
|
||||
.ok()
|
||||
})
|
||||
.collect();
|
||||
|
||||
if generated_cas_ids_tx.send(cas_ids).await.is_err() {
|
||||
error!("Thumbnail remover actor is dead: Failed to send generated cas ids")
|
||||
}
|
||||
|
||||
trace!("Processed chunk of thumbnails");
|
||||
RaceOutputs::Processed
|
||||
},
|
||||
async {
|
||||
stop_rx
|
||||
.recv()
|
||||
.await
|
||||
.expect("Critical error on thumbnails actor");
|
||||
trace!("Received a stop signal");
|
||||
RaceOutputs::Stop
|
||||
},
|
||||
)
|
||||
.race()
|
||||
.await
|
||||
{
|
||||
// Our queue is always contiguous, so this `from`` is free
|
||||
let leftovers = Vec::from(queue);
|
||||
|
||||
trace!(
|
||||
"Stopped with {} thumbnails left to process",
|
||||
leftovers.len()
|
||||
);
|
||||
if !leftovers.is_empty() && leftovers_tx.send(leftovers).await.is_err() {
|
||||
error!("Thumbnail remover actor is dead: Failed to send leftovers")
|
||||
}
|
||||
|
||||
done_tx.send(()).ok();
|
||||
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
done_tx.send(()).ok();
|
||||
}
|
|
@ -21,6 +21,8 @@ use std::{
|
|||
collections::HashMap,
|
||||
ops::Deref,
|
||||
path::{Path, PathBuf},
|
||||
str::FromStr,
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use futures::future::{join_all, try_join_all};
|
||||
|
@ -32,6 +34,7 @@ use tokio::{fs, io, task};
|
|||
use tracing::{error, trace, warn};
|
||||
use webp::Encoder;
|
||||
|
||||
pub mod actor;
|
||||
mod directory;
|
||||
mod shard;
|
||||
|
||||
|
@ -99,11 +102,15 @@ pub enum ThumbnailerError {
|
|||
SdImages(#[from] sd_images::Error),
|
||||
#[error("failed to execute converting task: {0}")]
|
||||
Task(#[from] task::JoinError),
|
||||
#[error(transparent)]
|
||||
FFmpeg(#[from] sd_ffmpeg::ThumbnailerError),
|
||||
#[error("thumbnail generation timed out for {}", .0.display())]
|
||||
TimedOut(Box<Path>),
|
||||
}
|
||||
|
||||
/// This is the target pixel count for all thumbnails to be resized to, and it is eventually downscaled
|
||||
/// to [`TARGET_QUALITY`].
|
||||
const TAGRET_PX: f32 = 262144_f32;
|
||||
const TARGET_PX: f32 = 262144_f32;
|
||||
|
||||
/// This is the target quality that we render thumbnails at, it is a float between 0-100
|
||||
/// and is treated as a percentage (so 30% in this case, or it's the same as multiplying by `0.3`).
|
||||
|
@ -122,9 +129,9 @@ pub struct ThumbnailerMetadata {
|
|||
pub skipped: u32,
|
||||
}
|
||||
|
||||
pub async fn generate_image_thumbnail<P: AsRef<Path>>(
|
||||
file_path: P,
|
||||
output_path: P,
|
||||
pub async fn generate_image_thumbnail(
|
||||
file_path: impl AsRef<Path>,
|
||||
output_path: impl AsRef<Path>,
|
||||
) -> Result<(), ThumbnailerError> {
|
||||
let file_path = file_path.as_ref().to_path_buf();
|
||||
|
||||
|
@ -132,7 +139,7 @@ pub async fn generate_image_thumbnail<P: AsRef<Path>>(
|
|||
let img = format_image(&file_path).map_err(|_| ThumbnailerError::Encoding)?;
|
||||
|
||||
let (w, h) = img.dimensions();
|
||||
let (w_scaled, h_scaled) = scale_dimensions(w as f32, h as f32, TAGRET_PX);
|
||||
let (w_scaled, h_scaled) = scale_dimensions(w as f32, h as f32, TARGET_PX);
|
||||
|
||||
// Optionally, resize the existing photo and convert back into DynamicImage
|
||||
let mut img = DynamicImage::ImageRgba8(imageops::resize(
|
||||
|
@ -177,15 +184,15 @@ pub async fn generate_image_thumbnail<P: AsRef<Path>>(
|
|||
}
|
||||
|
||||
#[cfg(feature = "ffmpeg")]
|
||||
pub async fn generate_video_thumbnail<P: AsRef<Path> + Send>(
|
||||
file_path: P,
|
||||
output_path: P,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
pub async fn generate_video_thumbnail(
|
||||
file_path: impl AsRef<Path>,
|
||||
output_path: impl AsRef<Path>,
|
||||
) -> Result<(), ThumbnailerError> {
|
||||
use sd_ffmpeg::to_thumbnail;
|
||||
|
||||
to_thumbnail(file_path, output_path, 256, TARGET_QUALITY).await?;
|
||||
|
||||
Ok(())
|
||||
to_thumbnail(file_path, output_path, 256, TARGET_QUALITY)
|
||||
.await
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
||||
#[cfg(feature = "ffmpeg")]
|
||||
|
@ -416,3 +423,81 @@ async fn process_single_thumbnail(
|
|||
});
|
||||
run_metadata.created += 1;
|
||||
}
|
||||
|
||||
// TODO(fogodev): Unify how we generate thumbnails
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct GenerateThumbnailArgs {
|
||||
pub extension: String,
|
||||
pub cas_id: String,
|
||||
pub path: PathBuf,
|
||||
pub node: Arc<Node>,
|
||||
}
|
||||
|
||||
impl GenerateThumbnailArgs {
|
||||
pub fn new(extension: String, cas_id: String, path: PathBuf, node: Arc<Node>) -> Self {
|
||||
Self {
|
||||
extension,
|
||||
cas_id,
|
||||
path,
|
||||
node,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn generate_thumbnail(
|
||||
extension: &str,
|
||||
cas_id: String,
|
||||
path: impl AsRef<Path>,
|
||||
node: Arc<Node>,
|
||||
in_background: bool,
|
||||
) -> Result<String, ThumbnailerError> {
|
||||
let path = path.as_ref();
|
||||
trace!("Generating thumbnail for {}", path.display());
|
||||
let output_path = get_thumbnail_path(&node, &cas_id);
|
||||
|
||||
if let Err(e) = fs::metadata(&output_path).await {
|
||||
if e.kind() != io::ErrorKind::NotFound {
|
||||
error!(
|
||||
"Failed to check if thumbnail exists, but we will try to generate it anyway: {e}"
|
||||
);
|
||||
}
|
||||
// Otherwise we good, thumbnail doesn't exist so we can generate it
|
||||
} else {
|
||||
trace!(
|
||||
"Skipping thumbnail generation for {} because it already exists",
|
||||
path.display()
|
||||
);
|
||||
return Ok(cas_id);
|
||||
}
|
||||
|
||||
if let Ok(extension) = ImageExtension::from_str(extension) {
|
||||
if can_generate_thumbnail_for_image(&extension) {
|
||||
generate_image_thumbnail(&path, &output_path).await?;
|
||||
}
|
||||
} else if let Ok(extension) = DocumentExtension::from_str(extension) {
|
||||
if can_generate_thumbnail_for_document(&extension) {
|
||||
generate_image_thumbnail(&path, &output_path).await?;
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "ffmpeg")]
|
||||
{
|
||||
if let Ok(extension) = VideoExtension::from_str(extension) {
|
||||
if can_generate_thumbnail_for_video(&extension) {
|
||||
generate_video_thumbnail(&path, &output_path).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !in_background {
|
||||
trace!("Emitting new thumbnail event");
|
||||
node.emit(CoreEvent::NewThumbnail {
|
||||
thumb_key: get_thumb_key(&cas_id),
|
||||
});
|
||||
}
|
||||
|
||||
trace!("Generated thumbnail for {}", path.display());
|
||||
|
||||
Ok(cas_id)
|
||||
}
|
||||
|
|
|
@ -9,7 +9,6 @@ pub mod fs;
|
|||
pub mod media;
|
||||
pub mod orphan_remover;
|
||||
pub mod tag;
|
||||
pub mod thumbnail_remover;
|
||||
pub mod validation;
|
||||
|
||||
// Objects are primarily created by the identifier from Paths
|
||||
|
|
|
@ -7,7 +7,7 @@ use tokio::{
|
|||
time::{interval, Duration},
|
||||
};
|
||||
|
||||
use super::{get_volumes, Volume};
|
||||
use super::get_volumes;
|
||||
|
||||
pub fn spawn_volume_watcher(library: Arc<Library>) {
|
||||
spawn(async move {
|
||||
|
@ -21,11 +21,7 @@ pub fn spawn_volume_watcher(library: Arc<Library>) {
|
|||
|
||||
if existing_volumes != current_volumes {
|
||||
existing_volumes = current_volumes;
|
||||
invalidate_query!(
|
||||
&library,
|
||||
"volumes.list": (),
|
||||
()
|
||||
);
|
||||
invalidate_query!(&library, "volumes.list");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
|
|
@ -18,8 +18,8 @@ pub use thumbnailer::{Thumbnailer, ThumbnailerBuilder};
|
|||
|
||||
/// Helper function to generate a thumbnail file from a video file with reasonable defaults
|
||||
pub async fn to_thumbnail(
|
||||
video_file_path: impl AsRef<Path> + Send,
|
||||
output_thumbnail_path: impl AsRef<Path> + Send,
|
||||
video_file_path: impl AsRef<Path>,
|
||||
output_thumbnail_path: impl AsRef<Path>,
|
||||
size: u32,
|
||||
quality: f32,
|
||||
) -> Result<(), ThumbnailerError> {
|
||||
|
@ -34,7 +34,7 @@ pub async fn to_thumbnail(
|
|||
|
||||
/// Helper function to generate a thumbnail bytes from a video file with reasonable defaults
|
||||
pub async fn to_webp_bytes(
|
||||
video_file_path: impl AsRef<Path> + Send,
|
||||
video_file_path: impl AsRef<Path>,
|
||||
size: u32,
|
||||
quality: f32,
|
||||
) -> Result<Vec<u8>, ThumbnailerError> {
|
||||
|
|
|
@ -16,8 +16,8 @@ impl Thumbnailer {
|
|||
/// Processes an video input file and write to file system a thumbnail with webp format
|
||||
pub async fn process(
|
||||
&self,
|
||||
video_file_path: impl AsRef<Path> + Send,
|
||||
output_thumbnail_path: impl AsRef<Path> + Send,
|
||||
video_file_path: impl AsRef<Path>,
|
||||
output_thumbnail_path: impl AsRef<Path>,
|
||||
) -> Result<(), ThumbnailerError> {
|
||||
let path = output_thumbnail_path.as_ref().parent().ok_or_else(|| {
|
||||
io::Error::new(
|
||||
|
@ -39,7 +39,7 @@ impl Thumbnailer {
|
|||
/// Processes an video input file and returns a webp encoded thumbnail as bytes
|
||||
pub async fn process_to_webp_bytes(
|
||||
&self,
|
||||
video_file_path: impl AsRef<Path> + Send,
|
||||
video_file_path: impl AsRef<Path>,
|
||||
) -> Result<Vec<u8>, ThumbnailerError> {
|
||||
let video_file_path = video_file_path.as_ref().to_path_buf();
|
||||
let prefer_embedded_metadata = self.builder.prefer_embedded_metadata;
|
||||
|
|
Loading…
Reference in a new issue