Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add zip and enumerate #9

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 31 additions & 31 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,45 +20,45 @@ jobs:
rust: [nightly]

steps:
- uses: actions/checkout@master
- uses: actions/checkout@master

- name: Install ${{ matrix.rust }}
uses: actions-rs/toolchain@v1
with:
toolchain: ${{ matrix.rust }}
override: true
- name: Install ${{ matrix.rust }}
uses: actions-rs/toolchain@v1
with:
toolchain: ${{ matrix.rust }}
override: true

- name: check
uses: actions-rs/cargo@v1
with:
command: check
args: --all --bins --examples --features hyperium_http
- name: check
uses: actions-rs/cargo@v1
with:
command: check
args: --all --bins --examples

- name: check unstable
uses: actions-rs/cargo@v1
with:
command: check
args: --all --benches --bins --examples --tests --features hyperium_http
- name: check unstable
uses: actions-rs/cargo@v1
with:
command: check
args: --all --benches --bins --examples --tests

- name: tests
uses: actions-rs/cargo@v1
with:
command: test
args: --all
- name: tests
uses: actions-rs/cargo@v1
with:
command: test
args: --all

check_fmt_and_docs:
name: Checking fmt and docs
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@master
- uses: actions-rs/toolchain@v1
with:
toolchain: nightly
components: rustfmt, clippy
override: true
- uses: actions/checkout@master
- uses: actions-rs/toolchain@v1
with:
toolchain: nightly
components: rustfmt, clippy
override: true

- name: fmt
run: cargo fmt --all -- --check
- name: fmt
run: cargo fmt --all -- --check

- name: Docs
run: cargo doc
- name: Docs
run: cargo doc
67 changes: 67 additions & 0 deletions src/par_stream/enumerate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use core::pin::Pin;
use core::task::{Context, Poll};

use async_std::task::ready;
use pin_project_lite::pin_project;

use crate::ParallelStream;

pin_project! {
/// A stream that yields the current count and element.
///
/// This `struct` is created by the [`enumerate`] method on [`ParallelStream`]. See its
/// documentation for more.
///
/// [`enumerate`]: trait.ParallelStream.html#method.enumerate
/// [`ParallelStream`]: trait.ParallelStream.html
#[derive(Clone, Debug)]
pub struct Enumerate<S> {
#[pin]
stream: S,
count: usize,
limit: Option<usize>,
}
}

impl<S: ParallelStream> Enumerate<S> {
pub(super) fn new(stream: S) -> Self {
Self {
limit: stream.get_limit(),
count: 0,
stream,
}
}
}

impl<S: ParallelStream> ParallelStream for Enumerate<S> {
type Item = (usize, S::Item);

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let next = ready!(this.stream.poll_next(cx));
*this.count += 1;
let count = *this.count;
Poll::Ready(next.map(|val| (count, val)))
}

fn limit(mut self, limit: impl Into<Option<usize>>) -> Self {
self.limit = limit.into();
self
}

fn get_limit(&self) -> Option<usize> {
self.limit
}
}

#[async_std::test]
async fn smoke() {
use async_std::prelude::*;
let s = async_std::stream::repeat(5usize).enumerate().take(3);
let mut output = vec![];
let mut stream = crate::from_stream(s);
while let Some(n) = stream.next().await {
output.push(n);
}
assert_eq!(output, vec![(0, 5), (1, 5), (2, 5)]);
}
23 changes: 22 additions & 1 deletion src/par_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@ use async_std::task::{Context, Poll};

use std::pin::Pin;

use crate::FromParallelStream;
use crate::{FromParallelStream, IntoParallelStream};

pub use enumerate::Enumerate;
pub use for_each::ForEach;
pub use map::Map;
pub use next::NextFuture;
pub use take::Take;
pub use zip::Zip;

mod enumerate;
mod for_each;
mod map;
mod next;
mod take;
mod zip;

/// Parallel version of the standard `Stream` trait.
pub trait ParallelStream: Sized + Send + Sync + Unpin + 'static {
Expand All @@ -40,6 +44,23 @@ pub trait ParallelStream: Sized + Send + Sync + Unpin + 'static {
Map::new(self, f)
}

/// A stream that yields two streams simultaneously
fn zip<S, T>(self, other: T) -> Zip<Self, <T as IntoParallelStream>::IntoParStream>
where
Self: Sized,
T: IntoParallelStream,
{
Zip::new(self, other.into_par_stream())
}

/// A stream that yields a current count and element
fn enumerate<S, T>(self) -> Enumerate<Self>
where
Self: Sized,
{
Enumerate::new(self)
}

/// Applies `f` to each item of this stream in parallel, producing a new
/// stream with the results.
fn next(&mut self) -> NextFuture<'_, Self> {
Expand Down
73 changes: 73 additions & 0 deletions src/par_stream/zip.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use core::cmp;
use core::pin::Pin;
use core::task::{Context, Poll};

use async_std::task::ready;
use pin_project_lite::pin_project;

use crate::ParallelStream;

pin_project! {
/// A stream that yields two streams simultaneously.
///
/// This `struct` is created by the [`zip`] method on [`ParallelStream`]. See its
/// documentation for more.
///
/// [`zip`]: trait.ParallelStream.html#method.zip
/// [`ParallelStream`]: trait.ParallelStream.html
#[derive(Clone, Debug)]
pub struct Zip<A, B> {
#[pin]
stream: A,
#[pin]
other: B,
limit: Option<usize>,
}
}

impl<A: ParallelStream, B: ParallelStream> Zip<A, B> {
pub(super) fn new(stream: A, other: B) -> Self {
Self {
limit: cmp::min(stream.get_limit(), other.get_limit()),
other,
stream,
}
}
}

impl<A: ParallelStream, B: ParallelStream> ParallelStream for Zip<A, B> {
type Item = (A::Item, B::Item);

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let next = ready!(this.stream.poll_next(cx));
let other_next = ready!(this.other.poll_next(cx));
match (next, other_next) {
(Some(a), Some(b)) => Poll::Ready(Some((a, b))),
_ => Poll::Ready(None),
}
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will drop items when this.stream is ready but this.other is not. In this case, I suggest you store the result of this.stream.poll_next(cx) and use that stored value during the next poll (see here).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah right, good point.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A straightforward port of the above into ParallelStream requires the ParallelStream::Items to be Sync. I'm not quite sure why, I think because they aren't Fused?


fn limit(mut self, limit: impl Into<Option<usize>>) -> Self {
self.limit = limit.into();
self
}

fn get_limit(&self) -> Option<usize> {
self.limit
}
}

#[async_std::test]
async fn smoke() {
use async_std::prelude::*;
let s = async_std::stream::repeat(5usize)
.zip(async_std::stream::repeat(10usize))
.take(3);
let mut output = vec![];
let mut stream = crate::from_stream(s);
while let Some(n) = stream.next().await {
output.push(n);
}
assert_eq!(output, vec![(5, 10), (5, 10), (5, 10)]);
}
Empty file removed tests/test.rs
Empty file.