Trying to avoid data loss on actor stop

This commit is contained in:
Ericson Soares 2024-06-20 15:48:17 -03:00
parent f72166fe70
commit 6cdaed47ec
4 changed files with 151 additions and 112 deletions

View file

@ -207,7 +207,9 @@ impl Actor {
}
if let Some(tx) = event.wait_tx {
tx.send(()).ok();
if tx.send(()).is_err() {
warn!("Failed to send wait_tx signal");
}
}
if event.has_more {

View file

@ -205,8 +205,7 @@ async fn no_update_after_delete() -> Result<(), Box<dyn std::error::Error>> {
),
instance1.db.location().find_many(vec![]),
)
.await
.ok();
.await?;
// one spare update operation that actually gets ignored by instance 2
assert_eq!(instance1.db.crdt_operation().count(vec![]).exec().await?, 5);

View file

@ -146,7 +146,7 @@ impl Instance {
}
}
ingest::Request::FinishedIngesting => {
right.sync.tx.send(SyncMessage::Ingested).ok();
right.sync.tx.send(SyncMessage::Ingested).unwrap();
}
}
}

View file

@ -1,62 +1,93 @@
#![warn(
clippy::all,
clippy::pedantic,
clippy::correctness,
clippy::perf,
clippy::style,
clippy::suspicious,
clippy::complexity,
clippy::nursery,
clippy::unwrap_used,
unused_qualifications,
rust_2018_idioms,
trivial_casts,
trivial_numeric_casts,
unused_allocation,
clippy::unnecessary_cast,
clippy::cast_lossless,
clippy::cast_possible_truncation,
clippy::cast_possible_wrap,
clippy::cast_precision_loss,
clippy::cast_sign_loss,
clippy::dbg_macro,
clippy::deprecated_cfg_attr,
clippy::separated_literal_suffix,
deprecated
)]
#![forbid(deprecated_in_future)]
#![allow(clippy::missing_errors_doc, clippy::module_name_repetitions)]
use std::{
collections::HashMap,
future::{Future, IntoFuture},
panic::{panic_any, AssertUnwindSafe},
pin::Pin,
sync::Arc,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
task::{Context, Poll},
time::Duration,
};
use async_channel as chan;
use futures::FutureExt;
use tokio::{
sync::{broadcast, oneshot, Mutex, RwLock},
task::AbortHandle,
spawn,
sync::{broadcast, RwLock},
task::JoinHandle,
time::timeout,
};
use tracing::{error, instrument, warn};
type ActorFn = dyn Fn(StopActor) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync;
type ActorsMap = HashMap<&'static str, (Arc<Actor>, ActorRunState)>;
const ONE_MINUTE: Duration = Duration::from_secs(60);
type ActorFn = dyn Fn(Stopper) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync;
pub struct Actor {
abort_handle: Mutex<Option<AbortHandle>>,
spawn_fn: Arc<ActorFn>,
maybe_handle: Option<JoinHandle<()>>,
is_running: Arc<AtomicBool>,
stop_tx: chan::Sender<()>,
stop_rx: chan::Receiver<()>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ActorRunState {
Running,
Stopped,
}
pub struct Actors {
pub invalidate_rx: broadcast::Receiver<()>,
invalidate_tx: broadcast::Sender<()>,
actors: Arc<RwLock<ActorsMap>>,
actors: Arc<RwLock<HashMap<&'static str, Actor>>>,
}
impl Actors {
pub async fn declare<F: Future<Output = ()> + Send + 'static>(
pub async fn declare<Fut>(
self: &Arc<Self>,
name: &'static str,
actor_fn: impl FnOnce(StopActor) -> F + Send + Sync + Clone + 'static,
actor_fn: impl FnOnce(Stopper) -> Fut + Send + Sync + Clone + 'static,
autostart: bool,
) {
) where
Fut: Future<Output = ()> + Send + 'static,
{
let (stop_tx, stop_rx) = chan::bounded(1);
self.actors.write().await.insert(
name,
(
Arc::new(Actor {
abort_handle: Default::default(),
spawn_fn: Arc::new(move |stop| {
Box::pin((actor_fn.clone())(stop)) as Pin<Box<_>>
}),
stop_tx,
stop_rx,
}),
ActorRunState::Stopped,
),
Actor {
spawn_fn: Arc::new(move |stop| Box::pin((actor_fn.clone())(stop))),
maybe_handle: None,
is_running: Arc::new(AtomicBool::new(false)),
stop_tx,
stop_rx,
},
);
if autostart {
@ -66,119 +97,105 @@ impl Actors {
#[instrument(skip(self))]
pub async fn start(self: &Arc<Self>, name: &str) {
let actor = {
let mut actors = self.actors.write().await;
let Some((actor, run_state)) = actors.get_mut(name) else {
return;
};
if matches!(run_state, ActorRunState::Running) {
if let Some(actor) = self.actors.write().await.get_mut(name) {
if actor.is_running.load(Ordering::Acquire) {
warn!("Actor already running!");
return;
}
*run_state = ActorRunState::Running;
let invalidate_tx = self.invalidate_tx.clone();
Arc::clone(actor)
};
let is_running = Arc::clone(&actor.is_running);
let mut abort_handle = actor.abort_handle.lock().await;
if abort_handle.is_some() {
return;
}
is_running.store(true, Ordering::Release);
let (tx, rx) = oneshot::channel();
let invalidate_tx = self.invalidate_tx.clone();
let spawn_fn = actor.spawn_fn.clone();
let stop_actor = StopActor {
rx: actor.stop_rx.clone(),
};
let task = tokio::spawn(async move {
(spawn_fn)(stop_actor).await;
tx.send(()).ok();
});
*abort_handle = Some(task.abort_handle());
invalidate_tx.send(()).ok();
tokio::spawn({
let actor = actor.clone();
async move {
#[allow(clippy::match_single_binding)]
match rx.await {
_ => {}
};
actor.abort_handle.lock().await.take();
invalidate_tx.send(()).ok();
if invalidate_tx.send(()).is_err() {
warn!("Failed to send invalidate signal");
}
});
if let Some(handle) = actor.maybe_handle.take() {
if handle.await.is_err() {
// This should never happen, as we're trying to catch the panic below with
// `catch_unwind`.
error!("Actor unexpectedly panicked");
}
}
actor.maybe_handle = Some(spawn({
let spawn_fn = Arc::clone(&actor.spawn_fn);
let stop_actor = Stopper(actor.stop_rx.clone());
async move {
if (AssertUnwindSafe((spawn_fn)(stop_actor)))
.catch_unwind()
.await
.is_err()
{
error!("Actor unexpectedly panicked");
}
is_running.store(false, Ordering::Release);
if invalidate_tx.send(()).is_err() {
warn!("Failed to send invalidate signal");
}
}
}));
}
}
#[instrument(skip(self))]
pub async fn stop(self: &Arc<Self>, name: &str) {
let actor = {
let mut actors = self.actors.write().await;
let Some((actor, run_state)) = actors.get_mut(name) else {
return;
};
if matches!(run_state, ActorRunState::Stopped) {
if let Some(actor) = self.actors.write().await.get_mut(name) {
if !actor.is_running.load(Ordering::Acquire) {
warn!("Actor already stopped!");
return;
}
if actor.stop_tx.send(()).await.is_err() {
error!("Failed to send stop signal to actor");
if actor.stop_tx.send(()).await.is_ok() {
wait_stop_or_abort(actor.maybe_handle.take()).await;
assert!(
!actor.is_running.load(Ordering::Acquire),
"actor handle finished without setting actor to stopped"
);
} else {
error!("Failed to send stop signal to actor, will check if it's already stopped or abort otherwise");
wait_stop_or_abort(actor.maybe_handle.take()).await;
}
*run_state = ActorRunState::Stopped;
Arc::clone(actor)
};
let mut abort_handle = actor.abort_handle.lock().await;
if let Some(abort_handle) = abort_handle.take() {
abort_handle.abort();
}
}
pub async fn get_state(&self) -> HashMap<String, bool> {
let actors = self.actors.read().await;
let mut state = HashMap::with_capacity(actors.len());
for (name, (actor, _)) in actors.iter() {
state.insert(name.to_string(), actor.abort_handle.lock().await.is_some());
}
state
self.actors
.read()
.await
.iter()
.map(|(&name, actor)| (name.to_string(), actor.is_running.load(Ordering::Relaxed)))
.collect()
}
}
impl Default for Actors {
fn default() -> Self {
let actors = Default::default();
let (invalidate_tx, invalidate_rx) = broadcast::channel(1);
Self {
actors,
actors: Arc::default(),
invalidate_rx,
invalidate_tx,
}
}
}
pub struct StopActor {
rx: chan::Receiver<()>,
pub struct Stopper(chan::Receiver<()>);
impl Stopper {
#[must_use]
pub fn check_stop(&self) -> bool {
self.0.try_recv().is_ok()
}
}
pin_project_lite::pin_project! {
@ -206,13 +223,34 @@ impl Future for StopActorFuture<'_> {
}
}
impl<'recv> IntoFuture for &'recv StopActor {
impl<'recv> IntoFuture for &'recv Stopper {
type Output = ();
type IntoFuture = StopActorFuture<'recv>;
fn into_future(self) -> Self::IntoFuture {
StopActorFuture {
fut: self.rx.recv(),
Self::IntoFuture { fut: self.0.recv() }
}
}
async fn wait_stop_or_abort(maybe_handle: Option<JoinHandle<()>>) {
if let Some(handle) = maybe_handle {
let abort_handle = handle.abort_handle();
match timeout(ONE_MINUTE, handle).await {
Ok(Ok(())) => { /* Everything is Awesome! */ }
Ok(Err(e)) => {
// This should never happen, as we're trying to catch the panic with
// `catch_unwind`.
if e.is_panic() {
let p = e.into_panic();
error!("Actor unexpectedly panicked, we will pop up the panic!");
panic_any(p);
}
}
Err(_) => {
error!("Actor failed to gracefully stop in the allotted time, will force abortion");
abort_handle.abort();
}
}
}
}