From 7c6a1565baf3503ee14b78bb76a770786d5fa483 Mon Sep 17 00:00:00 2001 From: Evan Cameron Date: Sun, 22 Mar 2020 16:48:18 -0400 Subject: [PATCH 1/4] Add zip and enumerate --- src/par_stream/enumerate.rs | 67 ++++++++++++++++++++++++++++++++++ src/par_stream/mod.rs | 23 +++++++++++- src/par_stream/zip.rs | 73 +++++++++++++++++++++++++++++++++++++ 3 files changed, 162 insertions(+), 1 deletion(-) create mode 100644 src/par_stream/enumerate.rs create mode 100644 src/par_stream/zip.rs diff --git a/src/par_stream/enumerate.rs b/src/par_stream/enumerate.rs new file mode 100644 index 0000000..f840b6a --- /dev/null +++ b/src/par_stream/enumerate.rs @@ -0,0 +1,67 @@ +use core::pin::Pin; +use core::task::{Context, Poll}; + +use async_std::task::ready; +use pin_project_lite::pin_project; + +use crate::ParallelStream; + +pin_project! { + /// A stream that yields the current count and element. + /// + /// This `struct` is created by the [`enumerate`] method on [`ParallelStream`]. See its + /// documentation for more. + /// + /// [`enumerate`]: trait.ParallelStream.html#method.enumerate + /// [`ParallelStream`]: trait.ParallelStream.html + #[derive(Clone, Debug)] + pub struct Enumerate { + #[pin] + stream: S, + count: usize, + limit: Option, + } +} + +impl Enumerate { + pub(super) fn new(stream: S) -> Self { + Self { + limit: stream.get_limit(), + count: 0, + stream, + } + } +} + +impl ParallelStream for Enumerate { + type Item = (usize, S::Item); + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + let next = ready!(this.stream.poll_next(cx)); + *this.count += 1; + let count = *this.count; + Poll::Ready(next.map(|val| (count, val))) + } + + fn limit(mut self, limit: impl Into>) -> Self { + self.limit = limit.into(); + self + } + + fn get_limit(&self) -> Option { + self.limit + } +} + +#[async_std::test] +async fn smoke() { + use async_std::prelude::*; + let s = async_std::stream::repeat(5usize).enumerate().take(3); + let mut output = vec![]; + let mut stream = crate::from_stream(s); + while let Some(n) = stream.next().await { + output.push(n); + } + assert_eq!(output, vec![(0, 5), (1, 5), (2, 5)]); +} diff --git a/src/par_stream/mod.rs b/src/par_stream/mod.rs index 500befe..0fb6b1f 100644 --- a/src/par_stream/mod.rs +++ b/src/par_stream/mod.rs @@ -3,17 +3,21 @@ use async_std::task::{Context, Poll}; use std::pin::Pin; -use crate::FromParallelStream; +use crate::{FromParallelStream, IntoParallelStream}; +pub use enumerate::Enumerate; pub use for_each::ForEach; pub use map::Map; pub use next::NextFuture; pub use take::Take; +pub use zip::Zip; +mod enumerate; mod for_each; mod map; mod next; mod take; +mod zip; /// Parallel version of the standard `Stream` trait. pub trait ParallelStream: Sized + Send + Sync + Unpin + 'static { @@ -40,6 +44,23 @@ pub trait ParallelStream: Sized + Send + Sync + Unpin + 'static { Map::new(self, f) } + /// A stream that yields two streams simultaneously + fn zip(self, other: T) -> Zip::IntoParStream> + where + Self: Sized, + T: IntoParallelStream, + { + Zip::new(self, other.into_par_stream()) + } + + /// A stream that yields a current count and element + fn enumerate(self) -> Enumerate + where + Self: Sized, + { + Enumerate::new(self) + } + /// Applies `f` to each item of this stream in parallel, producing a new /// stream with the results. fn next(&mut self) -> NextFuture<'_, Self> { diff --git a/src/par_stream/zip.rs b/src/par_stream/zip.rs new file mode 100644 index 0000000..123d4a2 --- /dev/null +++ b/src/par_stream/zip.rs @@ -0,0 +1,73 @@ +use core::cmp; +use core::pin::Pin; +use core::task::{Context, Poll}; + +use async_std::task::ready; +use pin_project_lite::pin_project; + +use crate::ParallelStream; + +pin_project! { + /// A stream that yields two streams simultaneously. + /// + /// This `struct` is created by the [`zip`] method on [`ParallelStream`]. See its + /// documentation for more. + /// + /// [`zip`]: trait.ParallelStream.html#method.zip + /// [`ParallelStream`]: trait.ParallelStream.html + #[derive(Clone, Debug)] + pub struct Zip { + #[pin] + stream: A, + #[pin] + other: B, + limit: Option, + } +} + +impl Zip { + pub(super) fn new(stream: A, other: B) -> Self { + Self { + limit: cmp::min(stream.get_limit(), other.get_limit()), + other, + stream, + } + } +} + +impl ParallelStream for Zip { + type Item = (A::Item, B::Item); + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + let next = ready!(this.stream.poll_next(cx)); + let other_next = ready!(this.other.poll_next(cx)); + match (next, other_next) { + (Some(a), Some(b)) => Poll::Ready(Some((a, b))), + _ => Poll::Ready(None), + } + } + + fn limit(mut self, limit: impl Into>) -> Self { + self.limit = limit.into(); + self + } + + fn get_limit(&self) -> Option { + self.limit + } +} + +#[async_std::test] +async fn smoke() { + use async_std::prelude::*; + let s = async_std::stream::repeat(5usize) + .zip(async_std::stream::repeat(10usize)) + .take(3); + let mut output = vec![]; + let mut stream = crate::from_stream(s); + while let Some(n) = stream.next().await { + output.push(n); + } + assert_eq!(output, vec![(5, 10), (5, 10), (5, 10)]); +} From e35ac14722a9128b6274f12dc779e2751d2712bb Mon Sep 17 00:00:00 2001 From: Evan Cameron Date: Mon, 23 Mar 2020 17:03:05 -0400 Subject: [PATCH 2/4] remove hyperium --- .github/workflows/ci.yaml | 62 +++++++++++++++++++-------------------- 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 4e4e94e..7218299 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -20,45 +20,45 @@ jobs: rust: [nightly] steps: - - uses: actions/checkout@master + - uses: actions/checkout@master - - name: Install ${{ matrix.rust }} - uses: actions-rs/toolchain@v1 - with: - toolchain: ${{ matrix.rust }} - override: true + - name: Install ${{ matrix.rust }} + uses: actions-rs/toolchain@v1 + with: + toolchain: ${{ matrix.rust }} + override: true - - name: check - uses: actions-rs/cargo@v1 - with: - command: check - args: --all --bins --examples --features hyperium_http + - name: check + uses: actions-rs/cargo@v1 + with: + command: check + args: --all --bins --examples - - name: check unstable - uses: actions-rs/cargo@v1 - with: - command: check - args: --all --benches --bins --examples --tests --features hyperium_http + - name: check unstable + uses: actions-rs/cargo@v1 + with: + command: check + args: --all --benches --bins --examples --tests - - name: tests - uses: actions-rs/cargo@v1 - with: - command: test - args: --all + - name: tests + uses: actions-rs/cargo@v1 + with: + command: test + args: --all check_fmt_and_docs: name: Checking fmt and docs runs-on: ubuntu-latest steps: - - uses: actions/checkout@master - - uses: actions-rs/toolchain@v1 - with: - toolchain: nightly - components: rustfmt, clippy - override: true + - uses: actions/checkout@master + - uses: actions-rs/toolchain@v1 + with: + toolchain: nightly + components: rustfmt, clippy + override: true - - name: fmt - run: cargo fmt --all -- --check + - name: fmt + run: cargo fmt --all -- --check - - name: Docs - run: cargo doc + - name: Docs + run: cargo doc From c176e746b55d84a4fed729a44b7eeb0356657f8e Mon Sep 17 00:00:00 2001 From: Evan Cameron Date: Mon, 23 Mar 2020 17:03:55 -0400 Subject: [PATCH 3/4] pass tests --- tests/test.rs | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 tests/test.rs diff --git a/tests/test.rs b/tests/test.rs deleted file mode 100644 index e69de29..0000000 From d76c29915e908fa3a8806fbf2a669236df19e186 Mon Sep 17 00:00:00 2001 From: Evan Cameron Date: Tue, 24 Mar 2020 11:18:43 -0400 Subject: [PATCH 4/4] zip uses queued items --- src/par_stream/zip.rs | 55 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 47 insertions(+), 8 deletions(-) diff --git a/src/par_stream/zip.rs b/src/par_stream/zip.rs index 123d4a2..a135569 100644 --- a/src/par_stream/zip.rs +++ b/src/par_stream/zip.rs @@ -16,35 +16,74 @@ pin_project! { /// [`zip`]: trait.ParallelStream.html#method.zip /// [`ParallelStream`]: trait.ParallelStream.html #[derive(Clone, Debug)] - pub struct Zip { + pub struct Zip + where + A: ParallelStream, + B: ParallelStream, + { #[pin] stream: A, #[pin] other: B, limit: Option, + queued1: Option, + queued2: Option, } } -impl Zip { +impl Zip +where + A: ParallelStream, + B: ParallelStream, +{ pub(super) fn new(stream: A, other: B) -> Self { Self { limit: cmp::min(stream.get_limit(), other.get_limit()), other, stream, + queued1: None, + queued2: None, } } } -impl ParallelStream for Zip { +impl ParallelStream for Zip +where + A: ParallelStream, + B: ParallelStream, + A::Item: Sync, + B::Item: Sync, +{ type Item = (A::Item, B::Item); fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); - let next = ready!(this.stream.poll_next(cx)); - let other_next = ready!(this.other.poll_next(cx)); - match (next, other_next) { - (Some(a), Some(b)) => Poll::Ready(Some((a, b))), - _ => Poll::Ready(None), + let mut stream_done = false; + let mut other_done = false; + if this.queued1.is_none() { + match this.stream.poll_next(cx) { + Poll::Ready(Some(item1)) => *this.queued1 = Some(item1), + Poll::Ready(None) | Poll::Pending => { + stream_done = true; + } + } + } + if this.queued2.is_none() { + match this.other.poll_next(cx) { + Poll::Ready(Some(item2)) => *this.queued2 = Some(item2), + Poll::Ready(None) | Poll::Pending => { + other_done = true; + } + } + } + + if this.queued1.is_some() && this.queued2.is_some() { + let pair = (this.queued1.take().unwrap(), this.queued2.take().unwrap()); + Poll::Ready(Some(pair)) + } else if stream_done && other_done { + Poll::Ready(None) + } else { + Poll::Pending } }