[ENG-1267] Move thumbnails generation away from media processor (#1585)

* New vscode task to start developing

* Updating db in case of library updates just in case

* Done

* Forgot to remove some debug logs

* Rust fmt

* Saving thumbnails processing state on app shutdown

* bruh
Ericson "Fogo" Soares 2023-10-16 23:09:36 -03:00 committed by GitHub
parent 97ee8a21c6
commit bfe5a65949
16 changed files with 961 additions and 827 deletions

.vscode/tasks.json (vendored, 7 changes)

@@ -19,6 +19,13 @@
"group": "none",
"problemMatcher": ["$rustc"]
},
{
"type": "shell",
"label": "start",
"command": "sh",
"args": ["-c", "'pnpm i && pnpm prep'"],
"problemMatcher": ["$tsc-watch", "$rustc"]
},
{
"type": "shell",
"label": "ui:dev",


@@ -47,7 +47,6 @@ impl BackendFeature {
}
}
mod api;
mod auth;
mod backups;
mod categories;
@@ -65,6 +64,7 @@ mod sync;
mod tags;
pub mod utils;
pub mod volumes;
mod web_api;
// A version of [NodeConfig] that is safe to share with the frontend
#[derive(Debug, Serialize, Deserialize, Clone, Type)]
@@ -167,7 +167,7 @@ pub(crate) fn mount() -> Arc<Router> {
Ok(())
})
})
.merge("api.", api::mount())
.merge("api.", web_api::mount())
.merge("auth.", auth::mount())
.merge("search.", search::mount())
.merge("library.", libraries::mount())


@@ -113,8 +113,13 @@ impl Node {
notifications: notifications::Notifications::new(),
p2p,
config,
thumbnailer: Thumbnailer::new(
data_dir.to_path_buf(),
libraries.clone(),
event_bus.0.clone(),
)
.await,
event_bus,
thumbnailer: Thumbnailer::new(data_dir.to_path_buf(), libraries.clone()),
libraries,
files_over_p2p_flag: Arc::new(AtomicBool::new(false)),
http: reqwest::Client::new(),
@@ -210,6 +215,7 @@ impl Node {
pub async fn shutdown(&self) {
info!("Spacedrive shutting down...");
self.thumbnailer.shutdown().await;
self.jobs.shutdown().await;
self.p2p.shutdown().await;
info!("Spacedrive Core shutdown successful!");


@@ -84,6 +84,10 @@ impl<'a> IsolatedFilePathData<'a> {
self.location_id
}
pub fn extension(&self) -> &str {
self.extension.as_ref()
}
pub fn is_root(&self) -> bool {
self.is_dir
&& self.materialized_path == "/"


@@ -66,7 +66,7 @@ impl<'lib> EventHandler<'lib> for LinuxEventHandler<'lib> {
}
async fn handle_event(&mut self, event: Event) -> Result<(), LocationManagerError> {
tracing::debug!("Received Linux event: {:#?}", event);
trace!("Received Linux event: {:#?}", event);
let Event {
kind, mut paths, ..


@@ -21,7 +21,7 @@ use crate::{
media::{
media_data_extractor::{can_extract_media_data_for_image, extract_media_data},
media_data_image_to_query,
thumbnail::{generate_thumbnail, get_thumbnail_path},
thumbnail::get_thumbnail_path,
},
validation::hash::file_checksum,
},
@@ -59,6 +59,7 @@ use serde_json::json;
use tokio::{
fs,
io::{self, ErrorKind},
spawn,
time::Instant,
};
use tracing::{debug, error, trace, warn};
@@ -278,13 +279,16 @@ async fn inner_create_file(
if !extension.is_empty() && matches!(kind, ObjectKind::Image | ObjectKind::Video) {
// Running in a detached task as thumbnail generation can take a while and we don't want to block the watcher
let inner_path = path.to_path_buf();
let node = node.clone();
let inner_extension = extension.clone();
if let Some(cas_id) = cas_id {
tokio::spawn(async move {
if let Err(e) =
generate_thumbnail(&inner_extension, cas_id, inner_path, node, true).await
let extension = extension.clone();
let path = path.to_path_buf();
let node = node.clone();
spawn(async move {
if let Err(e) = node
.thumbnailer
.generate_single_thumbnail(&extension, cas_id, path)
.await
{
error!("Failed to generate thumbnail in the watcher: {e:#?}");
}
@@ -499,13 +503,14 @@ async fn inner_update_file(
if library.thumbnail_exists(node, old_cas_id).await? {
if let Some(ext) = file_path.extension.clone() {
// Running in a detached task as thumbnail generation can take a while and we don't want to block the watcher
let inner_path = full_path.to_path_buf();
let inner_node = node.clone();
if let Some(cas_id) = cas_id {
tokio::spawn(async move {
if let Err(e) =
generate_thumbnail(&ext, cas_id, inner_path, inner_node, true)
.await
let node = node.clone();
let path = full_path.to_path_buf();
spawn(async move {
if let Err(e) = node
.thumbnailer
.generate_single_thumbnail(&ext, cas_id, path)
.await
{
error!("Failed to generate thumbnail in the watcher: {e:#?}");
}


@@ -3,7 +3,10 @@ use crate::{
library::Library,
object::{
cas::generate_cas_id,
media::thumbnail::{get_thumb_key, GenerateThumbnailArgs},
media::thumbnail::{
actor::{BatchToProcess, GenerateThumbnailArgs},
get_thumb_key,
},
},
prisma::location,
util::error::FileIOError,
@@ -107,6 +110,8 @@ pub async fn walk(
);
let mut thumbnails_to_generate = vec![];
// Generating thumbnails for PDFs is kinda slow, so we're leaving them for last in the batch
let mut document_thumbnails_to_generate = vec![];
while let Some(entry) = read_dir.next_entry().await.map_err(|e| (path, e))? {
let Ok((entry_path, name)) = normalize_path(entry.path())
@@ -161,20 +166,39 @@ pub async fn walk(
.map(Into::into)
.unwrap_or(ObjectKind::Unknown);
let thumbnail_key = if matches!(
kind,
ObjectKind::Image | ObjectKind::Video | ObjectKind::Document
) {
let should_generate_thumbnail = {
#[cfg(feature = "ffmpeg")]
{
matches!(
kind,
ObjectKind::Image | ObjectKind::Video | ObjectKind::Document
)
}
#[cfg(not(feature = "ffmpeg"))]
{
matches!(kind, ObjectKind::Image | ObjectKind::Document)
}
};
let thumbnail_key = if should_generate_thumbnail {
if let Ok(cas_id) = generate_cas_id(&path, metadata.len())
.await
.map_err(|e| errors.push(NonIndexedLocationError::from((path, e)).into()))
{
thumbnails_to_generate.push(GenerateThumbnailArgs::new(
extension.clone(),
cas_id.clone(),
path.to_path_buf(),
Arc::clone(&node),
));
if kind == ObjectKind::Document {
document_thumbnails_to_generate.push(GenerateThumbnailArgs::new(
extension.clone(),
cas_id.clone(),
path.to_path_buf(),
));
} else {
thumbnails_to_generate.push(GenerateThumbnailArgs::new(
extension.clone(),
cas_id.clone(),
path.to_path_buf(),
));
}
let thumbnail_key = get_thumb_key(&cas_id);
@@ -204,8 +228,14 @@ pub async fn walk(
}
}
thumbnails_to_generate.extend(document_thumbnails_to_generate);
node.thumbnailer
.new_non_indexed_thumbnails_batch(thumbnails_to_generate)
.new_ephemeral_thumbnails_batch(BatchToProcess {
batch: thumbnails_to_generate,
should_regenerate: false,
in_background: false,
})
.await;
let mut locations = library


@@ -10,7 +10,7 @@ use sd_media_metadata::ImageMetadata;
use std::{collections::HashSet, path::Path};
use futures::future::join_all;
use futures_concurrency::future::Join;
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use thiserror::Error;
@@ -65,13 +65,13 @@ pub async fn extract_media_data(path: impl AsRef<Path>) -> Result<ImageMetadata,
}
pub async fn process(
files_paths: impl IntoIterator<Item = &file_path_for_media_processor::Data>,
files_paths: &[file_path_for_media_processor::Data],
location_id: location::id::Type,
location_path: impl AsRef<Path>,
db: &PrismaClient,
ctx_update_fn: &impl Fn(usize),
) -> Result<(MediaDataExtractorMetadata, JobRunErrors), MediaDataError> {
let mut run_metadata = MediaDataExtractorMetadata::default();
let files_paths = files_paths.into_iter().collect::<Vec<_>>();
if files_paths.is_empty() {
return Ok((run_metadata, JobRunErrors::default()));
}
@@ -104,26 +104,29 @@ pub async fn process(
run_metadata.skipped = objects_already_with_media_data.len() as u32;
let (media_datas, errors) = {
let maybe_media_data = join_all(
files_paths
.into_iter()
.filter_map(|file_path| {
file_path.object_id.and_then(|object_id| {
(!objects_already_with_media_data.contains(&object_id))
.then_some((file_path, object_id))
})
let maybe_media_data = files_paths
.iter()
.enumerate()
.filter_map(|(idx, file_path)| {
file_path.object_id.and_then(|object_id| {
(!objects_already_with_media_data.contains(&object_id))
.then_some((idx, file_path, object_id))
})
.filter_map(|(file_path, object_id)| {
IsolatedFilePathData::try_from((location_id, file_path))
.map_err(|e| error!("{e:#?}"))
.ok()
.map(|iso_file_path| (location_path.join(iso_file_path), object_id))
})
.map(|(path, object_id)| async move {
(extract_media_data(&path).await, path, object_id)
}),
)
.await;
})
.filter_map(|(idx, file_path, object_id)| {
IsolatedFilePathData::try_from((location_id, file_path))
.map_err(|e| error!("{e:#?}"))
.ok()
.map(|iso_file_path| (idx, location_path.join(iso_file_path), object_id))
})
.map(|(idx, path, object_id)| async move {
let res = extract_media_data(&path).await;
ctx_update_fn(idx + 1);
(res, path, object_id)
})
.collect::<Vec<_>>()
.join()
.await;
let total_media_data = maybe_media_data.len();
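The refactor above threads the job's progress callback through the concurrent extraction: inputs are enumerated up front, each future reports as it completes, and futures-concurrency's `Join` drives them all on one task. A minimal sketch of the same pattern, assuming the `futures-concurrency` and `tokio` crates; `work` and the integer inputs are hypothetical stand-ins for `extract_media_data` and the file paths:

use futures_concurrency::future::Join;

// Hypothetical stand-in for `extract_media_data`.
async fn work(x: u32) -> u32 {
    x * 2
}

// Mirrors the diff: enumerate first, then build one future per input; each
// future reports through the shared callback, like `ctx_update_fn(idx + 1)`.
async fn process_all(inputs: Vec<u32>, on_progress: &impl Fn(usize)) -> Vec<u32> {
    inputs
        .into_iter()
        .enumerate()
        .map(|(idx, x)| async move {
            let out = work(x).await;
            on_progress(idx + 1);
            out
        })
        .collect::<Vec<_>>()
        .join()
        .await
}

#[tokio::main]
async fn main() {
    // `join` preserves input order, so the outputs line up with the inputs.
    let doubled = process_all(vec![1, 2, 3], &|n| println!("progress: {n}")).await;
    assert_eq!(doubled, vec![2, 4, 6]);
}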


@@ -9,26 +9,26 @@ use crate::{
ensure_file_path_exists, ensure_sub_path_is_directory, ensure_sub_path_is_in_location,
file_path_for_media_processor, IsolatedFilePathData,
},
object::media::media_data_extractor,
object::media::thumbnail::{self, init_thumbnail_dir},
prisma::{location, PrismaClient},
util::db::maybe_missing,
};
use std::{
collections::HashMap,
future::Future,
hash::Hash,
path::{Path, PathBuf},
};
use itertools::Itertools;
use prisma_client_rust::{raw, PrismaValue};
use sd_file_ext::extensions::Extension;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tracing::{debug, info};
use super::{
get_all_children_files_by_extensions, process, MediaProcessorEntry, MediaProcessorEntryKind,
MediaProcessorError, MediaProcessorMetadata, ThumbnailerEntryKind,
dispatch_thumbnails_for_processing, media_data_extractor, process, MediaProcessorError,
MediaProcessorMetadata,
};
const BATCH_SIZE: usize = 10;
@@ -51,17 +51,14 @@ impl Hash for MediaProcessorJobInit {
#[derive(Debug, Serialize, Deserialize)]
pub struct MediaProcessorJobData {
thumbnails_base_dir: PathBuf,
location_path: PathBuf,
to_process_path: PathBuf,
}
type MediaProcessorJobStep = Vec<MediaProcessorEntry>;
#[async_trait::async_trait]
impl StatefulJob for MediaProcessorJobInit {
type Data = MediaProcessorJobData;
type Step = MediaProcessorJobStep;
type Step = Vec<file_path_for_media_processor::Data>;
type RunMetadata = MediaProcessorMetadata;
const NAME: &'static str = "media_processor";
@@ -78,10 +75,6 @@ impl StatefulJob for MediaProcessorJobInit {
) -> Result<JobInitOutput<Self::RunMetadata, Self::Step>, JobError> {
let Library { db, .. } = ctx.library.as_ref();
let thumbnails_base_dir = init_thumbnail_dir(ctx.node.config.data_directory())
.await
.map_err(MediaProcessorError::from)?;
let location_id = self.location.id;
let location_path =
maybe_missing(&self.location.path, "location.path").map(PathBuf::from)?;
@@ -120,53 +113,37 @@ impl StatefulJob for MediaProcessorJobInit {
"Searching for media files in location {location_id} at directory \"{iso_file_path}\""
);
let thumbnailer_files = get_files_for_thumbnailer(db, &iso_file_path).await?;
dispatch_thumbnails_for_processing(
location_id,
&location_path,
&iso_file_path,
&ctx.library,
&ctx.node,
false,
get_all_children_files_by_extensions,
)
.await?;
let mut media_data_files_map = get_files_for_media_data_extraction(db, &iso_file_path)
.await?
.map(|file_path| (file_path.id, file_path))
.collect::<HashMap<_, _>>();
let file_paths = get_files_for_media_data_extraction(db, &iso_file_path).await?;
let mut total_files_for_thumbnailer = 0;
let total_files = file_paths.len();
let chunked_files = thumbnailer_files
let chunked_files = file_paths
.into_iter()
.map(|(file_path, thumb_kind)| {
total_files_for_thumbnailer += 1;
MediaProcessorEntry {
operation_kind: if media_data_files_map.remove(&file_path.id).is_some() {
MediaProcessorEntryKind::MediaDataAndThumbnailer(thumb_kind)
} else {
MediaProcessorEntryKind::Thumbnailer(thumb_kind)
},
file_path,
}
})
.collect::<Vec<_>>()
.into_iter()
.chain(
media_data_files_map
.into_values()
.map(|file_path| MediaProcessorEntry {
operation_kind: MediaProcessorEntryKind::MediaData,
file_path,
}),
)
.chunks(BATCH_SIZE)
.into_iter()
.map(|chunk| chunk.collect::<Vec<_>>())
.collect::<Vec<_>>();
ctx.progress(vec![
JobReportUpdate::TaskCount(total_files_for_thumbnailer),
JobReportUpdate::TaskCount(total_files),
JobReportUpdate::Message(format!(
"Preparing to process {total_files_for_thumbnailer} files in {} chunks",
"Preparing to process {total_files} files in {} chunks",
chunked_files.len()
)),
]);
*data = Some(MediaProcessorJobData {
thumbnails_base_dir,
location_path,
to_process_path,
});
@@ -177,18 +154,19 @@ impl StatefulJob for MediaProcessorJobInit {
async fn execute_step(
&self,
ctx: &WorkerContext,
CurrentStep { step, step_number }: CurrentStep<'_, Self::Step>,
CurrentStep {
step: file_paths,
step_number,
}: CurrentStep<'_, Self::Step>,
data: &Self::Data,
_: &Self::RunMetadata,
) -> Result<JobStepOutput<Self::Step, Self::RunMetadata>, JobError> {
process(
step,
file_paths,
self.location.id,
&data.location_path,
&data.thumbnails_base_dir,
self.regenerate_thumbnails,
&ctx.library,
|completed_count| {
&ctx.library.db,
&|completed_count| {
ctx.progress(vec![JobReportUpdate::CompletedTaskCount(
step_number * BATCH_SIZE + completed_count,
)]);
@@ -214,7 +192,7 @@ impl StatefulJob for MediaProcessorJobInit {
.display()
);
if run_metadata.thumbnailer.created > 0 || run_metadata.media_data.extracted > 0 {
if run_metadata.media_data.extracted > 0 {
invalidate_query!(ctx.library, "search.paths");
}
@@ -222,55 +200,60 @@ impl StatefulJob for MediaProcessorJobInit {
}
}
async fn get_files_for_thumbnailer(
db: &PrismaClient,
parent_iso_file_path: &IsolatedFilePathData<'_>,
) -> Result<
impl Iterator<Item = (file_path_for_media_processor::Data, ThumbnailerEntryKind)>,
MediaProcessorError,
> {
// query database for all image files in this location that need thumbnails
let image_thumb_files = get_all_children_files_by_extensions(
db,
parent_iso_file_path,
&thumbnail::THUMBNAILABLE_EXTENSIONS,
)
.await?
.into_iter()
.map(|file_path| (file_path, ThumbnailerEntryKind::Image));
#[cfg(feature = "ffmpeg")]
let all_files = {
// query database for all video files in this location that need thumbnails
let video_files = get_all_children_files_by_extensions(
db,
parent_iso_file_path,
&thumbnail::THUMBNAILABLE_VIDEO_EXTENSIONS,
)
.await?;
image_thumb_files.chain(
video_files
.into_iter()
.map(|file_path| (file_path, ThumbnailerEntryKind::Video)),
)
};
#[cfg(not(feature = "ffmpeg"))]
let all_files = { image_thumb_files };
Ok(all_files)
}
async fn get_files_for_media_data_extraction(
db: &PrismaClient,
parent_iso_file_path: &IsolatedFilePathData<'_>,
) -> Result<impl Iterator<Item = file_path_for_media_processor::Data>, MediaProcessorError> {
) -> Result<Vec<file_path_for_media_processor::Data>, MediaProcessorError> {
get_all_children_files_by_extensions(
db,
parent_iso_file_path,
&media_data_extractor::FILTERED_IMAGE_EXTENSIONS,
)
.await
.map(|file_paths| file_paths.into_iter())
.map_err(Into::into)
}
fn get_all_children_files_by_extensions<'d, 'p, 'e, 'ret>(
db: &'d PrismaClient,
parent_iso_file_path: &'p IsolatedFilePathData<'_>,
extensions: &'e [Extension],
) -> impl Future<Output = Result<Vec<file_path_for_media_processor::Data>, MediaProcessorError>> + 'ret
where
'd: 'ret,
'p: 'ret,
'e: 'ret,
{
async move {
// FIXME: Had to use format! macro because PCR doesn't support IN with Vec for SQLite
// We have no data coming from the user, so this is sql injection safe
db._query_raw(raw!(
&format!(
"SELECT id, materialized_path, is_dir, name, extension, cas_id, object_id
FROM file_path
WHERE
location_id={{}}
AND cas_id IS NOT NULL
AND LOWER(extension) IN ({})
AND materialized_path LIKE {{}}
ORDER BY materialized_path ASC",
// Ordering by materialized_path so we can prioritize processing the files
// in the upper part of the directory tree
extensions
.iter()
.map(|ext| format!("LOWER('{ext}')"))
.collect::<Vec<_>>()
.join(",")
),
PrismaValue::Int(parent_iso_file_path.location_id() as i64),
PrismaValue::String(format!(
"{}%",
parent_iso_file_path
.materialized_path_for_children()
.expect("sub path iso_file_path must be a directory")
))
))
.exec()
.await
.map_err(Into::into)
}
}
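Note the shape of `get_all_children_files_by_extensions` above: a plain `fn` returning `impl Future + 'ret` instead of an `async fn`, so the returned future's lifetime is nameable and the function can be passed to `dispatch_thumbnails_for_processing` as `thumbs_fetcher_fn`. A minimal sketch of that lifetime pattern, with `Db` and `Dir` as hypothetical stand-ins for `PrismaClient` and `IsolatedFilePathData`:

use std::future::Future;

struct Db;
struct Dir(String);

// The named lifetime `'ret` ties the returned future to both borrows, which
// a plain `async fn` cannot express when the caller needs to name the bound.
fn fetch<'d, 'p, 'ret>(db: &'d Db, dir: &'p Dir) -> impl Future<Output = Vec<String>> + 'ret
where
    'd: 'ret,
    'p: 'ret,
{
    async move {
        let _ = db; // pretend to query here
        vec![dir.0.clone()]
    }
}

// Accepts any fetcher of this shape, like the dispatch function above does.
async fn dispatch<'d, 'p, F>(db: &'d Db, dir: &'p Dir, fetcher: impl Fn(&'d Db, &'p Dir) -> F)
where
    F: Future<Output = Vec<String>>,
{
    println!("fetched {} rows", fetcher(db, dir).await.len());
}

#[tokio::main]
async fn main() {
    dispatch(&Db, &Dir("/docs/".into()), fetch).await;
}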


@ -4,21 +4,26 @@ use crate::{
location::file_path_helper::{
file_path_for_media_processor, FilePathError, IsolatedFilePathData,
},
util::db::{maybe_missing, MissingFieldError},
Node,
};
use sd_file_ext::extensions::Extension;
use sd_prisma::prisma::{location, PrismaClient};
use std::path::Path;
use std::{future::Future, path::Path};
use futures::try_join;
use prisma_client_rust::{raw, PrismaValue};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tracing::error;
use super::{
media_data_extractor::{self, MediaDataError, MediaDataExtractorMetadata},
thumbnail::{self, ThumbnailerEntryKind, ThumbnailerError, ThumbnailerMetadata},
thumbnail::{
self,
actor::{BatchToProcess, GenerateThumbnailArgs},
ThumbnailerError,
},
};
mod job;
@@ -43,174 +48,152 @@ pub enum MediaProcessorError {
MediaDataExtractor(#[from] MediaDataError),
}
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
pub enum MediaProcessorEntryKind {
MediaData,
Thumbnailer(ThumbnailerEntryKind),
MediaDataAndThumbnailer(ThumbnailerEntryKind),
}
#[derive(Serialize, Deserialize, Debug)]
pub struct MediaProcessorEntry {
file_path: file_path_for_media_processor::Data,
operation_kind: MediaProcessorEntryKind,
}
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct MediaProcessorMetadata {
media_data: MediaDataExtractorMetadata,
thumbnailer: ThumbnailerMetadata,
}
impl From<MediaDataExtractorMetadata> for MediaProcessorMetadata {
fn from(media_data: MediaDataExtractorMetadata) -> Self {
Self { media_data }
}
}
impl JobRunMetadata for MediaProcessorMetadata {
fn update(&mut self, new_data: Self) {
self.media_data.extracted += new_data.media_data.extracted;
self.media_data.skipped += new_data.media_data.skipped;
self.thumbnailer.created += new_data.thumbnailer.created;
self.thumbnailer.skipped += new_data.thumbnailer.skipped;
}
}
async fn get_all_children_files_by_extensions(
db: &PrismaClient,
parent_iso_file_path: &IsolatedFilePathData<'_>,
extensions: &[Extension],
) -> Result<Vec<file_path_for_media_processor::Data>, MediaProcessorError> {
// FIXME: Had to use format! macro because PCR doesn't support IN with Vec for SQLite
// We have no data coming from the user, so this is sql injection safe
db._query_raw(raw!(
&format!(
"SELECT id, materialized_path, is_dir, name, extension, cas_id, object_id
FROM file_path
WHERE
location_id={{}}
AND cas_id IS NOT NULL
AND LOWER(extension) IN ({})
AND materialized_path LIKE {{}}",
extensions
.iter()
.map(|ext| format!("LOWER('{ext}')"))
.collect::<Vec<_>>()
.join(",")
),
PrismaValue::Int(parent_iso_file_path.location_id() as i64),
PrismaValue::String(format!(
"{}%",
parent_iso_file_path
.materialized_path_for_children()
.expect("sub path iso_file_path must be a directory")
))
))
.exec()
.await
.map_err(Into::into)
}
async fn get_files_by_extensions(
db: &PrismaClient,
parent_iso_file_path: &IsolatedFilePathData<'_>,
extensions: &[Extension],
) -> Result<Vec<file_path_for_media_processor::Data>, MediaDataError> {
// FIXME: Had to use format! macro because PCR doesn't support IN with Vec for SQLite
// We have no data coming from the user, so this is sql injection safe
db._query_raw(raw!(
&format!(
"SELECT id, materialized_path, is_dir, name, extension, cas_id, object_id
FROM file_path
WHERE
location_id={{}}
AND cas_id IS NOT NULL
AND LOWER(extension) IN ({})
AND materialized_path = {{}}",
extensions
.iter()
.map(|ext| format!("LOWER('{ext}')"))
.collect::<Vec<_>>()
.join(",")
),
PrismaValue::Int(parent_iso_file_path.location_id() as i64),
PrismaValue::String(
parent_iso_file_path
.materialized_path_for_children()
.expect("sub path iso_file_path must be a directory")
)
))
.exec()
.await
.map_err(Into::into)
}
async fn process(
entries: &[MediaProcessorEntry],
// `thumbs_fetcher_fn` MUST return file_paths ordered by `materialized_path` for optimal results
async fn dispatch_thumbnails_for_processing<'d, 'p, 'e, 'ret, F>(
location_id: location::id::Type,
location_path: impl AsRef<Path>,
thumbnails_base_dir: impl AsRef<Path>,
regenerate_thumbnails: bool,
library: &Library,
ctx_update_fn: impl Fn(usize),
) -> Result<(MediaProcessorMetadata, JobRunErrors), MediaProcessorError> {
parent_iso_file_path: &'p IsolatedFilePathData<'_>,
library: &'d Library,
node: &Node,
should_regenerate: bool,
thumbs_fetcher_fn: impl Fn(&'d PrismaClient, &'p IsolatedFilePathData<'_>, &'e [Extension]) -> F,
) -> Result<(), MediaProcessorError>
where
'd: 'ret,
'p: 'ret,
'e: 'ret,
F: Future<Output = Result<Vec<file_path_for_media_processor::Data>, MediaProcessorError>>
+ 'ret,
{
let Library { db, .. } = library;
let location_path = location_path.as_ref();
let ((media_data_metadata, mut media_data_errors), (thumbnailer_metadata, thumbnailer_errors)) =
try_join!(
async {
media_data_extractor::process(
entries.iter().filter_map(
|MediaProcessorEntry {
file_path,
operation_kind,
}| {
matches!(
operation_kind,
MediaProcessorEntryKind::MediaDataAndThumbnailer(_)
| MediaProcessorEntryKind::MediaData
)
.then_some(file_path)
},
),
location_id,
location_path,
&library.db,
)
.await
.map_err(MediaProcessorError::from)
},
async {
thumbnail::process(
entries.iter().filter_map(
|MediaProcessorEntry {
file_path,
operation_kind,
}| {
if let MediaProcessorEntryKind::Thumbnailer(thumb_kind)
| MediaProcessorEntryKind::MediaDataAndThumbnailer(thumb_kind) = operation_kind
{
Some((file_path, *thumb_kind))
} else {
None
}
},
),
location_id,
location_path,
thumbnails_base_dir,
regenerate_thumbnails,
library,
ctx_update_fn,
)
.await
.map_err(MediaProcessorError::from)
},
)?;
let file_paths = thumbs_fetcher_fn(
db,
parent_iso_file_path,
&thumbnail::ALL_THUMBNAILABLE_EXTENSIONS,
)
.await?;
media_data_errors.0.extend(thumbnailer_errors.0.into_iter());
let mut current_batch = Vec::with_capacity(16);
Ok((
MediaProcessorMetadata {
media_data: media_data_metadata,
thumbnailer: thumbnailer_metadata,
},
media_data_errors,
))
// PDF thumbnails are currently way slower, so we process them last
let mut pdf_thumbs = Vec::with_capacity(16);
let mut current_materialized_path = None;
let mut in_background = false;
for file_path in file_paths {
// Initializing current_materialized_path with the first file_path's materialized_path
if current_materialized_path.is_none() {
current_materialized_path = file_path.materialized_path.clone();
}
if file_path.materialized_path != current_materialized_path
&& (!current_batch.is_empty() || !pdf_thumbs.is_empty())
{
// We found a different materialized_path, so we dispatch the current batch and start a new one
// We start by appending all PDFs, leaving the vec clean to be reused
current_batch.append(&mut pdf_thumbs);
node.thumbnailer
.new_indexed_thumbnails_batch(BatchToProcess {
batch: current_batch,
should_regenerate,
in_background,
})
.await;
// We moved our vec, so we need a new one
current_batch = Vec::with_capacity(16);
in_background = true; // Only the first batch should be processed in foreground
// Exchanging for the newly found materialized_path
current_materialized_path = file_path.materialized_path.clone();
}
let file_path_id = file_path.id;
if let Err(e) = add_to_batch(
location_id,
location_path,
file_path,
&mut current_batch,
&mut pdf_thumbs,
) {
error!("Error adding file_path <id='{file_path_id}'> to thumbnail batch: {e:#?}");
}
}
// Dispatching the last batch, appending any remaining PDFs from the final directory first
current_batch.append(&mut pdf_thumbs);
if !current_batch.is_empty() {
node.thumbnailer
.new_indexed_thumbnails_batch(BatchToProcess {
batch: current_batch,
should_regenerate,
in_background,
})
.await;
}
Ok(())
}
fn add_to_batch(
location_id: location::id::Type,
location_path: &Path, // This function is only used internally once, so we can pass &Path as a parameter
file_path: file_path_for_media_processor::Data,
current_batch: &mut Vec<GenerateThumbnailArgs>,
pdf_thumbs: &mut Vec<GenerateThumbnailArgs>,
) -> Result<(), MissingFieldError> {
let cas_id = maybe_missing(&file_path.cas_id, "file_path.cas_id")?.clone();
let iso_file_path = IsolatedFilePathData::try_from((location_id, file_path))?;
let full_path = location_path.join(&iso_file_path);
let extension = iso_file_path.extension();
let args = GenerateThumbnailArgs::new(extension.to_string(), cas_id, full_path);
if extension != "pdf" {
current_batch.push(args);
} else {
pdf_thumbs.push(args);
}
Ok(())
}
pub async fn process(
files_paths: &[file_path_for_media_processor::Data],
location_id: location::id::Type,
location_path: impl AsRef<Path>,
db: &PrismaClient,
ctx_update_fn: &impl Fn(usize),
) -> Result<(MediaProcessorMetadata, JobRunErrors), MediaProcessorError> {
// Add here new kinds of media processing if necessary in the future
media_data_extractor::process(files_paths, location_id, location_path, db, ctx_update_fn)
.await
.map(|(media_data, errors)| (media_data.into(), errors))
.map_err(Into::into)
}
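Condensed, the dispatch policy above is: walk a list ordered by materialized_path, cut a batch at every directory boundary, keep PDFs at the tail of each batch, and run only the first batch in the foreground. A minimal sketch over plain strings (hypothetical stand-ins for the file_path records):

// Returns (batch, in_background) pairs; `entries` must be sorted by directory,
// as the fetcher functions guarantee via `ORDER BY materialized_path`.
fn group_batches(entries: &[(String, String)]) -> Vec<(Vec<String>, bool)> {
    let mut batches = Vec::new();
    let mut current = Vec::with_capacity(16);
    let mut pdfs = Vec::with_capacity(16);
    let mut current_dir: Option<&String> = None;
    let mut in_background = false; // only the first batch runs in the foreground

    for (dir, name) in entries {
        if current_dir.is_some_and(|d| d != dir) && (!current.is_empty() || !pdfs.is_empty()) {
            current.append(&mut pdfs); // PDFs go last within each batch
            batches.push((std::mem::take(&mut current), in_background));
            in_background = true;
        }
        current_dir = Some(dir);
        if name.ends_with(".pdf") {
            pdfs.push(name.clone());
        } else {
            current.push(name.clone());
        }
    }

    current.append(&mut pdfs); // flush the final directory's PDFs too
    if !current.is_empty() {
        batches.push((current, in_background));
    }
    batches
}

fn main() {
    let entries = [
        ("/a/".into(), "x.png".into()),
        ("/a/".into(), "y.pdf".into()),
        ("/b/".into(), "z.jpg".into()),
    ];
    // First batch (foreground): ["x.png", "y.pdf"]; second (background): ["z.jpg"].
    println!("{:?}", group_batches(&entries));
}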


@@ -6,25 +6,24 @@ use crate::{
ensure_file_path_exists, ensure_sub_path_is_directory, ensure_sub_path_is_in_location,
file_path_for_media_processor, IsolatedFilePathData,
},
object::media::{
media_data_extractor,
thumbnail::{self, init_thumbnail_dir, ThumbnailerEntryKind},
},
prisma::{location, PrismaClient},
util::db::maybe_missing,
Node,
};
use std::{
collections::HashMap,
future::Future,
path::{Path, PathBuf},
};
use itertools::Itertools;
use prisma_client_rust::{raw, PrismaValue};
use sd_file_ext::extensions::Extension;
use tracing::{debug, error};
use super::{
get_files_by_extensions, process, MediaProcessorEntry, MediaProcessorEntryKind,
dispatch_thumbnails_for_processing,
media_data_extractor::{self, process},
MediaProcessorError, MediaProcessorMetadata,
};
@@ -38,10 +37,6 @@ pub async fn shallow(
) -> Result<(), JobError> {
let Library { db, .. } = library;
let thumbnails_base_dir = init_thumbnail_dir(node.config.data_directory())
.await
.map_err(MediaProcessorError::from)?;
let location_id = location.id;
let location_path = maybe_missing(&location.path, "location.path").map(PathBuf::from)?;
@@ -73,43 +68,27 @@ pub async fn shallow(
debug!("Searching for images in location {location_id} at path {iso_file_path}");
let thumbnailer_files = get_files_for_thumbnailer(db, &iso_file_path).await?;
dispatch_thumbnails_for_processing(
location.id,
&location_path,
&iso_file_path,
library,
node,
false,
get_files_by_extensions,
)
.await?;
let mut media_data_files_map = get_files_for_media_data_extraction(db, &iso_file_path)
.await?
.map(|file_path| (file_path.id, file_path))
.collect::<HashMap<_, _>>();
let file_paths = get_files_for_media_data_extraction(db, &iso_file_path).await?;
let mut total_files = 0;
let total_files = file_paths.len();
let chunked_files = thumbnailer_files
let chunked_files = file_paths
.into_iter()
.map(|(file_path, thumb_kind)| MediaProcessorEntry {
operation_kind: if media_data_files_map.remove(&file_path.id).is_some() {
MediaProcessorEntryKind::MediaDataAndThumbnailer(thumb_kind)
} else {
MediaProcessorEntryKind::Thumbnailer(thumb_kind)
},
file_path,
})
.collect::<Vec<_>>()
.into_iter()
.chain(
media_data_files_map
.into_values()
.map(|file_path| MediaProcessorEntry {
operation_kind: MediaProcessorEntryKind::MediaData,
file_path,
}),
)
.chunks(BATCH_SIZE)
.into_iter()
.map(|chunk| {
let chunk = chunk.collect::<Vec<_>>();
total_files += chunk.len();
chunk
})
.collect::<Vec<_>>();
.map(Iterator::collect)
.collect::<Vec<Vec<_>>>();
debug!(
"Preparing to process {total_files} files in {} chunks",
@@ -119,17 +98,11 @@ pub async fn shallow(
let mut run_metadata = MediaProcessorMetadata::default();
for files in chunked_files {
let (more_run_metadata, errors) = process(
&files,
location.id,
&location_path,
&thumbnails_base_dir,
false,
library,
|_| {},
)
.await?;
run_metadata.update(more_run_metadata);
let (more_run_metadata, errors) = process(&files, location.id, &location_path, db, &|_| {})
.await
.map_err(MediaProcessorError::from)?;
run_metadata.update(more_run_metadata.into());
if !errors.is_empty() {
error!("Errors processing chunk of media data shallow extraction:\n{errors}");
@@ -138,62 +111,63 @@ pub async fn shallow(
debug!("Media shallow processor run metadata: {run_metadata:?}");
if run_metadata.media_data.extracted > 0 || run_metadata.thumbnailer.created > 0 {
if run_metadata.media_data.extracted > 0 {
invalidate_query!(library, "search.paths");
}
Ok(())
}
async fn get_files_for_thumbnailer(
db: &PrismaClient,
parent_iso_file_path: &IsolatedFilePathData<'_>,
) -> Result<
impl Iterator<Item = (file_path_for_media_processor::Data, ThumbnailerEntryKind)>,
MediaProcessorError,
> {
// query database for all image files in this location that need thumbnails
let image_thumb_files = get_files_by_extensions(
db,
parent_iso_file_path,
&thumbnail::THUMBNAILABLE_EXTENSIONS,
)
.await?
.into_iter()
.map(|file_path| (file_path, ThumbnailerEntryKind::Image));
#[cfg(feature = "ffmpeg")]
let all_files = {
// query database for all video files in this location that need thumbnails
let video_files = get_files_by_extensions(
db,
parent_iso_file_path,
&thumbnail::THUMBNAILABLE_VIDEO_EXTENSIONS,
)
.await?;
image_thumb_files.chain(
video_files
.into_iter()
.map(|file_path| (file_path, ThumbnailerEntryKind::Video)),
)
};
#[cfg(not(feature = "ffmpeg"))]
let all_files = { image_thumb_files };
Ok(all_files)
}
async fn get_files_for_media_data_extraction(
db: &PrismaClient,
parent_iso_file_path: &IsolatedFilePathData<'_>,
) -> Result<impl Iterator<Item = file_path_for_media_processor::Data>, MediaProcessorError> {
) -> Result<Vec<file_path_for_media_processor::Data>, MediaProcessorError> {
get_files_by_extensions(
db,
parent_iso_file_path,
&media_data_extractor::FILTERED_IMAGE_EXTENSIONS,
)
.await
.map(|file_paths| file_paths.into_iter())
.map_err(Into::into)
}
fn get_files_by_extensions<'d, 'p, 'e, 'ret>(
db: &'d PrismaClient,
parent_iso_file_path: &'p IsolatedFilePathData<'_>,
extensions: &'e [Extension],
) -> impl Future<Output = Result<Vec<file_path_for_media_processor::Data>, MediaProcessorError>> + 'ret
where
'd: 'ret,
'p: 'ret,
'e: 'ret,
{
async move {
// FIXME: Had to use format! macro because PCR doesn't support IN with Vec for SQLite
// We have no data coming from the user, so this is sql injection safe
db._query_raw(raw!(
&format!(
"SELECT id, materialized_path, is_dir, name, extension, cas_id, object_id
FROM file_path
WHERE
location_id={{}}
AND cas_id IS NOT NULL
AND LOWER(extension) IN ({})
AND materialized_path = {{}}",
extensions
.iter()
.map(|ext| format!("LOWER('{ext}')"))
.collect::<Vec<_>>()
.join(",")
),
PrismaValue::Int(parent_iso_file_path.location_id() as i64),
PrismaValue::String(
parent_iso_file_path
.materialized_path_for_children()
.expect("sub path iso_file_path must be a directory")
)
))
.exec()
.await
.map_err(Into::into)
}
}


@@ -1,42 +1,69 @@
use crate::{
api::CoreEvent,
library::{Libraries, LibraryManagerEvent},
object::media::thumbnail::ThumbnailerError,
prisma::{file_path, PrismaClient},
object::media::thumbnail::{
can_generate_thumbnail_for_document, can_generate_thumbnail_for_image,
generate_image_thumbnail, get_shard_hex, get_thumb_key, ThumbnailerError,
},
util::error::{FileIOError, NonUtf8PathError},
};
use sd_file_ext::extensions::{DocumentExtension, ImageExtension};
use sd_prisma::prisma::{file_path, PrismaClient};
use serde::{Deserialize, Serialize};
use std::{
collections::{HashMap, HashSet, VecDeque},
ffi::OsStr,
path::{Path, PathBuf},
pin::pin,
str::FromStr,
sync::Arc,
time::{Duration, SystemTime},
};
use async_channel as chan;
use futures::{future::try_join_all, stream::FuturesUnordered, FutureExt};
use futures::stream::FuturesUnordered;
use futures_concurrency::{
future::{Join, Race},
future::{Join, Race, TryJoin},
stream::Merge,
};
use once_cell::sync::OnceCell;
use thiserror::Error;
use tokio::{
fs, io, spawn,
sync::oneshot,
time::{interval, interval_at, timeout, Instant, MissedTickBehavior},
sync::{broadcast, oneshot, Mutex},
time::{interval, interval_at, sleep, timeout, Instant, MissedTickBehavior},
};
use tokio_stream::{wrappers::IntervalStream, StreamExt};
use tokio_util::sync::{CancellationToken, DropGuard};
use tracing::{debug, error, trace};
use tracing::{debug, error, trace, warn};
use uuid::Uuid;
use super::{generate_thumbnail, GenerateThumbnailArgs, THUMBNAIL_CACHE_DIR_NAME};
use super::{init_thumbnail_dir, THUMBNAIL_CACHE_DIR_NAME};
const ONE_SEC: Duration = Duration::from_secs(1);
const THIRTY_SECS: Duration = Duration::from_secs(30);
const HALF_HOUR: Duration = Duration::from_secs(30 * 60);
const ONE_WEEK: Duration = Duration::from_secs(7 * 24 * 60 * 60);
const SAVE_STATE_FILE: &str = "thumbs_to_process.bin";
static BATCH_SIZE: OnceCell<usize> = OnceCell::new();
#[derive(Debug, Serialize, Deserialize)]
pub struct GenerateThumbnailArgs {
pub extension: String,
pub cas_id: String,
pub path: PathBuf,
}
impl GenerateThumbnailArgs {
pub fn new(extension: String, cas_id: String, path: PathBuf) -> Self {
Self {
extension,
cas_id,
path,
}
}
}
#[derive(Error, Debug)]
enum Error {
@@ -53,56 +80,166 @@ enum Error {
#[derive(Debug)]
enum DatabaseMessage {
Add(Uuid, Arc<PrismaClient>),
Update(Uuid, Arc<PrismaClient>),
Remove(Uuid),
}
#[derive(Debug, Serialize, Deserialize)]
pub struct BatchToProcess {
pub batch: Vec<GenerateThumbnailArgs>,
pub should_regenerate: bool,
pub in_background: bool,
}
#[derive(Debug, Serialize, Deserialize)]
enum ProcessingKind {
Indexed,
Ephemeral,
}
// The thumbnails directory has the following structure:
// thumbnails/
// ├── version.txt
// └── <cas_id>[0..2]/ # sharding directory
//     └── <cas_id>.webp
pub struct Thumbnailer {
thumbnails_directory: PathBuf,
cas_ids_to_delete_tx: chan::Sender<Vec<String>>,
ephemeral_thumbnails_to_generate_tx: chan::Sender<Vec<GenerateThumbnailArgs>>,
_cancel_loop: DropGuard,
thumbnails_to_generate_tx: chan::Sender<(BatchToProcess, ProcessingKind)>,
last_single_thumb_generated: Mutex<Instant>,
reporter: broadcast::Sender<CoreEvent>,
cancel_tx: chan::Sender<oneshot::Sender<()>>,
}
#[derive(Debug, Serialize, Deserialize)]
struct ThumbsProcessingSaveState {
ephemeral_cas_ids: HashSet<String>,
// This queue doubles as LIFO and FIFO: LIFO when the user asks for a new batch
// by entering a new directory in the explorer, FIFO otherwise
queue: VecDeque<(BatchToProcess, ProcessingKind)>,
// These below are FIFO queues, so we can process leftovers from the previous batch first
indexed_leftovers_queue: VecDeque<BatchToProcess>,
ephemeral_leftovers_queue: VecDeque<BatchToProcess>,
}
impl Default for ThumbsProcessingSaveState {
fn default() -> Self {
Self {
ephemeral_cas_ids: HashSet::with_capacity(128),
queue: VecDeque::with_capacity(32),
indexed_leftovers_queue: VecDeque::with_capacity(8),
ephemeral_leftovers_queue: VecDeque::with_capacity(8),
}
}
}
impl ThumbsProcessingSaveState {
async fn load(thumbnails_directory: impl AsRef<Path>) -> Self {
let resume_file = thumbnails_directory.as_ref().join(SAVE_STATE_FILE);
match fs::read(&resume_file).await {
Ok(bytes) => {
let this = rmp_serde::from_slice::<Self>(&bytes).unwrap_or_else(|e| {
error!("Failed to deserialize save state at thumbnailer actor: {e:#?}");
Self::default()
});
if let Err(e) = fs::remove_file(&resume_file).await {
error!(
"Failed to remove save state file at thumbnailer actor: {:#?}",
FileIOError::from((resume_file, e))
);
}
this
}
Err(e) if e.kind() == io::ErrorKind::NotFound => {
trace!("No save state found at thumbnailer actor");
Self::default()
}
Err(e) => {
error!(
"Failed to read save state at thumbnailer actor: {:#?}",
FileIOError::from((resume_file, e))
);
Self::default()
}
}
}
async fn store(self, thumbnails_directory: impl AsRef<Path>) {
let resume_file = thumbnails_directory.as_ref().join(SAVE_STATE_FILE);
let Ok(bytes) = rmp_serde::to_vec_named(&self).map_err(|e| {
error!("Failed to serialize save state at thumbnailer actor: {e:#?}");
}) else {
return;
};
if let Err(e) = fs::write(&resume_file, bytes).await {
error!(
"Failed to write save state at thumbnailer actor: {:#?}",
FileIOError::from((resume_file, e))
);
}
}
}
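The save state above round-trips through MessagePack via `rmp_serde::to_vec_named` and `rmp_serde::from_slice`, falling back to a default on a bad read. A minimal sketch of that round trip, assuming the same `rmp-serde` and `serde` crates; `Saved` is a hypothetical stand-in for `ThumbsProcessingSaveState`:

use std::collections::VecDeque;

use serde::{Deserialize, Serialize};

#[derive(Debug, Default, Serialize, Deserialize)]
struct Saved {
    queue: VecDeque<String>,
}

fn main() {
    let state = Saved {
        queue: VecDeque::from(vec!["batch-1".into()]),
    };

    // `to_vec_named` writes MessagePack maps keyed by field name, so the
    // on-disk format tolerates field reordering between versions.
    let bytes = rmp_serde::to_vec_named(&state).expect("serialize");

    // Like the actor's `load`, fall back to a default state on a bad read.
    let restored: Saved = rmp_serde::from_slice(&bytes).unwrap_or_else(|e| {
        eprintln!("failed to deserialize save state: {e:#?}");
        Saved::default()
    });
    assert_eq!(restored.queue.len(), 1);
}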
impl Thumbnailer {
pub fn new(data_dir: PathBuf, lm: Arc<Libraries>) -> Self {
let mut thumbnails_directory = data_dir;
thumbnails_directory.push(THUMBNAIL_CACHE_DIR_NAME);
pub async fn new(
data_dir: PathBuf,
lm: Arc<Libraries>,
reporter: broadcast::Sender<CoreEvent>,
) -> Self {
let thumbnails_directory = init_thumbnail_dir(&data_dir).await.unwrap_or_else(|e| {
error!("Failed to initialize thumbnail directory: {e:#?}");
let mut thumbnails_directory = data_dir;
thumbnails_directory.push(THUMBNAIL_CACHE_DIR_NAME);
thumbnails_directory
});
let (databases_tx, databases_rx) = chan::bounded(4);
let (ephemeral_thumbnails_to_generate_tx, ephemeral_thumbnails_to_generate_rx) =
chan::unbounded();
let (thumbnails_to_generate_tx, ephemeral_thumbnails_to_generate_rx) = chan::unbounded();
let (cas_ids_to_delete_tx, cas_ids_to_delete_rx) = chan::bounded(16);
let cancel_token = CancellationToken::new();
let (cancel_tx, cancel_rx) = chan::bounded(1);
let inner_cancel_token = cancel_token.child_token();
tokio::spawn(async move {
loop {
if let Err(e) = tokio::spawn(Self::worker(
thumbnails_directory.clone(),
databases_rx.clone(),
cas_ids_to_delete_rx.clone(),
ephemeral_thumbnails_to_generate_rx.clone(),
inner_cancel_token.child_token(),
))
.await
{
error!(
"Error on Thumbnail Remover Actor; \
BATCH_SIZE
.set(std::thread::available_parallelism().map_or_else(
|e| {
error!("Failed to get available parallelism: {e:#?}");
4
},
|non_zero| {
let count = non_zero.get();
debug!("Thumbnailer will process batches of {count} thumbnails in parallel.");
count
},
))
.ok();
let inner_cancel_rx = cancel_rx.clone();
let inner_thumbnails_directory = thumbnails_directory.clone();
let inner_reporter = reporter.clone();
spawn(async move {
while let Err(e) = spawn(Self::worker(
inner_reporter.clone(),
inner_thumbnails_directory.clone(),
databases_rx.clone(),
cas_ids_to_delete_rx.clone(),
ephemeral_thumbnails_to_generate_rx.clone(),
inner_cancel_rx.clone(),
))
.await
{
error!(
"Error on Thumbnail Remover Actor; \
Error: {e}; \
Restarting the worker loop...",
);
}
if inner_cancel_token.is_cancelled() {
break;
}
);
}
});
tokio::spawn({
spawn({
let rx = lm.rx.clone();
async move {
if let Err(err) = rx
@@ -113,15 +250,29 @@ impl Thumbnailer {
match event {
LibraryManagerEvent::Load(library) => {
if databases_tx
.send(DatabaseMessage::Add(library.id, library.db.clone()))
.send(DatabaseMessage::Add(
library.id,
Arc::clone(&library.db),
))
.await
.is_err()
{
error!("Thumbnail remover actor is dead")
}
}
LibraryManagerEvent::Edit(library)
| LibraryManagerEvent::InstancesModified(library) => {
if databases_tx
.send(DatabaseMessage::Update(
library.id,
Arc::clone(&library.db),
))
.await
.is_err()
{
error!("Thumbnail remover actor is dead")
}
}
LibraryManagerEvent::Edit(_) => {}
LibraryManagerEvent::InstancesModified(_) => {}
LibraryManagerEvent::Delete(library) => {
if databases_tx
.send(DatabaseMessage::Remove(library.id))
@@ -142,18 +293,22 @@ impl Thumbnailer {
});
Self {
thumbnails_directory,
cas_ids_to_delete_tx,
ephemeral_thumbnails_to_generate_tx,
_cancel_loop: cancel_token.drop_guard(),
thumbnails_to_generate_tx,
last_single_thumb_generated: Mutex::new(Instant::now()),
reporter,
cancel_tx,
}
}
async fn worker(
reporter: broadcast::Sender<CoreEvent>,
thumbnails_directory: PathBuf,
databases_rx: chan::Receiver<DatabaseMessage>,
cas_ids_to_delete_rx: chan::Receiver<Vec<String>>,
ephemeral_thumbnails_to_generate_rx: chan::Receiver<Vec<GenerateThumbnailArgs>>,
cancel_token: CancellationToken,
thumbnails_to_generate_rx: chan::Receiver<(BatchToProcess, ProcessingKind)>,
cancel_rx: chan::Receiver<oneshot::Sender<()>>,
) {
let mut to_remove_interval = interval_at(Instant::now() + THIRTY_SECS, HALF_HOUR);
to_remove_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
@@ -162,31 +317,31 @@ impl Thumbnailer {
idle_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
let mut databases = HashMap::new();
let mut ephemeral_thumbnails_cas_ids = HashSet::new();
#[derive(Debug)]
enum StreamMessage {
RemovalTick,
ToDelete(Vec<String>),
Database(DatabaseMessage),
EphemeralThumbnailNewBatch(Vec<GenerateThumbnailArgs>),
Leftovers(Vec<GenerateThumbnailArgs>),
NewBatch((BatchToProcess, ProcessingKind)),
Leftovers((BatchToProcess, ProcessingKind)),
NewEphemeralThumbnailCasIds(Vec<String>),
Stop,
Shutdown(oneshot::Sender<()>),
IdleTick,
}
let cancel = pin!(cancel_token.cancelled());
// This is a LIFO queue, so we can process the most recent thumbnails first
let mut ephemeral_thumbnails_queue = Vec::with_capacity(8);
// This one is a FIFO queue, so we can process leftovers from the previous batch first
let mut ephemeral_thumbnails_leftovers_queue = VecDeque::with_capacity(8);
let ThumbsProcessingSaveState {
mut ephemeral_cas_ids,
mut queue,
mut indexed_leftovers_queue,
mut ephemeral_leftovers_queue,
} = ThumbsProcessingSaveState::load(&thumbnails_directory).await;
let (ephemeral_thumbnails_cas_ids_tx, ephemeral_thumbnails_cas_ids_rx) = chan::bounded(32);
let (leftovers_tx, leftovers_rx) = chan::bounded(8);
let mut shutdown_leftovers_rx = leftovers_rx.clone();
let (stop_older_processing_tx, stop_older_processing_rx) = chan::bounded(1);
let mut current_batch_processing_rx: Option<oneshot::Receiver<()>> = None;
@@ -194,12 +349,12 @@ impl Thumbnailer {
let mut msg_stream = (
databases_rx.map(StreamMessage::Database),
cas_ids_to_delete_rx.map(StreamMessage::ToDelete),
ephemeral_thumbnails_to_generate_rx.map(StreamMessage::EphemeralThumbnailNewBatch),
thumbnails_to_generate_rx.map(StreamMessage::NewBatch),
leftovers_rx.map(StreamMessage::Leftovers),
ephemeral_thumbnails_cas_ids_rx.map(StreamMessage::NewEphemeralThumbnailCasIds),
IntervalStream::new(to_remove_interval).map(|_| StreamMessage::RemovalTick),
IntervalStream::new(idle_interval).map(|_| StreamMessage::IdleTick),
cancel.into_stream().map(|()| StreamMessage::Stop),
cancel_rx.map(StreamMessage::Shutdown),
)
.merge();
@@ -217,32 +372,35 @@ impl Thumbnailer {
}
if current_batch_processing_rx.is_none()
&& (!ephemeral_thumbnails_queue.is_empty()
|| !ephemeral_thumbnails_leftovers_queue.is_empty())
&& (!queue.is_empty()
|| !indexed_leftovers_queue.is_empty()
|| !ephemeral_leftovers_queue.is_empty())
{
let (done_tx, done_rx) = oneshot::channel();
current_batch_processing_rx = Some(done_rx);
if let Some(batch) = ephemeral_thumbnails_queue.pop() {
spawn(batch_processor(
batch,
ephemeral_thumbnails_cas_ids_tx.clone(),
stop_older_processing_rx.clone(),
let batch_and_kind = if let Some(batch_and_kind) = queue.pop_front() {
batch_and_kind
} else if let Some(batch) = indexed_leftovers_queue.pop_front() {
// indexed leftovers have higher priority
(batch, ProcessingKind::Indexed)
} else if let Some(batch) = ephemeral_leftovers_queue.pop_front() {
(batch, ProcessingKind::Ephemeral)
} else {
continue;
};
spawn(batch_processor(
thumbnails_directory.clone(),
batch_and_kind,
ephemeral_thumbnails_cas_ids_tx.clone(),
ProcessorControlChannels {
stop_rx: stop_older_processing_rx.clone(),
done_tx,
leftovers_tx.clone(),
false,
));
} else if let Some(batch) = ephemeral_thumbnails_leftovers_queue.pop_front()
{
spawn(batch_processor(
batch,
ephemeral_thumbnails_cas_ids_tx.clone(),
stop_older_processing_rx.clone(),
done_tx,
leftovers_tx.clone(),
true,
));
}
},
leftovers_tx.clone(),
reporter.clone(),
));
}
}
@@ -252,7 +410,7 @@ impl Thumbnailer {
if let Err(e) = Self::process_clean_up(
&thumbnails_directory,
databases.values(),
&ephemeral_thumbnails_cas_ids,
&ephemeral_cas_ids,
)
.await
{
@@ -270,31 +428,115 @@ impl Thumbnailer {
}
}
StreamMessage::EphemeralThumbnailNewBatch(batch) => {
ephemeral_thumbnails_queue.push(batch);
if current_batch_processing_rx.is_some() // Only sends stop signal if there is a batch being processed
&& stop_older_processing_tx.send(()).await.is_err()
{
error!("Thumbnail remover actor died when trying to stop older processing");
StreamMessage::NewBatch((batch, kind)) => {
let in_background = batch.in_background;
trace!(
"New {kind:?} batch to process in {}, size: {}",
if in_background {
"background"
} else {
"foreground"
},
batch.batch.len()
);
if in_background {
queue.push_back((batch, kind));
} else {
// If a batch must be processed in the foreground, it takes maximum priority
queue.push_front((batch, kind));
}
// Only sends stop signal if there is a batch being processed
if !in_background && current_batch_processing_rx.is_some() {
trace!("Sending stop signal to older processing");
let (tx, rx) = oneshot::channel();
match stop_older_processing_tx.try_send(tx) {
Ok(()) => {
// We put a timeout here to avoid a deadlock in case the older processing already
// finished its batch
if timeout(ONE_SEC, rx).await.is_err() {
stop_older_processing_rx.recv().await.ok();
}
}
Err(e) if e.is_full() => {
// The last signal we sent happened after a batch was already processed
// So we clean the channel and we're good to go.
stop_older_processing_rx.recv().await.ok();
}
Err(_) => {
error!("Thumbnail remover actor died when trying to stop older processing");
}
}
}
}
StreamMessage::Leftovers(batch) => {
ephemeral_thumbnails_leftovers_queue.push_back(batch);
}
StreamMessage::Leftovers((batch, kind)) => match kind {
ProcessingKind::Indexed => indexed_leftovers_queue.push_back(batch),
ProcessingKind::Ephemeral => ephemeral_leftovers_queue.push_back(batch),
},
StreamMessage::Database(DatabaseMessage::Add(id, db)) => {
StreamMessage::Database(DatabaseMessage::Add(id, db))
| StreamMessage::Database(DatabaseMessage::Update(id, db)) => {
databases.insert(id, db);
}
StreamMessage::Database(DatabaseMessage::Remove(id)) => {
databases.remove(&id);
}
StreamMessage::NewEphemeralThumbnailCasIds(cas_ids) => {
ephemeral_thumbnails_cas_ids.extend(cas_ids);
trace!("New ephemeral thumbnail cas ids: {}", cas_ids.len());
ephemeral_cas_ids.extend(cas_ids);
}
StreamMessage::Stop => {
debug!("Thumbnail remover actor is stopping");
break;
StreamMessage::Shutdown(cancel_tx) => {
debug!("Thumbnail actor is shutting down...");
// First stopping the current batch processing
let (tx, rx) = oneshot::channel();
match stop_older_processing_tx.try_send(tx) {
Ok(()) => {
// We put a timeout here to avoid a deadlock in case the older processing already
// finished its batch
if timeout(ONE_SEC, rx).await.is_err() {
stop_older_processing_rx.recv().await.ok();
}
}
Err(e) if e.is_full() => {
// The last signal we sent happened after a batch was already processed
// So we clean the channel and we're good to go.
stop_older_processing_rx.recv().await.ok();
}
Err(_) => {
error!(
"Thumbnail remover actor died when trying to stop older processing"
);
}
}
// Closing the leftovers channel to stop the batch processor, as we already sent
// a stop signal
leftovers_tx.close();
while let Some((batch, kind)) = shutdown_leftovers_rx.next().await {
match kind {
ProcessingKind::Indexed => indexed_leftovers_queue.push_back(batch),
ProcessingKind::Ephemeral => ephemeral_leftovers_queue.push_back(batch),
}
}
// Saving state
ThumbsProcessingSaveState {
ephemeral_cas_ids,
queue,
indexed_leftovers_queue,
ephemeral_leftovers_queue,
}
.store(thumbnails_directory)
.await;
// Signaling that we're done shutting down
cancel_tx.send(()).ok();
return;
}
}
}
@@ -304,19 +546,23 @@ impl Thumbnailer {
thumbnails_directory: &Path,
cas_ids: Vec<String>,
) -> Result<(), Error> {
try_join_all(cas_ids.into_iter().map(|cas_id| async move {
let thumbnail_path =
thumbnails_directory.join(format!("{}/{cas_id}.webp", &cas_id[0..2]));
cas_ids
.into_iter()
.map(|cas_id| async move {
let thumbnail_path =
thumbnails_directory.join(format!("{}/{cas_id}.webp", &cas_id[0..2]));
trace!("Removing thumbnail: {}", thumbnail_path.display());
trace!("Removing thumbnail: {}", thumbnail_path.display());
match fs::remove_file(&thumbnail_path).await {
Ok(()) => Ok(()),
Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()),
Err(e) => Err(FileIOError::from((thumbnail_path, e))),
}
}))
.await?;
match fs::remove_file(&thumbnail_path).await {
Ok(()) => Ok(()),
Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()),
Err(e) => Err(FileIOError::from((thumbnail_path, e))),
}
})
.collect::<Vec<_>>()
.try_join()
.await?;
Ok(())
}
@@ -324,7 +570,7 @@
async fn process_clean_up(
thumbnails_directory: &Path,
databases: impl Iterator<Item = &Arc<PrismaClient>>,
non_indexed_thumbnails_cas_ids: &HashSet<String>,
ephemeral_cas_ids: &HashSet<String>,
) -> Result<(), Error> {
let databases = databases.collect::<Vec<_>>();
@@ -415,13 +661,13 @@ impl Thumbnailer {
});
}
thumbnails_paths_by_cas_id
.retain(|cas_id, _| !non_indexed_thumbnails_cas_ids.contains(cas_id));
thumbnails_paths_by_cas_id.retain(|cas_id, _| !ephemeral_cas_ids.contains(cas_id));
let now = SystemTime::now();
let removed_count = try_join_all(thumbnails_paths_by_cas_id.into_values().map(
|path| async move {
let removed_count = thumbnails_paths_by_cas_id
.into_values()
.map(|path| async move {
if let Ok(metadata) = fs::metadata(&path).await {
if metadata
.accessed()
@@ -438,17 +684,18 @@
}
}
tracing::warn!("Removing stale thumbnail: {}", path.display());
trace!("Removing stale thumbnail: {}", path.display());
fs::remove_file(&path)
.await
.map(|()| true)
.map_err(|e| FileIOError::from((path, e)))
},
))
.await?
.into_iter()
.filter(|r| *r)
.count();
})
.collect::<Vec<_>>()
.try_join()
.await?
.into_iter()
.filter(|r| *r)
.count();
if thumbs_found == removed_count {
// if we removed all the thumbnails we found, it means that the directory is empty
@@ -466,10 +713,11 @@ impl Thumbnailer {
Ok(())
}
pub async fn new_non_indexed_thumbnails_batch(&self, batch: Vec<GenerateThumbnailArgs>) {
#[inline]
async fn new_batch(&self, batch: BatchToProcess, kind: ProcessingKind) {
if self
.ephemeral_thumbnails_to_generate_tx
.send(batch)
.thumbnails_to_generate_tx
.send((batch, kind))
.await
.is_err()
{
@@ -477,45 +725,130 @@ impl Thumbnailer {
}
}
pub async fn new_ephemeral_thumbnails_batch(&self, batch: BatchToProcess) {
self.new_batch(batch, ProcessingKind::Ephemeral).await;
}
pub async fn new_indexed_thumbnails_batch(&self, batch: BatchToProcess) {
self.new_batch(batch, ProcessingKind::Indexed).await;
}
pub async fn remove_cas_ids(&self, cas_ids: Vec<String>) {
if self.cas_ids_to_delete_tx.send(cas_ids).await.is_err() {
error!("Thumbnail remover actor is dead: Failed to send cas ids to delete");
}
}
pub async fn shutdown(&self) {
let (tx, rx) = oneshot::channel();
if self.cancel_tx.send(tx).await.is_err() {
error!("Thumbnail remover actor is dead: Failed to send shutdown signal");
} else {
rx.await.ok();
}
}
/// WARNING!!!! DON'T USE THIS METHOD IN A LOOP!!!!!!!!!!!!! It will be pretty slow on purpose!
pub async fn generate_single_thumbnail(
&self,
extension: &str,
cas_id: String,
path: impl AsRef<Path>,
) -> Result<(), ThumbnailerError> {
let mut last_single_thumb_generated_guard = self.last_single_thumb_generated.lock().await;
let elapsed = Instant::now() - *last_single_thumb_generated_guard;
if elapsed < ONE_SEC {
// This will choke up in case someone tries to use this method in a loop, otherwise
// it would consume all the machine's resources like a glutton monster from hell
sleep(ONE_SEC - elapsed).await;
}
let res = generate_thumbnail(
self.thumbnails_directory.clone(),
extension,
cas_id,
path,
false,
false,
self.reporter.clone(),
)
.await
.map(|_| ());
*last_single_thumb_generated_guard = Instant::now();
res
}
}
struct ProcessorControlChannels {
stop_rx: chan::Receiver<oneshot::Sender<()>>,
done_tx: oneshot::Sender<()>,
}
async fn batch_processor(
batch: Vec<GenerateThumbnailArgs>,
thumbnails_directory: PathBuf,
(
BatchToProcess {
batch,
should_regenerate,
in_background,
},
kind,
): (BatchToProcess, ProcessingKind),
generated_cas_ids_tx: chan::Sender<Vec<String>>,
stop_rx: chan::Receiver<()>,
done_tx: oneshot::Sender<()>,
leftovers_tx: chan::Sender<Vec<GenerateThumbnailArgs>>,
in_background: bool,
ProcessorControlChannels { stop_rx, done_tx }: ProcessorControlChannels,
leftovers_tx: chan::Sender<(BatchToProcess, ProcessingKind)>,
reporter: broadcast::Sender<CoreEvent>,
) {
trace!(
"Processing thumbnails batch of kind {kind:?} with size {} in {}",
batch.len(),
if in_background {
"background"
} else {
"foreground"
},
);
// Transforming to `VecDeque` so we don't need to move anything as we consume from the beginning
// This from is guaranteed to be O(1)
let mut queue = VecDeque::from(batch);
enum RaceOutputs {
Processed,
Stop,
Stop(oneshot::Sender<()>),
}
// Need this borrow here to satisfy the async move below
let generated_cas_ids_tx = &generated_cas_ids_tx;
while !queue.is_empty() {
let chunk = (0..4)
let chunk = (0..*BATCH_SIZE
.get()
.expect("BATCH_SIZE is set at thumbnailer new method"))
.filter_map(|_| queue.pop_front())
.map(
|GenerateThumbnailArgs {
extension,
cas_id,
path,
node,
}| {
let reporter = reporter.clone();
let thumbnails_directory = thumbnails_directory.clone();
spawn(async move {
timeout(
THIRTY_SECS,
generate_thumbnail(&extension, cas_id, &path, node, in_background),
generate_thumbnail(
thumbnails_directory,
&extension,
cas_id,
&path,
in_background,
should_regenerate,
reporter,
),
)
.await
.unwrap_or_else(|_| Err(ThumbnailerError::TimedOut(path.into_boxed_path())))
@@ -524,7 +857,7 @@ async fn batch_processor(
)
.collect::<Vec<_>>();
if let RaceOutputs::Stop = (
if let RaceOutputs::Stop(tx) = (
async move {
let cas_ids = chunk
.join()
@@ -554,33 +887,121 @@ async fn batch_processor(
RaceOutputs::Processed
},
async {
stop_rx
let tx = stop_rx
.recv()
.await
.expect("Critical error on thumbnails actor");
trace!("Received a stop signal");
RaceOutputs::Stop
RaceOutputs::Stop(tx)
},
)
.race()
.await
{
// Our queue is always contiguous, so this `from`` is free
// Our queue is always contiguous, so this `from` is free
let leftovers = Vec::from(queue);
trace!(
"Stopped with {} thumbnails left to process",
leftovers.len()
);
if !leftovers.is_empty() && leftovers_tx.send(leftovers).await.is_err() {
if !leftovers.is_empty()
&& leftovers_tx
.send((
BatchToProcess {
batch: leftovers,
should_regenerate,
in_background: true, // Leftovers should always be in background
},
kind,
))
.await
.is_err()
{
error!("Thumbnail remover actor is dead: Failed to send leftovers")
}
done_tx.send(()).ok();
tx.send(()).ok();
return;
}
}
trace!("Finished batch!");
done_tx.send(()).ok();
}
async fn generate_thumbnail(
thumbnails_directory: PathBuf,
extension: &str,
cas_id: String,
path: impl AsRef<Path>,
in_background: bool,
should_regenerate: bool,
reporter: broadcast::Sender<CoreEvent>,
) -> Result<String, ThumbnailerError> {
let path = path.as_ref();
trace!("Generating thumbnail for {}", path.display());
let mut output_path = thumbnails_directory;
output_path.push(get_shard_hex(&cas_id));
output_path.push(&cas_id);
output_path.set_extension("webp");
if let Err(e) = fs::metadata(&output_path).await {
if e.kind() != io::ErrorKind::NotFound {
error!(
"Failed to check if thumbnail exists, but we will try to generate it anyway: {e:#?}"
);
}
// Otherwise we're good: the thumbnail doesn't exist, so we can generate it
} else if !should_regenerate {
trace!(
"Skipping thumbnail generation for {} because it already exists",
path.display()
);
return Ok(cas_id);
}
if let Ok(extension) = ImageExtension::from_str(extension) {
if can_generate_thumbnail_for_image(&extension) {
generate_image_thumbnail(&path, &output_path).await?;
}
} else if let Ok(extension) = DocumentExtension::from_str(extension) {
if can_generate_thumbnail_for_document(&extension) {
generate_image_thumbnail(&path, &output_path).await?;
}
}
#[cfg(feature = "ffmpeg")]
{
use crate::object::media::thumbnail::{
can_generate_thumbnail_for_video, generate_video_thumbnail,
};
use sd_file_ext::extensions::VideoExtension;
if let Ok(extension) = VideoExtension::from_str(extension) {
if can_generate_thumbnail_for_video(&extension) {
generate_video_thumbnail(&path, &output_path).await?;
}
}
}
if !in_background {
trace!("Emitting new thumbnail event");
if reporter
.send(CoreEvent::NewThumbnail {
thumb_key: get_thumb_key(&cas_id),
})
.is_err()
{
warn!("Error sending event to Node's event bus");
}
}
trace!("Generated thumbnail for {}", path.display());
Ok(cas_id)
}
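The stop handshake that appears twice above (on a new foreground batch, and on shutdown) is the subtlest part of the actor: the controller `try_send`s a oneshot acknowledger into a bounded(1) channel, and a timeout plus a drain guard the case where the worker finished its batch before it could ack. A minimal self-contained sketch of the same protocol, assuming the same `async-channel` and `tokio` crates:

use std::time::Duration;

use async_channel as chan;
use tokio::{
    spawn,
    sync::oneshot,
    time::{sleep, timeout},
};

#[tokio::main]
async fn main() {
    // bounded(1): at most one pending stop request, mirroring the actor.
    let (stop_tx, stop_rx) = chan::bounded::<oneshot::Sender<()>>(1);
    let drain_rx = stop_rx.clone();

    let worker = spawn(async move {
        tokio::select! {
            // Stand-in for processing one batch of thumbnails.
            _ = sleep(Duration::from_secs(10)) => println!("batch finished"),
            Ok(ack) = stop_rx.recv() => {
                println!("stopping early, requeueing leftovers");
                ack.send(()).ok(); // acknowledge the stop
            }
        }
    });

    let (tx, rx) = oneshot::channel();
    match stop_tx.try_send(tx) {
        Ok(()) => {
            // The timeout avoids a deadlock when the worker already finished
            // its batch and will never ack; drain the stale acknowledger.
            if timeout(Duration::from_secs(1), rx).await.is_err() {
                drain_rx.recv().await.ok();
            }
        }
        // A previous stop request was never consumed: drop it and move on.
        Err(e) if e.is_full() => {
            drain_rx.recv().await.ok();
        }
        Err(_) => println!("worker channel closed"),
    }

    worker.await.ok();
}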


@@ -1,6 +1,6 @@
use crate::util::{error::FileIOError, version_manager::VersionManager};
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use int_enum::IntEnum;
use tokio::fs;
@@ -16,9 +16,9 @@ enum ThumbnailVersion {
Unknown = 0,
}
pub async fn init_thumbnail_dir(data_dir: PathBuf) -> Result<PathBuf, ThumbnailerError> {
pub async fn init_thumbnail_dir(data_dir: impl AsRef<Path>) -> Result<PathBuf, ThumbnailerError> {
debug!("Initializing thumbnail directory");
let thumbnail_dir = data_dir.join(THUMBNAIL_CACHE_DIR_NAME);
let thumbnail_dir = data_dir.as_ref().join(THUMBNAIL_CACHE_DIR_NAME);
let version_file = thumbnail_dir.join("version.txt");
let version_manager =
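The signature change above, from `PathBuf` to `impl AsRef<Path>`, is a small API loosening: callers can now pass `&str`, `String`, `&Path`, or `PathBuf` without converting first. A minimal sketch of the idiom; `thumb_dir` is a hypothetical helper:

use std::path::{Path, PathBuf};

// Borrows whatever path-like value the caller has and joins onto it.
fn thumb_dir(data_dir: impl AsRef<Path>) -> PathBuf {
    data_dir.as_ref().join("thumbnails")
}

fn main() {
    // All of these compile against the same signature.
    assert_eq!(thumb_dir("/data"), Path::new("/data/thumbnails"));
    assert_eq!(thumb_dir(PathBuf::from("/data")), Path::new("/data/thumbnails"));
    assert_eq!(thumb_dir(Path::new("/data")), Path::new("/data/thumbnails"));
}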


@@ -1,9 +1,4 @@
use crate::{
api::CoreEvent,
job::JobRunErrors,
library::Library,
location::file_path_helper::{file_path_for_media_processor, IsolatedFilePathData},
prisma::location,
util::{error::FileIOError, version_manager::VersionManagerError},
Node,
};
@ -18,20 +13,16 @@ use sd_media_metadata::image::Orientation;
use sd_file_ext::extensions::{VideoExtension, ALL_VIDEO_EXTENSIONS};
use std::{
collections::HashMap,
ops::Deref,
path::{Path, PathBuf},
str::FromStr,
sync::Arc,
};
use futures::future::{join_all, try_join_all};
use image::{self, imageops, DynamicImage, GenericImageView};
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::{fs, io, task};
use tracing::{error, trace, warn};
use tokio::{fs, task};
use tracing::error;
use webp::Encoder;
pub mod actor;
@ -87,6 +78,18 @@ pub(super) static THUMBNAILABLE_EXTENSIONS: Lazy<Vec<Extension>> = Lazy::new(||
.collect()
});
pub static ALL_THUMBNAILABLE_EXTENSIONS: Lazy<Vec<Extension>> = Lazy::new(|| {
if cfg!(feature = "ffmpeg") {
THUMBNAILABLE_EXTENSIONS
.iter()
.cloned()
.chain(THUMBNAILABLE_VIDEO_EXTENSIONS.iter().cloned())
.collect()
} else {
THUMBNAILABLE_EXTENSIONS.clone()
}
});
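
Because cfg!(feature = "ffmpeg") expands to a plain bool rather than conditional compilation, both branches must type-check either way; the video extensions are only chained in when the feature is enabled. A minimal sketch of how a caller might use the combined list (the helper below is hypothetical, relying on the PartialEq derive added at the end of this diff):

fn is_thumbnailable(ext: &Extension) -> bool {
	ALL_THUMBNAILABLE_EXTENSIONS.iter().any(|e| e == ext)
}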
#[derive(Error, Debug)]
pub enum ThumbnailerError {
// Internal errors
@ -217,288 +220,3 @@ pub const fn can_generate_thumbnail_for_document(document_extension: &DocumentEx
matches!(document_extension, Pdf)
}
pub(super) async fn process(
entries: impl IntoIterator<Item = (&file_path_for_media_processor::Data, ThumbnailerEntryKind)>,
location_id: location::id::Type,
location_path: impl AsRef<Path>,
thumbnails_base_dir: impl AsRef<Path>,
regenerate: bool,
library: &Library,
ctx_update_fn: impl Fn(usize),
) -> Result<(ThumbnailerMetadata, JobRunErrors), ThumbnailerError> {
let mut run_metadata = ThumbnailerMetadata::default();
let location_path = location_path.as_ref();
let thumbnails_base_dir = thumbnails_base_dir.as_ref();
let mut errors = vec![];
let mut to_create_dirs = HashMap::new();
struct WorkTable<'a> {
kind: ThumbnailerEntryKind,
input_path: PathBuf,
cas_id: &'a str,
output_path: PathBuf,
metadata_res: io::Result<()>,
}
let entries = entries
.into_iter()
.filter_map(|(file_path, kind)| {
IsolatedFilePathData::try_from((location_id, file_path))
.map(|iso_file_path| (file_path, kind, location_path.join(iso_file_path)))
.map_err(|e| {
errors.push(format!(
"Failed to build path for file with id {}: {e}",
file_path.id
))
})
.ok()
})
.filter_map(|(file_path, kind, path)| {
if let Some(cas_id) = &file_path.cas_id {
Some((kind, path, cas_id))
} else {
warn!(
"Skipping thumbnail generation for {} due to missing cas_id",
path.display()
);
run_metadata.skipped += 1;
None
}
})
.map(|(kind, input_path, cas_id)| {
let thumbnails_shard_dir = thumbnails_base_dir.join(get_shard_hex(cas_id));
let output_path = thumbnails_shard_dir.join(format!("{cas_id}.webp"));
// Putting all sharding directories in a map to avoid trying to create the same one repeatedly
to_create_dirs
.entry(thumbnails_shard_dir.clone())
.or_insert_with(|| async move {
fs::create_dir_all(&thumbnails_shard_dir)
.await
.map_err(|e| FileIOError::from((thumbnails_shard_dir, e)))
});
async move {
WorkTable {
kind,
input_path,
cas_id,
// Discarding the Ok value as we only care whether the file exists, maybe saving some space
metadata_res: fs::metadata(&output_path).await.map(|_| ()),
output_path,
}
}
})
.collect::<Vec<_>>();
if entries.is_empty() {
return Ok((run_metadata, errors.into()));
}
// Resolving these futures first, as we want to fail early if we can't create the directories
try_join_all(to_create_dirs.into_values()).await?;
// Running thumbnail generation sequentially so we don't overload the system; if we're wasting
// too much time on I/O we can try running them in parallel
for (
idx,
WorkTable {
kind,
input_path,
cas_id,
output_path,
metadata_res,
},
) in join_all(entries).await.into_iter().enumerate()
{
ctx_update_fn(idx + 1);
match metadata_res {
Ok(_) => {
if !regenerate {
trace!(
"Thumbnail already exists, skipping generation for {}",
input_path.display()
);
run_metadata.skipped += 1;
} else {
tracing::debug!(
"Renegerating thumbnail {} to {}",
input_path.display(),
output_path.display()
);
process_single_thumbnail(
cas_id,
kind,
&input_path,
&output_path,
&mut errors,
&mut run_metadata,
library,
)
.await;
}
}
Err(e) if e.kind() == io::ErrorKind::NotFound => {
trace!(
"Writing {} to {}",
input_path.display(),
output_path.display()
);
process_single_thumbnail(
cas_id,
kind,
&input_path,
&output_path,
&mut errors,
&mut run_metadata,
library,
)
.await;
}
Err(e) => {
error!(
"Error getting metadata for thumb: {:#?}",
FileIOError::from((output_path, e))
);
errors.push(format!(
"Had an error generating thumbnail for \"{}\"",
input_path.display()
));
}
}
}
Ok((run_metadata, errors.into()))
}
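
Both this old job-based path and the new actor shard thumbnails under a short hex prefix of the cas_id, keeping any single directory from accumulating every file. A hypothetical get_shard_hex with that shape (the real helper may differ, e.g. in prefix length):

fn get_shard_hex(cas_id: &str) -> &str {
	// Bucket by the first two hex characters: at most 256 subdirectories,
	// so no single directory holds all thumbnails
	&cas_id[..2]
}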
// Using &Path as this function if private only to this module, always being used with a &Path, so we
// don't pay the compile price for generics
async fn process_single_thumbnail(
cas_id: &str,
kind: ThumbnailerEntryKind,
input_path: &Path,
output_path: &Path,
errors: &mut Vec<String>,
run_metadata: &mut ThumbnailerMetadata,
library: &Library,
) {
match kind {
ThumbnailerEntryKind::Image => {
if let Err(e) = generate_image_thumbnail(&input_path, &output_path).await {
error!(
"Error generating thumb for image \"{}\": {e:#?}",
input_path.display()
);
errors.push(format!(
"Had an error generating thumbnail for \"{}\"",
input_path.display()
));
return;
}
}
#[cfg(feature = "ffmpeg")]
ThumbnailerEntryKind::Video => {
if let Err(e) = generate_video_thumbnail(&input_path, &output_path).await {
error!(
"Error generating thumb for video \"{}\": {e:#?}",
input_path.display()
);
errors.push(format!(
"Had an error generating thumbnail for \"{}\"",
input_path.display()
));
return;
}
}
}
trace!("Emitting new thumbnail event");
library.emit(CoreEvent::NewThumbnail {
thumb_key: get_thumb_key(cas_id),
});
run_metadata.created += 1;
}
// TODO(fogodev): Unify how we generate thumbnails
#[derive(Debug)]
pub struct GenerateThumbnailArgs {
pub extension: String,
pub cas_id: String,
pub path: PathBuf,
pub node: Arc<Node>,
}
impl GenerateThumbnailArgs {
pub fn new(extension: String, cas_id: String, path: PathBuf, node: Arc<Node>) -> Self {
Self {
extension,
cas_id,
path,
node,
}
}
}
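
Before the actor existed, these args bundled everything a detached task needed to own. A construction site might have looked like this (values illustrative):

let args = GenerateThumbnailArgs::new(
	extension.clone(),
	cas_id.clone(),
	location_path.join(iso_file_path),
	Arc::clone(&node),
);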
pub async fn generate_thumbnail(
extension: &str,
cas_id: String,
path: impl AsRef<Path>,
node: Arc<Node>,
in_background: bool,
) -> Result<String, ThumbnailerError> {
let path = path.as_ref();
trace!("Generating thumbnail for {}", path.display());
let output_path = get_thumbnail_path(&node, &cas_id);
if let Err(e) = fs::metadata(&output_path).await {
if e.kind() != io::ErrorKind::NotFound {
error!(
"Failed to check if thumbnail exists, but we will try to generate it anyway: {e}"
);
}
// Otherwise we're good: the thumbnail doesn't exist, so we can generate it
} else {
trace!(
"Skipping thumbnail generation for {} because it already exists",
path.display()
);
return Ok(cas_id);
}
if let Ok(extension) = ImageExtension::from_str(extension) {
if can_generate_thumbnail_for_image(&extension) {
generate_image_thumbnail(&path, &output_path).await?;
}
} else if let Ok(extension) = DocumentExtension::from_str(extension) {
if can_generate_thumbnail_for_document(&extension) {
generate_image_thumbnail(&path, &output_path).await?;
}
}
#[cfg(feature = "ffmpeg")]
{
if let Ok(extension) = VideoExtension::from_str(extension) {
if can_generate_thumbnail_for_video(&extension) {
generate_video_thumbnail(&path, &output_path).await?;
}
}
}
if !in_background {
trace!("Emitting new thumbnail event");
node.emit(CoreEvent::NewThumbnail {
thumb_key: get_thumb_key(&cas_id),
});
}
trace!("Generated thumbnail for {}", path.display());
Ok(cas_id)
}

View file

@ -54,7 +54,7 @@ macro_rules! extension_enum {
}
) => {
// construct enum
#[derive(Debug, ::serde::Serialize, ::serde::Deserialize, PartialEq, Eq)]
#[derive(Debug, ::serde::Serialize, ::serde::Deserialize, PartialEq, Eq, Clone)]
pub enum Extension {
$( $variant($type), )*
}