This commit is contained in:
Ericson "Fogo" Soares 2024-06-27 20:02:51 +00:00 committed by GitHub
commit 09f3dcee92
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 111 additions and 41 deletions

View file

@ -143,7 +143,7 @@ incremental = false
# Optimize release builds
[profile.release]
panic = "abort" # Strip expensive panic clean-up logic
panic = "unwind" # Sadly we need unwind so that panics in third-party crates can be caught instead of unexpectedly crashing the app
codegen-units = 1 # Compile crates one after another so the compiler can optimize better
lto = true # Enables link-time optimizations
opt-level = "s" # Optimize for binary size

View file

@ -30,6 +30,9 @@ pub enum JobSystemError {
#[error(transparent)]
Report(#[from] ReportError),
#[error("internal job panic! <id='{0}'>")]
Panic(JobId),
}
impl From<JobSystemError> for rspc::Error {

View file

@ -13,6 +13,7 @@ use std::{
hash::{Hash, Hasher},
marker::PhantomData,
ops::{Deref, DerefMut},
panic::AssertUnwindSafe,
path::Path,
pin::pin,
sync::Arc,
@ -21,7 +22,7 @@ use std::{
use async_channel as chan;
use chrono::{DateTime, Utc};
use futures::{stream, Future, StreamExt};
use futures::{stream, Future, FutureExt, StreamExt};
use futures_concurrency::{
future::{Join, TryJoin},
stream::Merge,
@ -750,15 +751,29 @@ where
trace!("Dispatching job");
spawn(to_spawn_job::<OuterCtx, _, _>(
self.id,
self.job,
ctx.clone(),
None,
base_dispatcher,
commands_rx,
done_tx,
));
spawn({
let id = self.id;
let job = self.job;
let ctx = ctx.clone();
async move {
if AssertUnwindSafe(to_spawn_job::<OuterCtx, _, _>(
id,
job,
ctx,
None,
base_dispatcher,
commands_rx,
done_tx,
))
.catch_unwind()
.await
.is_err()
{
error!("job panicked");
}
}
});
JobHandle {
id: self.id,
@ -791,15 +806,29 @@ where
trace!("Resuming job");
spawn(to_spawn_job::<OuterCtx, _, _>(
self.id,
self.job,
ctx.clone(),
serialized_tasks,
base_dispatcher,
commands_rx,
done_tx,
));
spawn({
let id = self.id;
let job = self.job;
let ctx = ctx.clone();
async move {
if AssertUnwindSafe(to_spawn_job::<OuterCtx, _, _>(
id,
job,
ctx,
serialized_tasks,
base_dispatcher,
commands_rx,
done_tx,
))
.catch_unwind()
.await
.is_err()
{
error!("job panicked");
}
}
});
JobHandle {
id: self.id,
@ -855,9 +884,14 @@ async fn to_spawn_job<OuterCtx, JobCtx, J>(
spawn(
async move {
tx.send(job.run::<OuterCtx>(dispatcher, ctx).await)
.await
.expect("job run channel closed");
tx.send(
AssertUnwindSafe(job.run::<OuterCtx>(dispatcher, ctx))
.catch_unwind()
.await
.unwrap_or(Err(Error::JobSystem(JobSystemError::Panic(job_id)))),
)
.await
.expect("job run channel closed");
}
.in_current_span(),
);

View file

@ -14,6 +14,7 @@ use sd_file_ext::extensions::{VideoExtension, ALL_VIDEO_EXTENSIONS};
use std::{
ops::Deref,
panic,
path::{Path, PathBuf},
str::FromStr,
time::Duration,
@ -313,9 +314,9 @@ pub async fn generate_thumbnail(
}
fn inner_generate_image_thumbnail(
file_path: PathBuf,
file_path: &PathBuf,
) -> Result<Vec<u8>, thumbnailer::NonCriticalThumbnailerError> {
let mut img = format_image(&file_path).map_err(|e| {
let mut img = format_image(file_path).map_err(|e| {
thumbnailer::NonCriticalThumbnailerError::FormatImage(file_path.clone(), e.to_string())
})?;
@ -336,7 +337,7 @@ fn inner_generate_image_thumbnail(
// This corrects the rotation/flip of the image based on the *available* EXIF data.
// Not all images have EXIF data, so we don't error. We also don't rotate HEIF, as that's against the spec.
if let Some(orientation) = Orientation::from_path(&file_path) {
if let Some(orientation) = Orientation::from_path(file_path) {
if ConvertibleExtension::try_from(file_path.as_ref())
.expect("we already checked if the image was convertible")
.should_rotate()
@ -347,7 +348,10 @@ fn inner_generate_image_thumbnail(
// Create the WebP encoder for the above image
let encoder = Encoder::from_image(&img).map_err(|reason| {
thumbnailer::NonCriticalThumbnailerError::WebPEncoding(file_path, reason.to_string())
thumbnailer::NonCriticalThumbnailerError::WebPEncoding(
file_path.clone(),
reason.to_string(),
)
})?;
// Type `WebPMemory` is !Send, which makes the `Future` in this function `!Send`,
@ -378,7 +382,19 @@ async fn generate_image_thumbnail(
move || {
// Handling error on receiver side
let _ = tx.send(inner_generate_image_thumbnail(file_path));
let _ = tx.send(
panic::catch_unwind(|| inner_generate_image_thumbnail(&file_path)).unwrap_or_else(
move |_| {
Err(
thumbnailer::NonCriticalThumbnailerError::PanicWhileGeneratingThumbnail(
file_path,
"Internal panic on third party crate".to_string(),
),
)
},
),
);
}
});

View file

@ -44,7 +44,7 @@ impl<E: RunError> System<E> {
let workers_count = usize::max(
std::thread::available_parallelism().map_or_else(
|e| {
error!("Failed to get available parallelism in the job system: {e:#?}");
error!(?e, "Failed to get available parallelism in the job system");
1
},
NonZeroUsize::get,

View file

@ -1,6 +1,8 @@
use std::{
any::Any,
collections::{HashMap, VecDeque},
future::pending,
panic::AssertUnwindSafe,
pin::pin,
sync::{
atomic::{AtomicBool, Ordering},
@ -53,7 +55,7 @@ struct AbortAndSuspendSignalers {
struct RunningTask {
id: TaskId,
kind: PendingTaskKind,
handle: JoinHandle<()>,
handle: JoinHandle<Result<(), Box<dyn Any + Send>>>,
}
enum WaitingSuspendedTask {
@ -150,7 +152,7 @@ impl<E: RunError> Runner<E> {
&mut self,
task_id: TaskId,
task_work_state: TaskWorkState<E>,
) -> JoinHandle<()> {
) -> JoinHandle<Result<(), Box<dyn Any + Send>>> {
let (abort_tx, abort_rx) = oneshot::channel();
let (suspend_tx, suspend_rx) = oneshot::channel();
@ -163,13 +165,16 @@ impl<E: RunError> Runner<E> {
);
let handle = spawn(
run_single_task(
task_work_state,
self.task_output_tx.clone(),
suspend_rx,
abort_rx,
AssertUnwindSafe(
run_single_task(
task_work_state,
self.task_output_tx.clone(),
suspend_rx,
abort_rx,
)
.in_current_span(),
)
.in_current_span(),
.catch_unwind(),
);
trace!("Task runner spawned");
@ -624,8 +629,14 @@ impl<E: RunError> Runner<E> {
}
}
if let Err(e) = handle.await {
error!(%task_id, ?e, "Task failed to join");
match handle.await {
Ok(Ok(())) => { /* Everything is Awesome! */ }
Ok(Err(_)) => {
error!(%task_id, "Task panicked");
}
Err(e) => {
error!(%task_id, ?e, "Task failed to join");
}
}
stolen_task_tx.close();
@ -806,8 +817,14 @@ impl<E: RunError> Runner<E> {
assert_eq!(*finished_task_id, old_task_id, "Task output id mismatch"); // Sanity check
if let Err(e) = handle.await {
error!(?e, "Task failed to join");
match handle.await {
Ok(Ok(())) => { /* Everything is Awesome! */ }
Ok(Err(_)) => {
error!("Task panicked");
}
Err(e) => {
error!(?e, "Task failed to join");
}
}
if let Some((next_task_kind, task_work_state)) = self.get_next_task() {