diff --git a/core/crates/heavy-lifting/src/job_system/error.rs b/core/crates/heavy-lifting/src/job_system/error.rs index 98c5b8f8e..4ae944b2e 100644 --- a/core/crates/heavy-lifting/src/job_system/error.rs +++ b/core/crates/heavy-lifting/src/job_system/error.rs @@ -30,6 +30,9 @@ pub enum JobSystemError { #[error(transparent)] Report(#[from] ReportError), + + #[error("internal job panic! ")] + Panic(JobId), } impl From for rspc::Error { diff --git a/core/crates/heavy-lifting/src/job_system/job.rs b/core/crates/heavy-lifting/src/job_system/job.rs index 91b3173b1..0a3642797 100644 --- a/core/crates/heavy-lifting/src/job_system/job.rs +++ b/core/crates/heavy-lifting/src/job_system/job.rs @@ -13,6 +13,7 @@ use std::{ hash::{Hash, Hasher}, marker::PhantomData, ops::{Deref, DerefMut}, + panic::AssertUnwindSafe, path::Path, pin::pin, sync::Arc, @@ -21,7 +22,7 @@ use std::{ use async_channel as chan; use chrono::{DateTime, Utc}; -use futures::{stream, Future, StreamExt}; +use futures::{stream, Future, FutureExt, StreamExt}; use futures_concurrency::{ future::{Join, TryJoin}, stream::Merge, @@ -750,15 +751,29 @@ where trace!("Dispatching job"); - spawn(to_spawn_job::( - self.id, - self.job, - ctx.clone(), - None, - base_dispatcher, - commands_rx, - done_tx, - )); + spawn({ + let id = self.id; + let job = self.job; + let ctx = ctx.clone(); + + async move { + if AssertUnwindSafe(to_spawn_job::( + id, + job, + ctx, + None, + base_dispatcher, + commands_rx, + done_tx, + )) + .catch_unwind() + .await + .is_err() + { + error!("job panicked"); + } + } + }); JobHandle { id: self.id, @@ -791,15 +806,29 @@ where trace!("Resuming job"); - spawn(to_spawn_job::( - self.id, - self.job, - ctx.clone(), - serialized_tasks, - base_dispatcher, - commands_rx, - done_tx, - )); + spawn({ + let id = self.id; + let job = self.job; + let ctx = ctx.clone(); + + async move { + if AssertUnwindSafe(to_spawn_job::( + id, + job, + ctx, + serialized_tasks, + base_dispatcher, + commands_rx, + done_tx, + )) + .catch_unwind() + .await + .is_err() + { + error!("job panicked"); + } + } + }); JobHandle { id: self.id, @@ -855,9 +884,14 @@ async fn to_spawn_job( spawn( async move { - tx.send(job.run::(dispatcher, ctx).await) - .await - .expect("job run channel closed"); + tx.send( + AssertUnwindSafe(job.run::(dispatcher, ctx)) + .catch_unwind() + .await + .unwrap_or(Err(Error::JobSystem(JobSystemError::Panic(job_id)))), + ) + .await + .expect("job run channel closed"); } .in_current_span(), ); diff --git a/core/crates/heavy-lifting/src/media_processor/helpers/thumbnailer.rs b/core/crates/heavy-lifting/src/media_processor/helpers/thumbnailer.rs index 5f636f606..caa06a3fe 100644 --- a/core/crates/heavy-lifting/src/media_processor/helpers/thumbnailer.rs +++ b/core/crates/heavy-lifting/src/media_processor/helpers/thumbnailer.rs @@ -14,6 +14,7 @@ use sd_file_ext::extensions::{VideoExtension, ALL_VIDEO_EXTENSIONS}; use std::{ ops::Deref, + panic, path::{Path, PathBuf}, str::FromStr, time::Duration, @@ -313,9 +314,9 @@ pub async fn generate_thumbnail( } fn inner_generate_image_thumbnail( - file_path: PathBuf, + file_path: &PathBuf, ) -> Result, thumbnailer::NonCriticalThumbnailerError> { - let mut img = format_image(&file_path).map_err(|e| { + let mut img = format_image(file_path).map_err(|e| { thumbnailer::NonCriticalThumbnailerError::FormatImage(file_path.clone(), e.to_string()) })?; @@ -336,7 +337,7 @@ fn inner_generate_image_thumbnail( // this corrects the rotation/flip of the image based on the *available* exif data // not all images have exif data, so we don't error. we also don't rotate HEIF as that's against the spec - if let Some(orientation) = Orientation::from_path(&file_path) { + if let Some(orientation) = Orientation::from_path(file_path) { if ConvertibleExtension::try_from(file_path.as_ref()) .expect("we already checked if the image was convertible") .should_rotate() @@ -347,7 +348,10 @@ fn inner_generate_image_thumbnail( // Create the WebP encoder for the above image let encoder = Encoder::from_image(&img).map_err(|reason| { - thumbnailer::NonCriticalThumbnailerError::WebPEncoding(file_path, reason.to_string()) + thumbnailer::NonCriticalThumbnailerError::WebPEncoding( + file_path.clone(), + reason.to_string(), + ) })?; // Type `WebPMemory` is !Send, which makes the `Future` in this function `!Send`, @@ -378,7 +382,19 @@ async fn generate_image_thumbnail( move || { // Handling error on receiver side - let _ = tx.send(inner_generate_image_thumbnail(file_path)); + + let _ = tx.send( + panic::catch_unwind(|| inner_generate_image_thumbnail(&file_path)).unwrap_or_else( + move |_| { + Err( + thumbnailer::NonCriticalThumbnailerError::PanicWhileGeneratingThumbnail( + file_path, + "Internal panic on third party crate".to_string(), + ), + ) + }, + ), + ); } }); diff --git a/crates/task-system/src/system.rs b/crates/task-system/src/system.rs index 904b0a5eb..f64887bc3 100644 --- a/crates/task-system/src/system.rs +++ b/crates/task-system/src/system.rs @@ -44,7 +44,7 @@ impl System { let workers_count = usize::max( std::thread::available_parallelism().map_or_else( |e| { - error!("Failed to get available parallelism in the job system: {e:#?}"); + error!(?e, "Failed to get available parallelism in the job system"); 1 }, NonZeroUsize::get, diff --git a/crates/task-system/src/worker/runner.rs b/crates/task-system/src/worker/runner.rs index d99981558..e9ab8da9e 100644 --- a/crates/task-system/src/worker/runner.rs +++ b/crates/task-system/src/worker/runner.rs @@ -1,6 +1,8 @@ use std::{ + any::Any, collections::{HashMap, VecDeque}, future::pending, + panic::AssertUnwindSafe, pin::pin, sync::{ atomic::{AtomicBool, Ordering}, @@ -53,7 +55,7 @@ struct AbortAndSuspendSignalers { struct RunningTask { id: TaskId, kind: PendingTaskKind, - handle: JoinHandle<()>, + handle: JoinHandle>>, } enum WaitingSuspendedTask { @@ -150,7 +152,7 @@ impl Runner { &mut self, task_id: TaskId, task_work_state: TaskWorkState, - ) -> JoinHandle<()> { + ) -> JoinHandle>> { let (abort_tx, abort_rx) = oneshot::channel(); let (suspend_tx, suspend_rx) = oneshot::channel(); @@ -163,13 +165,16 @@ impl Runner { ); let handle = spawn( - run_single_task( - task_work_state, - self.task_output_tx.clone(), - suspend_rx, - abort_rx, + AssertUnwindSafe( + run_single_task( + task_work_state, + self.task_output_tx.clone(), + suspend_rx, + abort_rx, + ) + .in_current_span(), ) - .in_current_span(), + .catch_unwind(), ); trace!("Task runner spawned"); @@ -624,8 +629,14 @@ impl Runner { } } - if let Err(e) = handle.await { - error!(%task_id, ?e, "Task failed to join"); + match handle.await { + Ok(Ok(())) => { /* Everything is Awesome! */ } + Ok(Err(_)) => { + error!(%task_id, "Task panicked"); + } + Err(e) => { + error!(%task_id, ?e, "Task failed to join"); + } } stolen_task_tx.close(); @@ -806,8 +817,14 @@ impl Runner { assert_eq!(*finished_task_id, old_task_id, "Task output id mismatch"); // Sanity check - if let Err(e) = handle.await { - error!(?e, "Task failed to join"); + match handle.await { + Ok(Ok(())) => { /* Everything is Awesome! */ } + Ok(Err(_)) => { + error!("Task panicked"); + } + Err(e) => { + error!(?e, "Task failed to join"); + } } if let Some((next_task_kind, task_work_state)) = self.get_next_task() {