Skip to content

Commit

Permalink
Merge pull request #3 from vmenge/stream
Browse files Browse the repository at this point in the history
adds stream spawning functionality
  • Loading branch information
vmenge authored Dec 27, 2024
2 parents f537eb3 + 49e98ff commit 0303384
Show file tree
Hide file tree
Showing 5 changed files with 424 additions and 16 deletions.
123 changes: 108 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ readme = "../README.md"
async-trait = { version = "0.1.73" }
flume = "0.11.0"
tokio = { version = "1.35.1", features = ["macros", "rt", "sync", "time"] }
futures = "0.3.31"

[dev-dependencies]
tokio = { version = "1.35.1", features = [
Expand Down
37 changes: 36 additions & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
use async_trait::async_trait;
use flume::{Receiver, Sender};
use std::{any::Any, collections::HashMap, time::Duration};
use futures::Stream;
use std::{any::Any, collections::HashMap, future::Future, time::Duration};
use tokio::{
task,
time::{self},
};

mod exit;
mod req_res;
mod stream;
mod supervision;

pub use exit::*;
pub use req_res::*;
pub use stream::*;
pub use supervision::*;

/// A thin abstraction over tokio tasks and flume channels, allowing for easy message passing
Expand Down Expand Up @@ -242,6 +245,16 @@ impl<Msg> Handle<Msg> {
res.recv().await
}

pub async fn reqw<F, Req, Res>(&self, to_req: F, req: Req) -> Result<Res, ReqErr>
where
F: Fn(Request<Req, Res>) -> Msg
{
let (req, res) = req_res(req);
let msg = to_req(req);
self.send(msg);
res.recv().await
}

/// Sends a request to the `Process` as long as its messages implements `From<Request<Req,Res>>`.
///
/// Fails if response is not sent back within the given `Duration`.
Expand All @@ -253,6 +266,16 @@ impl<Msg> Handle<Msg> {
self.send(req);
res.recv_timeout(timeout).await
}

pub async fn reqw_timeout<F, Req, Res>(&self, to_req: F, req: Req, timeout: Duration) -> Result<Res, ReqErr>
where
F: Fn(Request<Req, Res>) -> Msg
{
let (req, res) = req_res(req);
let msg = to_req(req);
self.send(msg);
res.recv_timeout(timeout).await
}
}

/// The context surrounding the current `Process`.
Expand Down Expand Up @@ -338,6 +361,18 @@ where
handle
}

pub fn stream<F, Fut, S, T, E>(&mut self, f: F) -> StreamBuilder<'_, P, F, Fut, S, T, E, NoSink>
where
F: Fn() -> Fut + Send + Sync + 'static,
Fut: Future<Output = S> + Send + Sync + 'static,
S: Stream<Item = Result<T, E>> + Send + Sync + 'static + Unpin,
T: Send + Sync + 'static,
E: Send + Sync + 'static,
P::Msg: Sync,
{
StreamBuilder::new(f, self)
}

async fn handle_err(
&mut self,
e: Box<dyn Any + Send>,
Expand Down
Loading

0 comments on commit 0303384

Please sign in to comment.