Skip to content

Commit

Permalink
chore(core): clean up trait req chain
Browse files Browse the repository at this point in the history
  • Loading branch information
vmenge committed Dec 30, 2024
1 parent 66c66c5 commit d60949e
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 20 deletions.
15 changes: 10 additions & 5 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ impl<Msg> Handle<Msg> {

pub async fn reqw<F, Req, Res>(&self, to_req: F, req: Req) -> Result<Res, ReqErr>
where
F: Fn(Request<Req, Res>) -> Msg
F: Fn(Request<Req, Res>) -> Msg,
{
let (req, res) = req_res(req);
let msg = to_req(req);
Expand All @@ -267,9 +267,14 @@ impl<Msg> Handle<Msg> {
res.recv_timeout(timeout).await
}

pub async fn reqw_timeout<F, Req, Res>(&self, to_req: F, req: Req, timeout: Duration) -> Result<Res, ReqErr>
pub async fn reqw_timeout<F, Req, Res>(
&self,
to_req: F,
req: Req,
timeout: Duration,
) -> Result<Res, ReqErr>
where
F: Fn(Request<Req, Res>) -> Msg
F: Fn(Request<Req, Res>) -> Msg,
{
let (req, res) = req_res(req);
let msg = to_req(req);
Expand Down Expand Up @@ -364,8 +369,8 @@ where
pub fn stream<F, Fut, S, T, E>(&mut self, f: F) -> StreamBuilder<'_, P, F, Fut, S, T, E, NoSink>
where
F: Fn() -> Fut + Send + Sync + 'static,
Fut: Future<Output = S> + Send + Sync + 'static,
S: Stream<Item = Result<T, E>> + Send + Sync + 'static + Unpin,
Fut: Future<Output = S> + Send + 'static,
S: Stream<Item = Result<T, E>> + Send + 'static + Unpin,
T: Send + Sync + 'static,
E: Send + Sync + 'static,
P::Msg: Sync,
Expand Down
30 changes: 15 additions & 15 deletions core/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ struct Props<F, Snk> {
impl<F, Fut, S, T, E, Snk> Process for Source<F, Fut, S, T, E, Snk>
where
F: Fn() -> Fut + Send + Sync + 'static,
Fut: Future<Output = S> + Send + 'static,
Fut: Future<Output = S> + Send + 'static,
S: Stream<Item = Result<T, E>> + Send + 'static + Unpin,
T: Send + Sync + 'static,
E: Send + Sync + 'static,
Expand Down Expand Up @@ -118,11 +118,11 @@ where
impl<'a, P, F, Fut, S, T, E> StreamBuilder<'a, P, F, Fut, S, T, E, NoSink>
where
P: Process,
F: Fn() -> Fut + 'static,
Fut: Future<Output = S> + 'static,
S: Stream<Item = Result<T, E>> + 'static + Unpin,
T: 'static,
E: 'static,
F: Fn() -> Fut + Send + Sync + 'static,
Fut: Future<Output = S> + Send + 'static,
S: Stream<Item = Result<T, E>> + Send + 'static + Unpin,
T: Send + Sync + 'static,
E: Send + Sync + 'static,
{
pub fn new(init: F, ctx: &'a mut Ctx<P>) -> Self {
Self {
Expand Down Expand Up @@ -150,8 +150,8 @@ impl<P, F, Fut, S, T, E, Snk> StreamBuilder<'_, P, F, Fut, S, T, E, Snk>
where
P: Process,
F: Fn() -> Fut + Send + Sync + 'static,
Fut: Future<Output = S> + Send + Sync + 'static,
S: Stream<Item = Result<T, E>> + Send + Sync + 'static + Unpin,
Fut: Future<Output = S> + Send + 'static,
S: Stream<Item = Result<T, E>> + Send + 'static + Unpin,
T: Send + Sync + 'static,
E: Send + Sync + 'static,
Snk: Sink<T> + Send + Sync + 'static,
Expand All @@ -171,7 +171,7 @@ where
impl<T, K> Sink<K> for Handle<T>
where
T: Send,
K: Sync + Send + Into<T>,
K: Send + Into<T>,
{
async fn consume(&self, item: K) {
let k: T = item.into();
Expand All @@ -181,8 +181,8 @@ where

impl<T, K> Sink<K> for flume::Sender<T>
where
T: Sync + Send,
K: Sync + Send + Into<T>,
T: Send,
K: Send + Into<T>,
{
async fn consume(&self, item: K) {
let k: T = item.into();
Expand All @@ -192,8 +192,8 @@ where

impl<T, K> Sink<K> for tokio::sync::mpsc::Sender<T>
where
T: Sync + Send,
K: Sync + Send + Into<T>,
T: Send,
K: Send + Into<T>,
{
async fn consume(&self, item: K) {
let k: T = item.into();
Expand All @@ -203,8 +203,8 @@ where

impl<T, K> Sink<K> for tokio::sync::broadcast::Sender<T>
where
T: Sync + Send,
K: Sync + Send + Into<T>,
T: Send,
K: Send + Into<T>,
{
async fn consume(&self, item: K) {
let k: T = item.into();
Expand Down

0 comments on commit d60949e

Please sign in to comment.