Skip to content

Commit

Permalink
feat: immutable cloneable Node
Browse files Browse the repository at this point in the history
  • Loading branch information
vmenge committed Apr 29, 2024
1 parent 127395d commit beb483f
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 9 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "speare"
version = "0.1.8"
version = "0.1.9"
edition = "2021"
license = "MIT"
description = "actor-like thin abstraction over tokio::task and flume channels"
Expand Down
51 changes: 44 additions & 7 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -631,28 +631,63 @@ where
});
}

enum NodeProcMsg {
SpawnedChild(Sender<ProcMsg>),
Stop,
}

fn node_proc() -> Sender<NodeProcMsg> {
let (tx, rx) = flume::unbounded();

task::spawn(async move {
let mut children = Vec::new();

while let Ok(msg) = rx.recv_async().await {
match msg {
NodeProcMsg::SpawnedChild(child) => {
children.push(child);
}

NodeProcMsg::Stop => {
for child in &children {
let _ = child.send(ProcMsg::FromHandle(ProcAction::Stop));
}

break;
}
}
}
});

tx
}

/// A `Node` owns a collection of unsupervised top-level processes.
/// If the `Node` is dropped, all of its processes are stopped.
///
/// ### Unsupervised Processes
/// Unsupervised processes will be stopped when they error. Since they are unsupervised,
/// the errors won't be handled and they will not be automatically restarted.
#[derive(Default)]
#[derive(Clone)]
pub struct Node {
children_directive_tx: Vec<Sender<ProcMsg>>,
proc: Sender<NodeProcMsg>,
}

impl Default for Node {
fn default() -> Self {
Self { proc: node_proc() }
}
}

impl Drop for Node {
fn drop(&mut self) {
for directive_tx in &self.children_directive_tx {
let _ = directive_tx.send(ProcMsg::FromHandle(ProcAction::Stop));
}
let _ = self.proc.send(NodeProcMsg::Stop);
}
}

impl Node {
/// Spawns an unsupervised process.
pub fn spawn<P>(&mut self, props: P::Props) -> Handle<P::Msg>
pub fn spawn<P>(&self, props: P::Props) -> Handle<P::Msg>
where
P: Process,
{
Expand Down Expand Up @@ -739,7 +774,9 @@ impl Node {
}
});

self.children_directive_tx.push(handle.proc_msg_tx.clone());
let _ = self
.proc
.send(NodeProcMsg::SpawnedChild(handle.proc_msg_tx.clone()));

handle
}
Expand Down

0 comments on commit beb483f

Please sign in to comment.