diff --git a/core/crates/sync/src/ingest.rs b/core/crates/sync/src/ingest.rs index 2d3a34af8..d868f685d 100644 --- a/core/crates/sync/src/ingest.rs +++ b/core/crates/sync/src/ingest.rs @@ -207,7 +207,9 @@ impl Actor { } if let Some(tx) = event.wait_tx { - tx.send(()).ok(); + if tx.send(()).is_err() { + warn!("Failed to send wait_tx signal"); + } } if event.has_more { diff --git a/core/crates/sync/tests/lib.rs b/core/crates/sync/tests/lib.rs index 76d88bfe0..604739ac8 100644 --- a/core/crates/sync/tests/lib.rs +++ b/core/crates/sync/tests/lib.rs @@ -205,8 +205,7 @@ async fn no_update_after_delete() -> Result<(), Box> { ), instance1.db.location().find_many(vec![]), ) - .await - .ok(); + .await?; // one spare update operation that actually gets ignored by instance 2 assert_eq!(instance1.db.crdt_operation().count(vec![]).exec().await?, 5); diff --git a/core/crates/sync/tests/mock_instance.rs b/core/crates/sync/tests/mock_instance.rs index ed948eda6..807ccd4f6 100644 --- a/core/crates/sync/tests/mock_instance.rs +++ b/core/crates/sync/tests/mock_instance.rs @@ -146,7 +146,7 @@ impl Instance { } } ingest::Request::FinishedIngesting => { - right.sync.tx.send(SyncMessage::Ingested).ok(); + right.sync.tx.send(SyncMessage::Ingested).unwrap(); } } } diff --git a/crates/actors/src/lib.rs b/crates/actors/src/lib.rs index d10e91c1d..9c9c263fb 100644 --- a/crates/actors/src/lib.rs +++ b/crates/actors/src/lib.rs @@ -1,62 +1,93 @@ +#![warn( + clippy::all, + clippy::pedantic, + clippy::correctness, + clippy::perf, + clippy::style, + clippy::suspicious, + clippy::complexity, + clippy::nursery, + clippy::unwrap_used, + unused_qualifications, + rust_2018_idioms, + trivial_casts, + trivial_numeric_casts, + unused_allocation, + clippy::unnecessary_cast, + clippy::cast_lossless, + clippy::cast_possible_truncation, + clippy::cast_possible_wrap, + clippy::cast_precision_loss, + clippy::cast_sign_loss, + clippy::dbg_macro, + clippy::deprecated_cfg_attr, + clippy::separated_literal_suffix, + deprecated +)] +#![forbid(deprecated_in_future)] +#![allow(clippy::missing_errors_doc, clippy::module_name_repetitions)] + use std::{ collections::HashMap, future::{Future, IntoFuture}, + panic::{panic_any, AssertUnwindSafe}, pin::Pin, - sync::Arc, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, task::{Context, Poll}, + time::Duration, }; use async_channel as chan; +use futures::FutureExt; use tokio::{ - sync::{broadcast, oneshot, Mutex, RwLock}, - task::AbortHandle, + spawn, + sync::{broadcast, RwLock}, + task::JoinHandle, + time::timeout, }; use tracing::{error, instrument, warn}; -type ActorFn = dyn Fn(StopActor) -> Pin + Send>> + Send + Sync; -type ActorsMap = HashMap<&'static str, (Arc, ActorRunState)>; +const ONE_MINUTE: Duration = Duration::from_secs(60); + +type ActorFn = dyn Fn(Stopper) -> Pin + Send>> + Send + Sync; pub struct Actor { - abort_handle: Mutex>, spawn_fn: Arc, + maybe_handle: Option>, + is_running: Arc, stop_tx: chan::Sender<()>, stop_rx: chan::Receiver<()>, } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum ActorRunState { - Running, - Stopped, -} - pub struct Actors { pub invalidate_rx: broadcast::Receiver<()>, invalidate_tx: broadcast::Sender<()>, - actors: Arc>, + actors: Arc>>, } impl Actors { - pub async fn declare + Send + 'static>( + pub async fn declare( self: &Arc, name: &'static str, - actor_fn: impl FnOnce(StopActor) -> F + Send + Sync + Clone + 'static, + actor_fn: impl FnOnce(Stopper) -> Fut + Send + Sync + Clone + 'static, autostart: bool, - ) { + ) where + Fut: Future + Send + 'static, + { let (stop_tx, stop_rx) = chan::bounded(1); self.actors.write().await.insert( name, - ( - Arc::new(Actor { - abort_handle: Default::default(), - spawn_fn: Arc::new(move |stop| { - Box::pin((actor_fn.clone())(stop)) as Pin> - }), - stop_tx, - stop_rx, - }), - ActorRunState::Stopped, - ), + Actor { + spawn_fn: Arc::new(move |stop| Box::pin((actor_fn.clone())(stop))), + maybe_handle: None, + is_running: Arc::new(AtomicBool::new(false)), + stop_tx, + stop_rx, + }, ); if autostart { @@ -66,119 +97,105 @@ impl Actors { #[instrument(skip(self))] pub async fn start(self: &Arc, name: &str) { - let actor = { - let mut actors = self.actors.write().await; - - let Some((actor, run_state)) = actors.get_mut(name) else { - return; - }; - - if matches!(run_state, ActorRunState::Running) { + if let Some(actor) = self.actors.write().await.get_mut(name) { + if actor.is_running.load(Ordering::Acquire) { warn!("Actor already running!"); return; } - *run_state = ActorRunState::Running; + let invalidate_tx = self.invalidate_tx.clone(); - Arc::clone(actor) - }; + let is_running = Arc::clone(&actor.is_running); - let mut abort_handle = actor.abort_handle.lock().await; - if abort_handle.is_some() { - return; - } + is_running.store(true, Ordering::Release); - let (tx, rx) = oneshot::channel(); - - let invalidate_tx = self.invalidate_tx.clone(); - - let spawn_fn = actor.spawn_fn.clone(); - let stop_actor = StopActor { - rx: actor.stop_rx.clone(), - }; - - let task = tokio::spawn(async move { - (spawn_fn)(stop_actor).await; - - tx.send(()).ok(); - }); - - *abort_handle = Some(task.abort_handle()); - invalidate_tx.send(()).ok(); - - tokio::spawn({ - let actor = actor.clone(); - async move { - #[allow(clippy::match_single_binding)] - match rx.await { - _ => {} - }; - - actor.abort_handle.lock().await.take(); - invalidate_tx.send(()).ok(); + if invalidate_tx.send(()).is_err() { + warn!("Failed to send invalidate signal"); } - }); + + if let Some(handle) = actor.maybe_handle.take() { + if handle.await.is_err() { + // This should never happen, as we're trying to catch the panic below with + // `catch_unwind`. + error!("Actor unexpectedly panicked"); + } + } + + actor.maybe_handle = Some(spawn({ + let spawn_fn = Arc::clone(&actor.spawn_fn); + + let stop_actor = Stopper(actor.stop_rx.clone()); + + async move { + if (AssertUnwindSafe((spawn_fn)(stop_actor))) + .catch_unwind() + .await + .is_err() + { + error!("Actor unexpectedly panicked"); + } + + is_running.store(false, Ordering::Release); + + if invalidate_tx.send(()).is_err() { + warn!("Failed to send invalidate signal"); + } + } + })); + } } #[instrument(skip(self))] pub async fn stop(self: &Arc, name: &str) { - let actor = { - let mut actors = self.actors.write().await; - - let Some((actor, run_state)) = actors.get_mut(name) else { - return; - }; - - if matches!(run_state, ActorRunState::Stopped) { + if let Some(actor) = self.actors.write().await.get_mut(name) { + if !actor.is_running.load(Ordering::Acquire) { warn!("Actor already stopped!"); return; } - if actor.stop_tx.send(()).await.is_err() { - error!("Failed to send stop signal to actor"); + if actor.stop_tx.send(()).await.is_ok() { + wait_stop_or_abort(actor.maybe_handle.take()).await; + + assert!( + !actor.is_running.load(Ordering::Acquire), + "actor handle finished without setting actor to stopped" + ); + } else { + error!("Failed to send stop signal to actor, will check if it's already stopped or abort otherwise"); + wait_stop_or_abort(actor.maybe_handle.take()).await; } - - *run_state = ActorRunState::Stopped; - - Arc::clone(actor) - }; - - let mut abort_handle = actor.abort_handle.lock().await; - - if let Some(abort_handle) = abort_handle.take() { - abort_handle.abort(); } } pub async fn get_state(&self) -> HashMap { - let actors = self.actors.read().await; - - let mut state = HashMap::with_capacity(actors.len()); - - for (name, (actor, _)) in actors.iter() { - state.insert(name.to_string(), actor.abort_handle.lock().await.is_some()); - } - - state + self.actors + .read() + .await + .iter() + .map(|(&name, actor)| (name.to_string(), actor.is_running.load(Ordering::Relaxed))) + .collect() } } impl Default for Actors { fn default() -> Self { - let actors = Default::default(); - let (invalidate_tx, invalidate_rx) = broadcast::channel(1); Self { - actors, + actors: Arc::default(), invalidate_rx, invalidate_tx, } } } -pub struct StopActor { - rx: chan::Receiver<()>, +pub struct Stopper(chan::Receiver<()>); + +impl Stopper { + #[must_use] + pub fn check_stop(&self) -> bool { + self.0.try_recv().is_ok() + } } pin_project_lite::pin_project! { @@ -206,13 +223,34 @@ impl Future for StopActorFuture<'_> { } } -impl<'recv> IntoFuture for &'recv StopActor { +impl<'recv> IntoFuture for &'recv Stopper { type Output = (); type IntoFuture = StopActorFuture<'recv>; fn into_future(self) -> Self::IntoFuture { - StopActorFuture { - fut: self.rx.recv(), + Self::IntoFuture { fut: self.0.recv() } + } +} + +async fn wait_stop_or_abort(maybe_handle: Option>) { + if let Some(handle) = maybe_handle { + let abort_handle = handle.abort_handle(); + + match timeout(ONE_MINUTE, handle).await { + Ok(Ok(())) => { /* Everything is Awesome! */ } + Ok(Err(e)) => { + // This should never happen, as we're trying to catch the panic with + // `catch_unwind`. + if e.is_panic() { + let p = e.into_panic(); + error!("Actor unexpectedly panicked, we will pop up the panic!"); + panic_any(p); + } + } + Err(_) => { + error!("Actor failed to gracefully stop in the allotted time, will force abortion"); + abort_handle.abort(); + } } } }