Catching panics

This commit is contained in:
Ericson Soares 2024-06-27 16:06:03 -03:00
parent e1ec45bf5e
commit 236d7730a3
5 changed files with 110 additions and 40 deletions

View file

@ -30,6 +30,9 @@ pub enum JobSystemError {
#[error(transparent)] #[error(transparent)]
Report(#[from] ReportError), Report(#[from] ReportError),
#[error("internal job panic! <id='{0}'>")]
Panic(JobId),
} }
impl From<JobSystemError> for rspc::Error { impl From<JobSystemError> for rspc::Error {

View file

@ -13,6 +13,7 @@ use std::{
hash::{Hash, Hasher}, hash::{Hash, Hasher},
marker::PhantomData, marker::PhantomData,
ops::{Deref, DerefMut}, ops::{Deref, DerefMut},
panic::AssertUnwindSafe,
path::Path, path::Path,
pin::pin, pin::pin,
sync::Arc, sync::Arc,
@ -21,7 +22,7 @@ use std::{
use async_channel as chan; use async_channel as chan;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use futures::{stream, Future, StreamExt}; use futures::{stream, Future, FutureExt, StreamExt};
use futures_concurrency::{ use futures_concurrency::{
future::{Join, TryJoin}, future::{Join, TryJoin},
stream::Merge, stream::Merge,
@ -750,15 +751,29 @@ where
trace!("Dispatching job"); trace!("Dispatching job");
spawn(to_spawn_job::<OuterCtx, _, _>( spawn({
self.id, let id = self.id;
self.job, let job = self.job;
ctx.clone(), let ctx = ctx.clone();
None,
base_dispatcher, async move {
commands_rx, if AssertUnwindSafe(to_spawn_job::<OuterCtx, _, _>(
done_tx, id,
)); job,
ctx,
None,
base_dispatcher,
commands_rx,
done_tx,
))
.catch_unwind()
.await
.is_err()
{
error!("job panicked");
}
}
});
JobHandle { JobHandle {
id: self.id, id: self.id,
@ -791,15 +806,29 @@ where
trace!("Resuming job"); trace!("Resuming job");
spawn(to_spawn_job::<OuterCtx, _, _>( spawn({
self.id, let id = self.id;
self.job, let job = self.job;
ctx.clone(), let ctx = ctx.clone();
serialized_tasks,
base_dispatcher, async move {
commands_rx, if AssertUnwindSafe(to_spawn_job::<OuterCtx, _, _>(
done_tx, id,
)); job,
ctx,
serialized_tasks,
base_dispatcher,
commands_rx,
done_tx,
))
.catch_unwind()
.await
.is_err()
{
error!("job panicked");
}
}
});
JobHandle { JobHandle {
id: self.id, id: self.id,
@ -855,9 +884,14 @@ async fn to_spawn_job<OuterCtx, JobCtx, J>(
spawn( spawn(
async move { async move {
tx.send(job.run::<OuterCtx>(dispatcher, ctx).await) tx.send(
.await AssertUnwindSafe(job.run::<OuterCtx>(dispatcher, ctx))
.expect("job run channel closed"); .catch_unwind()
.await
.unwrap_or(Err(Error::JobSystem(JobSystemError::Panic(job_id)))),
)
.await
.expect("job run channel closed");
} }
.in_current_span(), .in_current_span(),
); );

View file

@ -14,6 +14,7 @@ use sd_file_ext::extensions::{VideoExtension, ALL_VIDEO_EXTENSIONS};
use std::{ use std::{
ops::Deref, ops::Deref,
panic,
path::{Path, PathBuf}, path::{Path, PathBuf},
str::FromStr, str::FromStr,
time::Duration, time::Duration,
@ -313,9 +314,9 @@ pub async fn generate_thumbnail(
} }
fn inner_generate_image_thumbnail( fn inner_generate_image_thumbnail(
file_path: PathBuf, file_path: &PathBuf,
) -> Result<Vec<u8>, thumbnailer::NonCriticalThumbnailerError> { ) -> Result<Vec<u8>, thumbnailer::NonCriticalThumbnailerError> {
let mut img = format_image(&file_path).map_err(|e| { let mut img = format_image(file_path).map_err(|e| {
thumbnailer::NonCriticalThumbnailerError::FormatImage(file_path.clone(), e.to_string()) thumbnailer::NonCriticalThumbnailerError::FormatImage(file_path.clone(), e.to_string())
})?; })?;
@ -336,7 +337,7 @@ fn inner_generate_image_thumbnail(
// this corrects the rotation/flip of the image based on the *available* exif data // this corrects the rotation/flip of the image based on the *available* exif data
// not all images have exif data, so we don't error. we also don't rotate HEIF as that's against the spec // not all images have exif data, so we don't error. we also don't rotate HEIF as that's against the spec
if let Some(orientation) = Orientation::from_path(&file_path) { if let Some(orientation) = Orientation::from_path(file_path) {
if ConvertibleExtension::try_from(file_path.as_ref()) if ConvertibleExtension::try_from(file_path.as_ref())
.expect("we already checked if the image was convertible") .expect("we already checked if the image was convertible")
.should_rotate() .should_rotate()
@ -347,7 +348,10 @@ fn inner_generate_image_thumbnail(
// Create the WebP encoder for the above image // Create the WebP encoder for the above image
let encoder = Encoder::from_image(&img).map_err(|reason| { let encoder = Encoder::from_image(&img).map_err(|reason| {
thumbnailer::NonCriticalThumbnailerError::WebPEncoding(file_path, reason.to_string()) thumbnailer::NonCriticalThumbnailerError::WebPEncoding(
file_path.clone(),
reason.to_string(),
)
})?; })?;
// Type `WebPMemory` is !Send, which makes the `Future` in this function `!Send`, // Type `WebPMemory` is !Send, which makes the `Future` in this function `!Send`,
@ -378,7 +382,19 @@ async fn generate_image_thumbnail(
move || { move || {
// Handling error on receiver side // Handling error on receiver side
let _ = tx.send(inner_generate_image_thumbnail(file_path));
let _ = tx.send(
panic::catch_unwind(|| inner_generate_image_thumbnail(&file_path)).unwrap_or_else(
move |_| {
Err(
thumbnailer::NonCriticalThumbnailerError::PanicWhileGeneratingThumbnail(
file_path,
"Internal panic on third party crate".to_string(),
),
)
},
),
);
} }
}); });

View file

@ -44,7 +44,7 @@ impl<E: RunError> System<E> {
let workers_count = usize::max( let workers_count = usize::max(
std::thread::available_parallelism().map_or_else( std::thread::available_parallelism().map_or_else(
|e| { |e| {
error!("Failed to get available parallelism in the job system: {e:#?}"); error!(?e, "Failed to get available parallelism in the job system");
1 1
}, },
NonZeroUsize::get, NonZeroUsize::get,

View file

@ -1,6 +1,8 @@
use std::{ use std::{
any::Any,
collections::{HashMap, VecDeque}, collections::{HashMap, VecDeque},
future::pending, future::pending,
panic::AssertUnwindSafe,
pin::pin, pin::pin,
sync::{ sync::{
atomic::{AtomicBool, Ordering}, atomic::{AtomicBool, Ordering},
@ -53,7 +55,7 @@ struct AbortAndSuspendSignalers {
struct RunningTask { struct RunningTask {
id: TaskId, id: TaskId,
kind: PendingTaskKind, kind: PendingTaskKind,
handle: JoinHandle<()>, handle: JoinHandle<Result<(), Box<dyn Any + Send>>>,
} }
enum WaitingSuspendedTask { enum WaitingSuspendedTask {
@ -150,7 +152,7 @@ impl<E: RunError> Runner<E> {
&mut self, &mut self,
task_id: TaskId, task_id: TaskId,
task_work_state: TaskWorkState<E>, task_work_state: TaskWorkState<E>,
) -> JoinHandle<()> { ) -> JoinHandle<Result<(), Box<dyn Any + Send>>> {
let (abort_tx, abort_rx) = oneshot::channel(); let (abort_tx, abort_rx) = oneshot::channel();
let (suspend_tx, suspend_rx) = oneshot::channel(); let (suspend_tx, suspend_rx) = oneshot::channel();
@ -163,13 +165,16 @@ impl<E: RunError> Runner<E> {
); );
let handle = spawn( let handle = spawn(
run_single_task( AssertUnwindSafe(
task_work_state, run_single_task(
self.task_output_tx.clone(), task_work_state,
suspend_rx, self.task_output_tx.clone(),
abort_rx, suspend_rx,
abort_rx,
)
.in_current_span(),
) )
.in_current_span(), .catch_unwind(),
); );
trace!("Task runner spawned"); trace!("Task runner spawned");
@ -624,8 +629,14 @@ impl<E: RunError> Runner<E> {
} }
} }
if let Err(e) = handle.await { match handle.await {
error!(%task_id, ?e, "Task failed to join"); Ok(Ok(())) => { /* Everything is Awesome! */ }
Ok(Err(_)) => {
error!(%task_id, "Task panicked");
}
Err(e) => {
error!(%task_id, ?e, "Task failed to join");
}
} }
stolen_task_tx.close(); stolen_task_tx.close();
@ -806,8 +817,14 @@ impl<E: RunError> Runner<E> {
assert_eq!(*finished_task_id, old_task_id, "Task output id mismatch"); // Sanity check assert_eq!(*finished_task_id, old_task_id, "Task output id mismatch"); // Sanity check
if let Err(e) = handle.await { match handle.await {
error!(?e, "Task failed to join"); Ok(Ok(())) => { /* Everything is Awesome! */ }
Ok(Err(_)) => {
error!("Task panicked");
}
Err(e) => {
error!(?e, "Task failed to join");
}
} }
if let Some((next_task_kind, task_work_state)) = self.get_next_task() { if let Some((next_task_kind, task_work_state)) = self.get_next_task() {