mirror of
https://github.com/spacedriveapp/spacedrive
synced 2024-07-07 00:53:28 +00:00
Merge 236d7730a3
into e1ec45bf5e
This commit is contained in:
commit
fdc87cde1a
|
@ -30,6 +30,9 @@ pub enum JobSystemError {
|
|||
|
||||
#[error(transparent)]
|
||||
Report(#[from] ReportError),
|
||||
|
||||
#[error("internal job panic! <id='{0}'>")]
|
||||
Panic(JobId),
|
||||
}
|
||||
|
||||
impl From<JobSystemError> for rspc::Error {
|
||||
|
|
|
@ -13,6 +13,7 @@ use std::{
|
|||
hash::{Hash, Hasher},
|
||||
marker::PhantomData,
|
||||
ops::{Deref, DerefMut},
|
||||
panic::AssertUnwindSafe,
|
||||
path::Path,
|
||||
pin::pin,
|
||||
sync::Arc,
|
||||
|
@ -21,7 +22,7 @@ use std::{
|
|||
|
||||
use async_channel as chan;
|
||||
use chrono::{DateTime, Utc};
|
||||
use futures::{stream, Future, StreamExt};
|
||||
use futures::{stream, Future, FutureExt, StreamExt};
|
||||
use futures_concurrency::{
|
||||
future::{Join, TryJoin},
|
||||
stream::Merge,
|
||||
|
@ -750,15 +751,29 @@ where
|
|||
|
||||
trace!("Dispatching job");
|
||||
|
||||
spawn(to_spawn_job::<OuterCtx, _, _>(
|
||||
self.id,
|
||||
self.job,
|
||||
ctx.clone(),
|
||||
None,
|
||||
base_dispatcher,
|
||||
commands_rx,
|
||||
done_tx,
|
||||
));
|
||||
spawn({
|
||||
let id = self.id;
|
||||
let job = self.job;
|
||||
let ctx = ctx.clone();
|
||||
|
||||
async move {
|
||||
if AssertUnwindSafe(to_spawn_job::<OuterCtx, _, _>(
|
||||
id,
|
||||
job,
|
||||
ctx,
|
||||
None,
|
||||
base_dispatcher,
|
||||
commands_rx,
|
||||
done_tx,
|
||||
))
|
||||
.catch_unwind()
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
error!("job panicked");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
JobHandle {
|
||||
id: self.id,
|
||||
|
@ -791,15 +806,29 @@ where
|
|||
|
||||
trace!("Resuming job");
|
||||
|
||||
spawn(to_spawn_job::<OuterCtx, _, _>(
|
||||
self.id,
|
||||
self.job,
|
||||
ctx.clone(),
|
||||
serialized_tasks,
|
||||
base_dispatcher,
|
||||
commands_rx,
|
||||
done_tx,
|
||||
));
|
||||
spawn({
|
||||
let id = self.id;
|
||||
let job = self.job;
|
||||
let ctx = ctx.clone();
|
||||
|
||||
async move {
|
||||
if AssertUnwindSafe(to_spawn_job::<OuterCtx, _, _>(
|
||||
id,
|
||||
job,
|
||||
ctx,
|
||||
serialized_tasks,
|
||||
base_dispatcher,
|
||||
commands_rx,
|
||||
done_tx,
|
||||
))
|
||||
.catch_unwind()
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
error!("job panicked");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
JobHandle {
|
||||
id: self.id,
|
||||
|
@ -855,9 +884,14 @@ async fn to_spawn_job<OuterCtx, JobCtx, J>(
|
|||
|
||||
spawn(
|
||||
async move {
|
||||
tx.send(job.run::<OuterCtx>(dispatcher, ctx).await)
|
||||
.await
|
||||
.expect("job run channel closed");
|
||||
tx.send(
|
||||
AssertUnwindSafe(job.run::<OuterCtx>(dispatcher, ctx))
|
||||
.catch_unwind()
|
||||
.await
|
||||
.unwrap_or(Err(Error::JobSystem(JobSystemError::Panic(job_id)))),
|
||||
)
|
||||
.await
|
||||
.expect("job run channel closed");
|
||||
}
|
||||
.in_current_span(),
|
||||
);
|
||||
|
|
|
@ -14,6 +14,7 @@ use sd_file_ext::extensions::{VideoExtension, ALL_VIDEO_EXTENSIONS};
|
|||
|
||||
use std::{
|
||||
ops::Deref,
|
||||
panic,
|
||||
path::{Path, PathBuf},
|
||||
str::FromStr,
|
||||
time::Duration,
|
||||
|
@ -313,9 +314,9 @@ pub async fn generate_thumbnail(
|
|||
}
|
||||
|
||||
fn inner_generate_image_thumbnail(
|
||||
file_path: PathBuf,
|
||||
file_path: &PathBuf,
|
||||
) -> Result<Vec<u8>, thumbnailer::NonCriticalThumbnailerError> {
|
||||
let mut img = format_image(&file_path).map_err(|e| {
|
||||
let mut img = format_image(file_path).map_err(|e| {
|
||||
thumbnailer::NonCriticalThumbnailerError::FormatImage(file_path.clone(), e.to_string())
|
||||
})?;
|
||||
|
||||
|
@ -336,7 +337,7 @@ fn inner_generate_image_thumbnail(
|
|||
|
||||
// this corrects the rotation/flip of the image based on the *available* exif data
|
||||
// not all images have exif data, so we don't error. we also don't rotate HEIF as that's against the spec
|
||||
if let Some(orientation) = Orientation::from_path(&file_path) {
|
||||
if let Some(orientation) = Orientation::from_path(file_path) {
|
||||
if ConvertibleExtension::try_from(file_path.as_ref())
|
||||
.expect("we already checked if the image was convertible")
|
||||
.should_rotate()
|
||||
|
@ -347,7 +348,10 @@ fn inner_generate_image_thumbnail(
|
|||
|
||||
// Create the WebP encoder for the above image
|
||||
let encoder = Encoder::from_image(&img).map_err(|reason| {
|
||||
thumbnailer::NonCriticalThumbnailerError::WebPEncoding(file_path, reason.to_string())
|
||||
thumbnailer::NonCriticalThumbnailerError::WebPEncoding(
|
||||
file_path.clone(),
|
||||
reason.to_string(),
|
||||
)
|
||||
})?;
|
||||
|
||||
// Type `WebPMemory` is !Send, which makes the `Future` in this function `!Send`,
|
||||
|
@ -378,7 +382,19 @@ async fn generate_image_thumbnail(
|
|||
|
||||
move || {
|
||||
// Handling error on receiver side
|
||||
let _ = tx.send(inner_generate_image_thumbnail(file_path));
|
||||
|
||||
let _ = tx.send(
|
||||
panic::catch_unwind(|| inner_generate_image_thumbnail(&file_path)).unwrap_or_else(
|
||||
move |_| {
|
||||
Err(
|
||||
thumbnailer::NonCriticalThumbnailerError::PanicWhileGeneratingThumbnail(
|
||||
file_path,
|
||||
"Internal panic on third party crate".to_string(),
|
||||
),
|
||||
)
|
||||
},
|
||||
),
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
|
|
|
@ -44,7 +44,7 @@ impl<E: RunError> System<E> {
|
|||
let workers_count = usize::max(
|
||||
std::thread::available_parallelism().map_or_else(
|
||||
|e| {
|
||||
error!("Failed to get available parallelism in the job system: {e:#?}");
|
||||
error!(?e, "Failed to get available parallelism in the job system");
|
||||
1
|
||||
},
|
||||
NonZeroUsize::get,
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
use std::{
|
||||
any::Any,
|
||||
collections::{HashMap, VecDeque},
|
||||
future::pending,
|
||||
panic::AssertUnwindSafe,
|
||||
pin::pin,
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
|
@ -53,7 +55,7 @@ struct AbortAndSuspendSignalers {
|
|||
struct RunningTask {
|
||||
id: TaskId,
|
||||
kind: PendingTaskKind,
|
||||
handle: JoinHandle<()>,
|
||||
handle: JoinHandle<Result<(), Box<dyn Any + Send>>>,
|
||||
}
|
||||
|
||||
enum WaitingSuspendedTask {
|
||||
|
@ -150,7 +152,7 @@ impl<E: RunError> Runner<E> {
|
|||
&mut self,
|
||||
task_id: TaskId,
|
||||
task_work_state: TaskWorkState<E>,
|
||||
) -> JoinHandle<()> {
|
||||
) -> JoinHandle<Result<(), Box<dyn Any + Send>>> {
|
||||
let (abort_tx, abort_rx) = oneshot::channel();
|
||||
let (suspend_tx, suspend_rx) = oneshot::channel();
|
||||
|
||||
|
@ -163,13 +165,16 @@ impl<E: RunError> Runner<E> {
|
|||
);
|
||||
|
||||
let handle = spawn(
|
||||
run_single_task(
|
||||
task_work_state,
|
||||
self.task_output_tx.clone(),
|
||||
suspend_rx,
|
||||
abort_rx,
|
||||
AssertUnwindSafe(
|
||||
run_single_task(
|
||||
task_work_state,
|
||||
self.task_output_tx.clone(),
|
||||
suspend_rx,
|
||||
abort_rx,
|
||||
)
|
||||
.in_current_span(),
|
||||
)
|
||||
.in_current_span(),
|
||||
.catch_unwind(),
|
||||
);
|
||||
|
||||
trace!("Task runner spawned");
|
||||
|
@ -624,8 +629,14 @@ impl<E: RunError> Runner<E> {
|
|||
}
|
||||
}
|
||||
|
||||
if let Err(e) = handle.await {
|
||||
error!(%task_id, ?e, "Task failed to join");
|
||||
match handle.await {
|
||||
Ok(Ok(())) => { /* Everything is Awesome! */ }
|
||||
Ok(Err(_)) => {
|
||||
error!(%task_id, "Task panicked");
|
||||
}
|
||||
Err(e) => {
|
||||
error!(%task_id, ?e, "Task failed to join");
|
||||
}
|
||||
}
|
||||
|
||||
stolen_task_tx.close();
|
||||
|
@ -806,8 +817,14 @@ impl<E: RunError> Runner<E> {
|
|||
|
||||
assert_eq!(*finished_task_id, old_task_id, "Task output id mismatch"); // Sanity check
|
||||
|
||||
if let Err(e) = handle.await {
|
||||
error!(?e, "Task failed to join");
|
||||
match handle.await {
|
||||
Ok(Ok(())) => { /* Everything is Awesome! */ }
|
||||
Ok(Err(_)) => {
|
||||
error!("Task panicked");
|
||||
}
|
||||
Err(e) => {
|
||||
error!(?e, "Task failed to join");
|
||||
}
|
||||
}
|
||||
|
||||
if let Some((next_task_kind, task_work_state)) = self.get_next_task() {
|
||||
|
|
Loading…
Reference in a new issue