Skip to content

Commit

Permalink
chore(speare): exit now is called on init errors
Browse files Browse the repository at this point in the history
  • Loading branch information
vmenge committed Jan 2, 2025
1 parent 075cc2c commit 0903b07
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 24 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion speare/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "speare"
version = "0.1.10"
version = "0.2.0"
edition = "2021"
license = "MIT"
description = "actor-like thin abstraction over tokio::task and flume channels"
Expand Down
21 changes: 14 additions & 7 deletions speare/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,10 @@ pub trait Actor: Sized + Send + 'static {
/// when spawning or restarting it.
async fn init(ctx: &mut Ctx<Self>) -> Result<Self, Self::Err>;

/// A function that will be called if your [`Actor`] is stopped or restarted.
async fn exit(&mut self, reason: ExitReason<Self>, ctx: &mut Ctx<Self>) {}
/// A function that will be called if your [`Actor`] fails to init, is stopped or restarted.
///
/// `this` is `None` if the [`Actor`] is failing on `init`.
async fn exit(this: Option<Self>, reason: ExitReason<Self>, ctx: &mut Ctx<Self>) {}

/// Called everytime your [`Actor`] receives a message.
async fn handle(&mut self, msg: Self::Msg, ctx: &mut Ctx<Self>) -> Result<(), Self::Err> {
Expand Down Expand Up @@ -387,7 +389,7 @@ where
/// Spawned thread is owned by this [`Actor`] and managed by the tokio threadpool
///
/// An error from this task counts as an error from the [`Actor`] that spawned it, invoking [`Actor::exit`] and regular error routines.
/// When the [`Actor`] owning the task terminates, all tasks are forcefully aborted.
/// Due to being a blocking task, when the [`Actor`] owning the task terminates, this task will not be forcefully aborted.
/// See [`tokio::task::spawn_blocking`] for more on blocking tasks
pub fn subtask_blocking<F>(&mut self, f: F)
where
Expand Down Expand Up @@ -591,10 +593,11 @@ where

match Child::init(&mut ctx).await {
Err(e) => {
let shared_err = SharedErr::new(e);
let (tx, rx) = flume::unbounded();
let _ = ctx.parent_proc_msg_tx.send(ProcMsg::FromChild {
child_id: ctx.id,
err: Box::new(SharedErr::new(e)),
err: Box::new(shared_err.clone()),
ack: tx,
});
let _ = rx.recv_async().await;
Expand All @@ -620,6 +623,8 @@ where

task::yield_now().await;

Child::exit(None, ExitReason::Err(shared_err), &mut ctx).await;

if let Some(r) = restart {
r.sync().await;
spawn::<Parent, Child>(ctx, Some(r.delay))
Expand Down Expand Up @@ -701,7 +706,7 @@ where
task::yield_now().await;

let exit_reason = exit_reason.unwrap_or(ExitReason::Handle);
actor.exit(exit_reason, &mut ctx).await;
Child::exit(Some(actor), exit_reason, &mut ctx).await;

if let Some(r) = restart {
r.sync().await;
Expand Down Expand Up @@ -797,12 +802,14 @@ impl Node {

tokio::spawn(async move {
match P::init(&mut ctx).await {
Err(_) => {
Err(e) => {
for child in ctx.children_proc_msg_tx.values() {
let _ = child.send(ProcMsg::FromParent(ProcAction::Stop));
}

task::yield_now().await;

P::exit(None, ExitReason::Err(SharedErr::new(e)), &mut ctx).await;
}

Ok(mut actor) => {
Expand Down Expand Up @@ -861,7 +868,7 @@ impl Node {
task::yield_now().await;

let exit_reason = exit_reason.unwrap_or(ExitReason::Handle);
actor.exit(exit_reason, &mut ctx).await;
P::exit(Some(actor), exit_reason, &mut ctx).await;
}
}
});
Expand Down
6 changes: 3 additions & 3 deletions speare/src/req_res.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use flume::{Receiver, Sender};
use std::{fmt, time::Duration};
use tokio::time;

/// Represents a request sent to a `Actor `.
/// `Request` holds the data sent to a `Actor ` and provides a channel to reply back to the sender.
/// Represents a request sent to a `Actor`.
/// `Request` holds the data sent to a `Actor` and provides a channel to reply back to the sender.
///
/// ## Example
/// ```
Expand Down Expand Up @@ -87,7 +87,7 @@ impl<Req, Res> Request<Req, Res> {
}
}

///`Response<Res>` is used to asynchronously wait for and retrieve the result of a `Request<Req, Res>` sent to a `Actor `.
///`Response<Res>` is used to asynchronously wait for and retrieve the result of a `Request<Req, Res>` sent to a `Actor`.
///
/// ## Example
/// ```
Expand Down
4 changes: 2 additions & 2 deletions speare/src/supervision.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ pub(crate) enum Strategy {
}

#[derive(Clone, Copy, Debug)]
/// Action to take after a `Actor ` errors.
/// Action to take after a `Actor` errors.
pub enum Directive {
/// Resumes the actor(s) as if nothing happened.
Resume,
Expand All @@ -377,7 +377,7 @@ pub enum Directive {
}

#[derive(Clone, Copy, Debug)]
/// How long to wait before restarting a `Actor ` that errored.
/// How long to wait before restarting a `Actor` that errored.
pub enum Backoff {
/// Uses the same backoff duration for every restart.
Static(Duration),
Expand Down
12 changes: 6 additions & 6 deletions speare/tests/lifetime_deps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl Actor for Foo {
Ok(Foo)
}

async fn exit(&mut self, _: ExitReason<Self>, ctx: &mut Ctx<Self>) {
async fn exit(_: Option<Self>, _: ExitReason<Self>, ctx: &mut Ctx<Self>) {
ctx.props().push(TestMsg::FooQuit).await;
}
}
Expand All @@ -56,7 +56,7 @@ impl Actor for Bar {
}
}

async fn exit(&mut self, _: ExitReason<Self>, ctx: &mut Ctx<Self>) {
async fn exit(_: Option<Self>, _: ExitReason<Self>, ctx: &mut Ctx<Self>) {
ctx.props().0.push(TestMsg::BarQuit).await;
}
}
Expand All @@ -75,7 +75,7 @@ impl Actor for Child1 {
Ok(Child1)
}

async fn exit(&mut self, _: ExitReason<Self>, ctx: &mut Ctx<Self>) {
async fn exit(_: Option<Self>, _: ExitReason<Self>, ctx: &mut Ctx<Self>) {
ctx.props().push(TestMsg::Child1Quit).await;
}
}
Expand All @@ -93,7 +93,7 @@ impl Actor for Child2 {
Ok(Child2)
}

async fn exit(&mut self, _: ExitReason<Self>, ctx: &mut Ctx<Self>) {
async fn exit(_: Option<Self>, _: ExitReason<Self>, ctx: &mut Ctx<Self>) {
ctx.props().push(TestMsg::Child2Quit).await;
}
}
Expand Down Expand Up @@ -134,7 +134,7 @@ async fn on_init_and_on_exit_are_called_in_order() {
#[tokio::test]
async fn order_preserved_even_with_startup_failure() {
// Arrange
let mut node = Node::default();
let node = Node::default();
let recvd: SyncVec<_> = Default::default();
node.spawn::<Foo>(recvd.clone());
let fail_to_start = true;
Expand All @@ -155,8 +155,8 @@ async fn order_preserved_even_with_startup_failure() {
TestMsg::Child2Started,
TestMsg::Child2Quit,
TestMsg::Child1Quit,
TestMsg::BarQuit,
TestMsg::FooQuit,
// No TestMsg::BarQuit because Foo failed to even start
],
recvd.clone_vec().await
)
Expand Down
6 changes: 3 additions & 3 deletions speare/tests/supervision.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl Actor for Child {
Ok(())
}

async fn exit(&mut self, reason: ExitReason<Self>, _: &mut Ctx<Self>) {
async fn exit(_: Option<Self>, reason: ExitReason<Self>, _: &mut Ctx<Self>) {
println!("Child exiting. {:?}", reason);
}
}
Expand Down Expand Up @@ -680,7 +680,7 @@ mod one_for_all {
Ok(Self { kid0 })
}

async fn exit(&mut self, _: ExitReason<Self>, ctx: &mut Ctx<Self>) {
async fn exit(_: Option<Self>, _: ExitReason<Self>, ctx: &mut Ctx<Self>) {
ctx.props().push("Dad::exit".to_string()).await;
}

Expand Down Expand Up @@ -708,7 +708,7 @@ mod one_for_all {
Ok(Self)
}

async fn exit(&mut self, _: ExitReason<Self>, ctx: &mut Ctx<Self>) {
async fn exit(_: Option<Self>, _: ExitReason<Self>, ctx: &mut Ctx<Self>) {
let (id, evts) = ctx.props();
evts.push(format!("Kid{id}::exit")).await;
}
Expand Down
2 changes: 1 addition & 1 deletion speare/tests/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl Actor for Root {
Ok(())
}

async fn exit(&mut self, reason: ExitReason<Self>, ctx: &mut Ctx<Self>) {
async fn exit(_: Option<Self>, reason: ExitReason<Self>, ctx: &mut Ctx<Self>) {
if let ExitReason::Err(e) = reason {
ctx.props().1.push((*e).clone()).await;
}
Expand Down

0 comments on commit 0903b07

Please sign in to comment.