Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add zip and enumerate #9

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 31 additions & 31 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,45 +20,45 @@ jobs:
rust: [nightly]

steps:
- uses: actions/checkout@master
- uses: actions/checkout@master

- name: Install ${{ matrix.rust }}
uses: actions-rs/toolchain@v1
with:
toolchain: ${{ matrix.rust }}
override: true
- name: Install ${{ matrix.rust }}
uses: actions-rs/toolchain@v1
with:
toolchain: ${{ matrix.rust }}
override: true

- name: check
uses: actions-rs/cargo@v1
with:
command: check
args: --all --bins --examples --features hyperium_http
- name: check
uses: actions-rs/cargo@v1
with:
command: check
args: --all --bins --examples

- name: check unstable
uses: actions-rs/cargo@v1
with:
command: check
args: --all --benches --bins --examples --tests --features hyperium_http
- name: check unstable
uses: actions-rs/cargo@v1
with:
command: check
args: --all --benches --bins --examples --tests

- name: tests
uses: actions-rs/cargo@v1
with:
command: test
args: --all
- name: tests
uses: actions-rs/cargo@v1
with:
command: test
args: --all

check_fmt_and_docs:
name: Checking fmt and docs
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@master
- uses: actions-rs/toolchain@v1
with:
toolchain: nightly
components: rustfmt, clippy
override: true
- uses: actions/checkout@master
- uses: actions-rs/toolchain@v1
with:
toolchain: nightly
components: rustfmt, clippy
override: true

- name: fmt
run: cargo fmt --all -- --check
- name: fmt
run: cargo fmt --all -- --check

- name: Docs
run: cargo doc
- name: Docs
run: cargo doc
67 changes: 67 additions & 0 deletions src/par_stream/enumerate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use core::pin::Pin;
use core::task::{Context, Poll};

use async_std::task::ready;
use pin_project_lite::pin_project;

use crate::ParallelStream;

pin_project! {
/// A stream that yields the current count and element.
///
/// This `struct` is created by the [`enumerate`] method on [`ParallelStream`]. See its
/// documentation for more.
///
/// [`enumerate`]: trait.ParallelStream.html#method.enumerate
/// [`ParallelStream`]: trait.ParallelStream.html
#[derive(Clone, Debug)]
pub struct Enumerate<S> {
#[pin]
stream: S,
count: usize,
limit: Option<usize>,
}
}

impl<S: ParallelStream> Enumerate<S> {
pub(super) fn new(stream: S) -> Self {
Self {
limit: stream.get_limit(),
count: 0,
stream,
}
}
}

impl<S: ParallelStream> ParallelStream for Enumerate<S> {
type Item = (usize, S::Item);

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let next = ready!(this.stream.poll_next(cx));
*this.count += 1;
let count = *this.count;
Poll::Ready(next.map(|val| (count, val)))
}

fn limit(mut self, limit: impl Into<Option<usize>>) -> Self {
self.limit = limit.into();
self
}

fn get_limit(&self) -> Option<usize> {
self.limit
}
}

#[async_std::test]
async fn smoke() {
use async_std::prelude::*;
let s = async_std::stream::repeat(5usize).enumerate().take(3);
let mut output = vec![];
let mut stream = crate::from_stream(s);
while let Some(n) = stream.next().await {
output.push(n);
}
assert_eq!(output, vec![(0, 5), (1, 5), (2, 5)]);
}
23 changes: 22 additions & 1 deletion src/par_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@ use async_std::task::{Context, Poll};

use std::pin::Pin;

use crate::FromParallelStream;
use crate::{FromParallelStream, IntoParallelStream};

pub use enumerate::Enumerate;
pub use for_each::ForEach;
pub use map::Map;
pub use next::NextFuture;
pub use take::Take;
pub use zip::Zip;

mod enumerate;
mod for_each;
mod map;
mod next;
mod take;
mod zip;

/// Parallel version of the standard `Stream` trait.
pub trait ParallelStream: Sized + Send + Sync + Unpin + 'static {
Expand All @@ -40,6 +44,23 @@ pub trait ParallelStream: Sized + Send + Sync + Unpin + 'static {
Map::new(self, f)
}

/// A stream that yields two streams simultaneously
fn zip<S, T>(self, other: T) -> Zip<Self, <T as IntoParallelStream>::IntoParStream>
where
Self: Sized,
T: IntoParallelStream,
{
Zip::new(self, other.into_par_stream())
}

/// A stream that yields a current count and element
fn enumerate<S, T>(self) -> Enumerate<Self>
where
Self: Sized,
{
Enumerate::new(self)
}

/// Applies `f` to each item of this stream in parallel, producing a new
/// stream with the results.
fn next(&mut self) -> NextFuture<'_, Self> {
Expand Down
112 changes: 112 additions & 0 deletions src/par_stream/zip.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
use core::cmp;
use core::pin::Pin;
use core::task::{Context, Poll};

use async_std::task::ready;
use pin_project_lite::pin_project;

use crate::ParallelStream;

pin_project! {
/// A stream that yields two streams simultaneously.
///
/// This `struct` is created by the [`zip`] method on [`ParallelStream`]. See its
/// documentation for more.
///
/// [`zip`]: trait.ParallelStream.html#method.zip
/// [`ParallelStream`]: trait.ParallelStream.html
#[derive(Clone, Debug)]
pub struct Zip<A, B>
where
A: ParallelStream,
B: ParallelStream,
{
#[pin]
stream: A,
#[pin]
other: B,
limit: Option<usize>,
queued1: Option<A::Item>,
queued2: Option<B::Item>,
}
}

impl<A, B> Zip<A, B>
where
A: ParallelStream,
B: ParallelStream,
{
pub(super) fn new(stream: A, other: B) -> Self {
Self {
limit: cmp::min(stream.get_limit(), other.get_limit()),
other,
stream,
queued1: None,
queued2: None,
}
}
}

impl<A, B> ParallelStream for Zip<A, B>
where
A: ParallelStream,
B: ParallelStream,
A::Item: Sync,
B::Item: Sync,
{
type Item = (A::Item, B::Item);

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let mut stream_done = false;
let mut other_done = false;
if this.queued1.is_none() {
match this.stream.poll_next(cx) {
Poll::Ready(Some(item1)) => *this.queued1 = Some(item1),
Poll::Ready(None) | Poll::Pending => {
stream_done = true;
}
}
}
if this.queued2.is_none() {
match this.other.poll_next(cx) {
Poll::Ready(Some(item2)) => *this.queued2 = Some(item2),
Poll::Ready(None) | Poll::Pending => {
other_done = true;
}
}
}

if this.queued1.is_some() && this.queued2.is_some() {
let pair = (this.queued1.take().unwrap(), this.queued2.take().unwrap());
Poll::Ready(Some(pair))
} else if stream_done && other_done {
Poll::Ready(None)
} else {
Poll::Pending
}
}

fn limit(mut self, limit: impl Into<Option<usize>>) -> Self {
self.limit = limit.into();
self
}

fn get_limit(&self) -> Option<usize> {
self.limit
}
}

#[async_std::test]
async fn smoke() {
use async_std::prelude::*;
let s = async_std::stream::repeat(5usize)
.zip(async_std::stream::repeat(10usize))
.take(3);
let mut output = vec![];
let mut stream = crate::from_stream(s);
while let Some(n) = stream.next().await {
output.push(n);
}
assert_eq!(output, vec![(5, 10), (5, 10), (5, 10)]);
}
Empty file removed tests/test.rs
Empty file.