From cf9bfc9a36bddab9cf558beb59caa10d152c6f05 Mon Sep 17 00:00:00 2001 From: vmenge Date: Mon, 30 Dec 2024 20:40:11 +0100 Subject: [PATCH] feat(core): tasks --- Cargo.lock | 93 ------------------- core/Cargo.toml | 1 - core/src/lib.rs | 76 ++++++++++++--- core/src/stream.rs | 213 ------------------------------------------- core/tests/stream.rs | 66 -------------- core/tests/tasks.rs | 71 +++++++++++++++ 6 files changed, 136 insertions(+), 384 deletions(-) delete mode 100644 core/src/stream.rs delete mode 100644 core/tests/stream.rs create mode 100644 core/tests/tasks.rs diff --git a/Cargo.lock b/Cargo.lock index a7d0228..017853a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -101,95 +101,18 @@ dependencies = [ "spin", ] -[[package]] -name = "futures" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" -dependencies = [ - "futures-channel", - "futures-core", - "futures-executor", - "futures-io", - "futures-sink", - "futures-task", - "futures-util", -] - -[[package]] -name = "futures-channel" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" -dependencies = [ - "futures-core", - "futures-sink", -] - [[package]] name = "futures-core" version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" -[[package]] -name = "futures-executor" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" -dependencies = [ - "futures-core", - "futures-task", - "futures-util", -] - -[[package]] -name = "futures-io" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" - -[[package]] -name = "futures-macro" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.92", -] - [[package]] name = "futures-sink" version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" -[[package]] -name = "futures-task" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" - -[[package]] -name = "futures-util" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" -dependencies = [ - "futures-channel", - "futures-core", - "futures-io", - "futures-macro", - "futures-sink", - "futures-task", - "memchr", - "pin-project-lite", - "pin-utils", - "slab", -] - [[package]] name = "getrandom" version = "0.2.10" @@ -301,12 +224,6 @@ version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" -[[package]] -name = "pin-utils" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" - [[package]] name = "proc-macro2" version = "1.0.92" @@ -352,15 +269,6 @@ version = "1.0.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b97ed7a9823b74f99c7742f5336af7be5ecd3eeafcb1507d1fa93347b1d589b0" -[[package]] -name = "slab" -version = "0.4.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" -dependencies = [ - "autocfg", -] - [[package]] name = "speare" version = "0.1.10" @@ -368,7 +276,6 @@ dependencies = [ "async-trait", "derive_more", "flume", - "futures", "tokio", ] diff --git a/core/Cargo.toml b/core/Cargo.toml index d4e472c..107996f 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -12,7 +12,6 @@ readme = "../README.md" async-trait = { version = "0.1.73" } flume = "0.11.0" tokio = { version = "1.35.1", features = ["macros", "rt", "sync", "time"] } -futures = "0.3.31" [dev-dependencies] tokio = { version = "1.35.1", features = [ diff --git a/core/src/lib.rs b/core/src/lib.rs index 08bb9bd..16c8f2d 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -1,6 +1,5 @@ use async_trait::async_trait; use flume::{Receiver, Sender}; -use futures::Stream; use std::{any::Any, collections::HashMap, future::Future, time::Duration}; use tokio::{ task, @@ -9,12 +8,10 @@ use tokio::{ mod exit; mod req_res; -mod stream; mod supervision; pub use exit::*; pub use req_res::*; -pub use stream::*; pub use supervision::*; /// A thin abstraction over tokio tasks and flume channels, allowing for easy message passing @@ -303,6 +300,7 @@ where children_proc_msg_tx: HashMap>, supervision: Supervision, total_children: u64, + tasks: Vec>, } impl

Ctx

