From 1741ba41e479b5f549a2c4769d768576cc471747 Mon Sep 17 00:00:00 2001 From: Justin Dennison Date: Fri, 20 Mar 2020 16:22:41 -0400 Subject: [PATCH 1/2] implemented from_fn --- src/from_fn.rs | 84 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 2 ++ 2 files changed, 86 insertions(+) create mode 100644 src/from_fn.rs diff --git a/src/from_fn.rs b/src/from_fn.rs new file mode 100644 index 0000000..bc17a1a --- /dev/null +++ b/src/from_fn.rs @@ -0,0 +1,84 @@ +use async_std::future::Future; +use async_std::sync::{self, Receiver}; +use async_std::task; +use async_std::task::{Context, Poll}; +use core::pin::Pin; +use pin_project_lite::pin_project; + +use crate::ParallelStream; + +pin_project! { + /// A parallel stream that yields elements by calling a closure. + /// + /// This stream is created by the [`from_fn`] function. + /// See it documentation for more. + /// + /// [`from_fn`]: fn.from_fn.html + /// + /// # Examples + #[derive(Clone, Debug)] + pub struct FromFn { + #[pin] + receiver: Receiver, + f: F, + limit: Option, + } +} + +/// Creates a parallel stream from a closure. +pub fn from_fn(mut f: F) -> FromFn +where + T: Send + Sync + Unpin + 'static, + F: FnMut() -> Fut + Send + Sync + Copy + 'static, + Fut: Future> + Send, +{ + let (sender, receiver) = sync::channel(1); + task::spawn(async move { + let sender = sender.clone(); + while let Some(val) = f().await { + sender.send(val).await; + }; + }); + FromFn { + f, + receiver, + limit: None, + } +} + +impl ParallelStream for FromFn +where + T: Send + Sync + Unpin + 'static, + F: FnMut() -> Fut + Send + Sync + Copy + 'static, + Fut: Future> + Send, +{ + type Item = T; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + use async_std::prelude::*; + let this = self.project(); + this.receiver.poll_next(cx) + } + + fn limit(mut self, limit: impl Into>) -> Self { + self.limit = limit.into(); + self + } + + fn get_limit(&self) -> Option { + self.limit + } +} + +// #[async_std::test] +// async fn smoke() { +// let mut output = vec![]; + +// let mut stream = crate::from_fn(|| async move { +// Some(1u8) +// }); +// while let Some(n) = stream.next().await { +// output.push(n); +// } +// assert_eq!(output, vec![1u8]); +// } diff --git a/src/lib.rs b/src/lib.rs index d9ac3ab..7ac9f85 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -44,11 +44,13 @@ #![deny(missing_debug_implementations, nonstandard_style)] #![warn(missing_docs, missing_doc_code_examples)] +mod from_fn; mod from_parallel_stream; mod from_stream; mod into_parallel_stream; mod par_stream; +pub use from_fn::{from_fn, FromFn}; pub use from_parallel_stream::FromParallelStream; pub use from_stream::{from_stream, FromStream}; pub use into_parallel_stream::IntoParallelStream; From d0a58425a0e9c6866f4f2a124e3b2cababc4aa6a Mon Sep 17 00:00:00 2001 From: Justin Dennison Date: Mon, 23 Mar 2020 14:24:41 -0400 Subject: [PATCH 2/2] updated tests for from_fn --- src/from_fn.rs | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/src/from_fn.rs b/src/from_fn.rs index bc17a1a..07ad2a3 100644 --- a/src/from_fn.rs +++ b/src/from_fn.rs @@ -37,7 +37,7 @@ where let sender = sender.clone(); while let Some(val) = f().await { sender.send(val).await; - }; + } }); FromFn { f, @@ -70,15 +70,22 @@ where } } -// #[async_std::test] -// async fn smoke() { -// let mut output = vec![]; - -// let mut stream = crate::from_fn(|| async move { -// Some(1u8) -// }); -// while let Some(n) = stream.next().await { -// output.push(n); -// } -// assert_eq!(output, vec![1u8]); -// } +#[async_std::test] +async fn smoke() { + let mut output = vec![]; + let mut count = 0u8; + let mut stream = crate::from_fn(move || { + count += 1; + async move { + if count <= 3 { + Some(count) + } else { + None + } + } + }); + while let Some(n) = stream.next().await { + output.push(n); + } + assert_eq!(output, vec![1, 2, 3]); +}