diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 4e4e94e..7218299 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -20,45 +20,45 @@ jobs: rust: [nightly] steps: - - uses: actions/checkout@master + - uses: actions/checkout@master - - name: Install ${{ matrix.rust }} - uses: actions-rs/toolchain@v1 - with: - toolchain: ${{ matrix.rust }} - override: true + - name: Install ${{ matrix.rust }} + uses: actions-rs/toolchain@v1 + with: + toolchain: ${{ matrix.rust }} + override: true - - name: check - uses: actions-rs/cargo@v1 - with: - command: check - args: --all --bins --examples --features hyperium_http + - name: check + uses: actions-rs/cargo@v1 + with: + command: check + args: --all --bins --examples - - name: check unstable - uses: actions-rs/cargo@v1 - with: - command: check - args: --all --benches --bins --examples --tests --features hyperium_http + - name: check unstable + uses: actions-rs/cargo@v1 + with: + command: check + args: --all --benches --bins --examples --tests - - name: tests - uses: actions-rs/cargo@v1 - with: - command: test - args: --all + - name: tests + uses: actions-rs/cargo@v1 + with: + command: test + args: --all check_fmt_and_docs: name: Checking fmt and docs runs-on: ubuntu-latest steps: - - uses: actions/checkout@master - - uses: actions-rs/toolchain@v1 - with: - toolchain: nightly - components: rustfmt, clippy - override: true + - uses: actions/checkout@master + - uses: actions-rs/toolchain@v1 + with: + toolchain: nightly + components: rustfmt, clippy + override: true - - name: fmt - run: cargo fmt --all -- --check + - name: fmt + run: cargo fmt --all -- --check - - name: Docs - run: cargo doc + - name: Docs + run: cargo doc diff --git a/src/par_stream/enumerate.rs b/src/par_stream/enumerate.rs new file mode 100644 index 0000000..f840b6a --- /dev/null +++ b/src/par_stream/enumerate.rs @@ -0,0 +1,67 @@ +use core::pin::Pin; +use core::task::{Context, Poll}; + +use async_std::task::ready; +use pin_project_lite::pin_project; + +use crate::ParallelStream; + +pin_project! { + /// A stream that yields the current count and element. + /// + /// This `struct` is created by the [`enumerate`] method on [`ParallelStream`]. See its + /// documentation for more. + /// + /// [`enumerate`]: trait.ParallelStream.html#method.enumerate + /// [`ParallelStream`]: trait.ParallelStream.html + #[derive(Clone, Debug)] + pub struct Enumerate { + #[pin] + stream: S, + count: usize, + limit: Option, + } +} + +impl Enumerate { + pub(super) fn new(stream: S) -> Self { + Self { + limit: stream.get_limit(), + count: 0, + stream, + } + } +} + +impl ParallelStream for Enumerate { + type Item = (usize, S::Item); + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + let next = ready!(this.stream.poll_next(cx)); + *this.count += 1; + let count = *this.count; + Poll::Ready(next.map(|val| (count, val))) + } + + fn limit(mut self, limit: impl Into>) -> Self { + self.limit = limit.into(); + self + } + + fn get_limit(&self) -> Option { + self.limit + } +} + +#[async_std::test] +async fn smoke() { + use async_std::prelude::*; + let s = async_std::stream::repeat(5usize).enumerate().take(3); + let mut output = vec![]; + let mut stream = crate::from_stream(s); + while let Some(n) = stream.next().await { + output.push(n); + } + assert_eq!(output, vec![(0, 5), (1, 5), (2, 5)]); +} diff --git a/src/par_stream/mod.rs b/src/par_stream/mod.rs index 500befe..0fb6b1f 100644 --- a/src/par_stream/mod.rs +++ b/src/par_stream/mod.rs @@ -3,17 +3,21 @@ use async_std::task::{Context, Poll}; use std::pin::Pin; -use crate::FromParallelStream; +use crate::{FromParallelStream, IntoParallelStream}; +pub use enumerate::Enumerate; pub use for_each::ForEach; pub use map::Map; pub use next::NextFuture; pub use take::Take; +pub use zip::Zip; +mod enumerate; mod for_each; mod map; mod next; mod take; +mod zip; /// Parallel version of the standard `Stream` trait. pub trait ParallelStream: Sized + Send + Sync + Unpin + 'static { @@ -40,6 +44,23 @@ pub trait ParallelStream: Sized + Send + Sync + Unpin + 'static { Map::new(self, f) } + /// A stream that yields two streams simultaneously + fn zip(self, other: T) -> Zip::IntoParStream> + where + Self: Sized, + T: IntoParallelStream, + { + Zip::new(self, other.into_par_stream()) + } + + /// A stream that yields a current count and element + fn enumerate(self) -> Enumerate + where + Self: Sized, + { + Enumerate::new(self) + } + /// Applies `f` to each item of this stream in parallel, producing a new /// stream with the results. fn next(&mut self) -> NextFuture<'_, Self> { diff --git a/src/par_stream/zip.rs b/src/par_stream/zip.rs new file mode 100644 index 0000000..a135569 --- /dev/null +++ b/src/par_stream/zip.rs @@ -0,0 +1,112 @@ +use core::cmp; +use core::pin::Pin; +use core::task::{Context, Poll}; + +use async_std::task::ready; +use pin_project_lite::pin_project; + +use crate::ParallelStream; + +pin_project! { + /// A stream that yields two streams simultaneously. + /// + /// This `struct` is created by the [`zip`] method on [`ParallelStream`]. See its + /// documentation for more. + /// + /// [`zip`]: trait.ParallelStream.html#method.zip + /// [`ParallelStream`]: trait.ParallelStream.html + #[derive(Clone, Debug)] + pub struct Zip + where + A: ParallelStream, + B: ParallelStream, + { + #[pin] + stream: A, + #[pin] + other: B, + limit: Option, + queued1: Option, + queued2: Option, + } +} + +impl Zip +where + A: ParallelStream, + B: ParallelStream, +{ + pub(super) fn new(stream: A, other: B) -> Self { + Self { + limit: cmp::min(stream.get_limit(), other.get_limit()), + other, + stream, + queued1: None, + queued2: None, + } + } +} + +impl ParallelStream for Zip +where + A: ParallelStream, + B: ParallelStream, + A::Item: Sync, + B::Item: Sync, +{ + type Item = (A::Item, B::Item); + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + let mut stream_done = false; + let mut other_done = false; + if this.queued1.is_none() { + match this.stream.poll_next(cx) { + Poll::Ready(Some(item1)) => *this.queued1 = Some(item1), + Poll::Ready(None) | Poll::Pending => { + stream_done = true; + } + } + } + if this.queued2.is_none() { + match this.other.poll_next(cx) { + Poll::Ready(Some(item2)) => *this.queued2 = Some(item2), + Poll::Ready(None) | Poll::Pending => { + other_done = true; + } + } + } + + if this.queued1.is_some() && this.queued2.is_some() { + let pair = (this.queued1.take().unwrap(), this.queued2.take().unwrap()); + Poll::Ready(Some(pair)) + } else if stream_done && other_done { + Poll::Ready(None) + } else { + Poll::Pending + } + } + + fn limit(mut self, limit: impl Into>) -> Self { + self.limit = limit.into(); + self + } + + fn get_limit(&self) -> Option { + self.limit + } +} + +#[async_std::test] +async fn smoke() { + use async_std::prelude::*; + let s = async_std::stream::repeat(5usize) + .zip(async_std::stream::repeat(10usize)) + .take(3); + let mut output = vec![]; + let mut stream = crate::from_stream(s); + while let Some(n) = stream.next().await { + output.push(n); + } + assert_eq!(output, vec![(5, 10), (5, 10), (5, 10)]); +} diff --git a/tests/test.rs b/tests/test.rs deleted file mode 100644 index e69de29..0000000