@@ -356,6 +354,7 @@ where children_proc_msg_tx: Default::default(), total_children: 0, supervision, + tasks: vec![], }; spawn::(ctx, None); @@ -366,15 +365,43 @@ where handle } - pub fn stream(&mut self, f: F) -> StreamBuilder<'_, P, F, Fut, S, T, E, NoSink> + /// Spawns a task owned by this [`Process`]. + /// An error from this task counts as an error from the [`Actor`] that spawned it, invoking [`Actor::exit`] and regular error routines. + /// When the [`Actor`] owning the task terminates, all tasks are forcefully aborted. + pub fn subtask(&mut self, future: F) where - F: Fn() -> Fut + Send + 'static, - Fut: Future + Send + 'static, - S: Stream> + Send + 'static + Unpin, - T: Send + 'static, - E: Send + Sync + 'static, + F: Future> + Send + 'static, { - StreamBuilder::new(f, self) + let proc_msg_tx = self.handle.proc_msg_tx.clone(); + + let task = task::spawn(async move { + if let Err(e) = future.await { + let _ = proc_msg_tx.send(ProcMsg::TaskErr(Box::new(e))); + } + }); + + self.tasks.push(task); + } + + /// Runs the provided closure on a thread where blocking is acceptable. + /// Spawned thread is owned by this [`Actor`] and managed by the tokio threadpool + /// + /// An error from this task counts as an error from the [`Actor`] that spawned it, invoking [`Actor::exit`] and regular error routines. + /// When the [`Actor`] owning the task terminates, all tasks are forcefully aborted. + /// See [`tokio::task::spawn_blocking`] for more on blocking tasks + pub fn subtask_blocking(&mut self, f: F) + where + F: FnOnce() -> Result<(), P::Err> + Send + 'static, + { + let proc_msg_tx = self.handle.proc_msg_tx.clone(); + + let task = task::spawn_blocking(move || { + if let Err(e) = f() { + let _ = proc_msg_tx.send(ProcMsg::TaskErr(Box::new(e))); + } + }); + + self.tasks.push(task); } async fn handle_err( @@ -500,7 +527,7 @@ enum ProcMsg { err: Box, ack: Sender<()>, }, - + TaskErr(Box), FromParent(ProcAction), FromHandle(ProcAction), } @@ -587,6 +614,10 @@ where let _ = child.send(ProcMsg::FromParent(ProcAction::Stop)); } + for task in &ctx.tasks { + task.abort(); + } + task::yield_now().await; if let Some(r) = restart { @@ -606,6 +637,13 @@ where match proc_msg { Err(_) => break, + Ok(ProcMsg::TaskErr(e)) => { + let e: Box = e.downcast().unwrap(); + let e = SharedErr::new(*e); + exit_reason = Some(ExitReason::Err(e)); + break; + } + Ok(ProcMsg::FromHandle(ProcAction::Stop) ) => { exit_reason = Some(ExitReason::Handle); break @@ -656,6 +694,10 @@ where let _ = child.send(ProcMsg::FromParent(ProcAction::Stop)); } + for task in &ctx.tasks { + task.abort(); + } + task::yield_now().await; let exit_reason = exit_reason.unwrap_or(ExitReason::Handle); @@ -750,6 +792,7 @@ impl Node { proc_msg_rx: directive_rx, children_proc_msg_tx: Default::default(), supervision, + tasks: vec![], }; tokio::spawn(async move { @@ -772,6 +815,13 @@ impl Node { match proc_msg { Err(_) => break, + Ok(ProcMsg::TaskErr(e)) => { + let e: Box = e.downcast().unwrap(); + let e = SharedErr::new(*e); + exit_reason = Some(ExitReason::Err(e)); + break; + } + Ok(ProcMsg::FromHandle(ProcAction::Stop) | ProcMsg::FromParent(ProcAction::Stop)) => { exit_reason = Some(ExitReason::Handle); break @@ -804,6 +854,10 @@ impl Node { let _ = child.send(ProcMsg::FromParent(ProcAction::Stop)); } + for task in &ctx.tasks { + task.abort(); + } + task::yield_now().await; let exit_reason = exit_reason.unwrap_or(ExitReason::Handle); diff --git a/core/src/stream.rs b/core/src/stream.rs deleted file mode 100644 index 80320b5..0000000 --- a/core/src/stream.rs +++ /dev/null @@ -1,213 +0,0 @@ -use crate::{Ctx, Handle, Process}; -use async_trait::async_trait; -use futures::{pin_mut, Stream, StreamExt}; -use std::future::Future; -use std::marker::PhantomData; - -// Streams are spanwed by other processes, which means they are killed when the parent process is killed. - -struct Source { - stream: S, - _marker: std::marker::PhantomData<(F, Fut, S, T, E, Snk)>, -} - -pub trait Sink { - fn consume(&self, item: T) -> impl Future + Send; -} - -struct Props { - init: F, - sink: Snk, -} - -// i love rust haha -#[async_trait] -impl Process for Source -where - F: Fn() -> Fut + Send + 'static, - Fut: Future + Send + 'static, - S: Stream> + Send + 'static + Unpin, - T: Send + 'static, - E: Send + Sync + 'static, - Snk: Sink + Send + 'static, -{ - type Props = Props; - type Msg = (); - type Err = E; - - async fn init(ctx: &mut Ctx) -> Result { - let stream = (ctx.props().init)().await; - - Ok(Self { - stream, - _marker: PhantomData, - }) - } - - async fn handle(&mut self, _: Self::Msg, ctx: &mut Ctx) -> Result<(), Self::Err> { - let stream = &mut self.stream; - pin_mut!(stream); - - let mut fut = stream.next(); - - loop { - tokio::select! { - biased; - - msg = ctx.proc_msg_rx.recv_async() => { - if let Ok(msg) = msg { - let _ = ctx.this().proc_msg_tx.send(msg); - break; - } - } - - item = &mut fut => { - match item { - None => { - ctx.this().stop(); - break; - } - - Some(res) => { - ctx.props().sink.consume(res?).await; - fut = stream.next(); - } - }; - } - } - } - - Ok(()) - } -} - -#[derive(Clone)] -pub struct StreamHandle { - handle: Handle<()>, -} - -impl StreamHandle { - pub fn stop(&self) { - self.handle.stop(); - } - - pub fn is_alive(&self) -> bool { - self.handle.is_alive() - } -} - -pub struct NoSink; - -impl Sink for NoSink -where - T: Send, -{ - async fn consume(&self, _item: T) {} -} - -pub struct StreamBuilder<'a, P, F, Fut, S, T, E, Snk> -where - P: Process, -{ - init: F, - sink: Snk, - ctx: &'a mut Ctx

, - _marker: PhantomData<(Fut, S, T, E)>, -} - -impl<'a, P, F, Fut, S, T, E> StreamBuilder<'a, P, F, Fut, S, T, E, NoSink> -where - P: Process, - F: Fn() -> Fut + Send + 'static, - Fut: Future + Send + 'static, - S: Stream> + Send + 'static + Unpin, - T: Send + 'static, - E: Send + Sync + 'static, -{ - pub fn new(init: F, ctx: &'a mut Ctx

) -> Self { - Self { - init, - sink: NoSink, - ctx, - _marker: PhantomData, - } - } - - pub fn sink(self, sink: Snk) -> StreamBuilder<'a, P, F, Fut, S, T, E, Snk> - where - Snk: Sink + Send + Sync + 'static, - { - StreamBuilder { - init: self.init, - sink, - ctx: self.ctx, - _marker: PhantomData, - } - } -} - -impl StreamBuilder<'_, P, F, Fut, S, T, E, Snk> -where - P: Process, - F: Fn() -> Fut + Send + 'static, - Fut: Future + Send + 'static, - S: Stream> + Send + 'static + Unpin, - T: Send + 'static, - E: Send + Sync + 'static, - Snk: Sink + Send + 'static, -{ - pub fn spawn(self) -> StreamHandle { - let handle = self.ctx.spawn::>(Props { - init: self.init, - sink: self.sink, - }); - - handle.send(()); - - StreamHandle { handle } - } -} - -impl Sink for Handle -where - T: Send, - K: Send + Into, -{ - async fn consume(&self, item: K) { - let k: T = item.into(); - self.send(k); - } -} - -impl Sink for flume::Sender -where - T: Send, - K: Send + Into, -{ - async fn consume(&self, item: K) { - let k: T = item.into(); - let _ = self.send_async(k).await; - } -} - -impl Sink for tokio::sync::mpsc::Sender -where - T: Send, - K: Send + Into, -{ - async fn consume(&self, item: K) { - let k: T = item.into(); - let _ = self.send(k).await; - } -} - -impl Sink for tokio::sync::broadcast::Sender -where - T: Send, - K: Send + Into, -{ - async fn consume(&self, item: K) { - let k: T = item.into(); - let _ = self.send(k); - } -} diff --git a/core/tests/stream.rs b/core/tests/stream.rs deleted file mode 100644 index 850a117..0000000 --- a/core/tests/stream.rs +++ /dev/null @@ -1,66 +0,0 @@ -use async_trait::async_trait; -use derive_more::From; -use futures::stream; -use speare::{Ctx, Node, Process, Request, StreamHandle}; -use std::{marker::PhantomData, time::Duration}; -use tokio::time; - -struct Streamer { - handle: StreamHandle, - state: Vec, -} - -#[derive(From)] -enum Msg { - #[from] - Push(i32), - #[from] - GetStreamHandle(Request<(), StreamHandle>), - GetState(Request<(), Vec>), -} - -#[async_trait] -impl Process for Streamer { - type Props = (); - type Msg = Msg; - type Err = (); - - async fn init(ctx: &mut Ctx) -> Result { - let sink = ctx.this().clone(); - let handle = ctx - .stream(|| async { stream::iter(vec![1, 2, 3].into_iter().map(Ok::)) }) - .sink(sink) - .spawn(); - - Ok(Streamer { - handle, - state: vec![], - }) - } - - async fn handle(&mut self, msg: Self::Msg, _: &mut Ctx) -> Result<(), Self::Err> { - use Msg::*; - match msg { - Push(x) => self.state.push(x), - GetStreamHandle(req) => req.reply(self.handle.clone()), - GetState(req) => req.reply(self.state.clone()), - } - - Ok(()) - } -} - -#[tokio::test] -async fn it_terminates_process_after_streaming() { - use Msg::*; - let node = Node::default(); - let streamer = node.spawn::(()); - - time::sleep(Duration::from_millis(100)).await; - let vec = streamer.reqw(GetState, ()).await.unwrap(); - let handle = streamer.reqw(GetStreamHandle, ()).await.unwrap(); - - assert_eq!(vec, vec![1, 2, 3]); - assert!(!handle.is_alive()); -} - diff --git a/core/tests/tasks.rs b/core/tests/tasks.rs new file mode 100644 index 0000000..951adba --- /dev/null +++ b/core/tests/tasks.rs @@ -0,0 +1,71 @@ +use std::time::Duration; + +use async_trait::async_trait; +use speare::{Ctx, ExitReason, Node, Process, Request}; +use sync_vec::SyncVec; +use tokio::time; + +mod sync_vec; + +struct Root; + +enum Msg { + TaskOk(String), + TaskErr(String), +} + +#[async_trait] +impl Process for Root { + type Props = (SyncVec, SyncVec); + type Msg = Msg; + type Err = String; + + async fn init(_: &mut Ctx) -> Result { + Ok(Root) + } + + async fn handle(&mut self, msg: Self::Msg, ctx: &mut Ctx) -> Result<(), Self::Err> { + use Msg::*; + match msg { + TaskOk(val) => { + let oks = ctx.props().0.clone(); + ctx.subtask(async move { + oks.push(val).await; + Ok(()) + }); + } + + TaskErr(val) => { + ctx.subtask(async move { Err(val) }); + } + } + + Ok(()) + } + + async fn exit(&mut self, reason: ExitReason, ctx: &mut Ctx) { + if let ExitReason::Err(e) = reason { + ctx.props().1.push((*e).clone()).await; + } + } +} + +#[tokio::test] +async fn executes_subtasks() { + // Arrange + let node = Node::default(); + let (oks, errs) = (SyncVec::default(), SyncVec::default()); + let root = node.spawn::((oks.clone(), errs.clone())); + + // Act + use Msg::*; + root.send(TaskOk("foo".to_string())); + root.send(TaskOk("bar".to_string())); + root.send(TaskErr("baz".to_string())); + time::sleep(Duration::from_millis(500)).await; + + // Assert + assert_eq!(oks.clone_vec().await, vec!["foo", "bar"]); + assert_eq!(errs.clone_vec().await, vec!["baz"]); + assert!(!root.is_alive()) +}