Skip to content

Commit

Permalink
tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
vmenge committed Dec 30, 2024
1 parent 32d0875 commit 32a3022
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 384 deletions.
93 changes: 0 additions & 93 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ readme = "../README.md"
async-trait = { version = "0.1.73" }
flume = "0.11.0"
tokio = { version = "1.35.1", features = ["macros", "rt", "sync", "time"] }
futures = "0.3.31"

[dev-dependencies]
tokio = { version = "1.35.1", features = [
Expand Down
76 changes: 65 additions & 11 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use async_trait::async_trait;
use flume::{Receiver, Sender};
use futures::Stream;
use std::{any::Any, collections::HashMap, future::Future, time::Duration};
use tokio::{
task,
Expand All @@ -9,12 +8,10 @@ use tokio::{

mod exit;
mod req_res;
mod stream;
mod supervision;

pub use exit::*;
pub use req_res::*;
pub use stream::*;
pub use supervision::*;

/// A thin abstraction over tokio tasks and flume channels, allowing for easy message passing
Expand Down Expand Up @@ -303,6 +300,7 @@ where
children_proc_msg_tx: HashMap<u64, Sender<ProcMsg>>,
supervision: Supervision,
total_children: u64,
tasks: Vec<task::JoinHandle<()>>,
}

impl<P> Ctx<P>
Expand Down Expand Up @@ -356,6 +354,7 @@ where
children_proc_msg_tx: Default::default(),
total_children: 0,
supervision,
tasks: vec![],
};

spawn::<P, Child>(ctx, None);
Expand All @@ -366,15 +365,43 @@ where
handle
}

pub fn stream<F, Fut, S, T, E>(&mut self, f: F) -> StreamBuilder<'_, P, F, Fut, S, T, E, NoSink>
/// Spawns a task owned by this [`Process`].
/// An error from this task counts as an error from the [`Actor`] that spawned it, invoking [`Actor::exit`] and regular error routines.
/// When the [`Actor`] owning the task terminates, all tasks are forcefully aborted.
pub fn subtask<F>(&mut self, future: F)
where
F: Fn() -> Fut + Send + 'static,
Fut: Future<Output = S> + Send + 'static,
S: Stream<Item = Result<T, E>> + Send + 'static + Unpin,
T: Send + 'static,
E: Send + Sync + 'static,
F: Future<Output = Result<(), P::Err>> + Send + 'static,
{
StreamBuilder::new(f, self)
let proc_msg_tx = self.handle.proc_msg_tx.clone();

let task = task::spawn(async move {
if let Err(e) = future.await {
let _ = proc_msg_tx.send(ProcMsg::TaskErr(Box::new(e)));
}
});

self.tasks.push(task);
}

/// Runs the provided closure on a thread where blocking is acceptable.
/// Spawned thread is owned by this [`Actor`] and managed by the tokio threadpool
///
/// An error from this task counts as an error from the [`Actor`] that spawned it, invoking [`Actor::exit`] and regular error routines.
/// When the [`Actor`] owning the task terminates, all tasks are forcefully aborted.
/// See [`tokio::task::spawn_blocking`] for more on blocking tasks
pub fn subtask_blocking<F>(&mut self, f: F)
where
F: FnOnce() -> Result<(), P::Err> + Send + 'static,
{
let proc_msg_tx = self.handle.proc_msg_tx.clone();

let task = task::spawn_blocking(move || {
if let Err(e) = f() {
let _ = proc_msg_tx.send(ProcMsg::TaskErr(Box::new(e)));
}
});

self.tasks.push(task);
}

async fn handle_err(
Expand Down Expand Up @@ -500,7 +527,7 @@ enum ProcMsg {
err: Box<dyn Any + Send>,
ack: Sender<()>,
},

TaskErr(Box<dyn Any + Send>),
FromParent(ProcAction),
FromHandle(ProcAction),
}
Expand Down Expand Up @@ -587,6 +614,10 @@ where
let _ = child.send(ProcMsg::FromParent(ProcAction::Stop));
}

for task in &ctx.tasks {
task.abort();
}

task::yield_now().await;

if let Some(r) = restart {
Expand All @@ -606,6 +637,13 @@ where
match proc_msg {
Err(_) => break,

Ok(ProcMsg::TaskErr(e)) => {
let e: Box<Child::Err> = e.downcast().unwrap();
let e = SharedErr::new(*e);
exit_reason = Some(ExitReason::Err(e));
break;
}

Ok(ProcMsg::FromHandle(ProcAction::Stop) ) => {
exit_reason = Some(ExitReason::Handle);
break
Expand Down Expand Up @@ -656,6 +694,10 @@ where
let _ = child.send(ProcMsg::FromParent(ProcAction::Stop));
}

for task in &ctx.tasks {
task.abort();
}

task::yield_now().await;

let exit_reason = exit_reason.unwrap_or(ExitReason::Handle);
Expand Down Expand Up @@ -750,6 +792,7 @@ impl Node {
proc_msg_rx: directive_rx,
children_proc_msg_tx: Default::default(),
supervision,
tasks: vec![],
};

tokio::spawn(async move {
Expand All @@ -772,6 +815,13 @@ impl Node {
match proc_msg {
Err(_) => break,

Ok(ProcMsg::TaskErr(e)) => {
let e: Box<P::Err> = e.downcast().unwrap();
let e = SharedErr::new(*e);
exit_reason = Some(ExitReason::Err(e));
break;
}

Ok(ProcMsg::FromHandle(ProcAction::Stop) | ProcMsg::FromParent(ProcAction::Stop)) => {
exit_reason = Some(ExitReason::Handle);
break
Expand Down Expand Up @@ -804,6 +854,10 @@ impl Node {
let _ = child.send(ProcMsg::FromParent(ProcAction::Stop));
}

for task in &ctx.tasks {
task.abort();
}

task::yield_now().await;

let exit_reason = exit_reason.unwrap_or(ExitReason::Handle);
Expand Down
Loading

0 comments on commit 32a3022

Please sign in to comment.