Skip to content

Commit

Permalink
Merge pull request #562 from felipesere/double_ended_ext
Browse files Browse the repository at this point in the history
DoubleEndedStream extension
  • Loading branch information
yoshuawuyts authored Dec 13, 2019
2 parents 63b6a2b + 182fe68 commit 3d3bf91
Show file tree
Hide file tree
Showing 10 changed files with 472 additions and 26 deletions.
24 changes: 0 additions & 24 deletions src/stream/double_ended_stream.rs

This file was deleted.

241 changes: 241 additions & 0 deletions src/stream/double_ended_stream/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
use crate::stream::Stream;

use std::pin::Pin;
use std::task::{Context, Poll};

mod next_back;
mod nth_back;
mod rfind;
mod rfold;
mod try_rfold;

use next_back::NextBackFuture;
use nth_back::NthBackFuture;
use rfind::RFindFuture;
use rfold::RFoldFuture;
use try_rfold::TryRFoldFuture;

/// A stream able to yield elements from both ends.
///
/// Something that implements `DoubleEndedStream` has one extra capability
/// over something that implements [`Stream`]: the ability to also take
/// `Item`s from the back, as well as the front.
///
/// [`Stream`]: trait.Stream.html
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub trait DoubleEndedStream: Stream {
#[doc = r#"
Attempts to receive the next item from the back of the stream.
There are several possible return values:
* `Poll::Pending` means this stream's next_back value is not ready yet.
* `Poll::Ready(None)` means this stream has been exhausted.
* `Poll::Ready(Some(item))` means `item` was received out of the stream.
# Examples
```
# fn main() { async_std::task::block_on(async {
#
use std::pin::Pin;
use async_std::prelude::*;
use async_std::stream;
use async_std::task::{Context, Poll};
fn increment(
s: impl DoubleEndedStream<Item = i32> + Unpin,
) -> impl DoubleEndedStream<Item = i32> + Unpin {
struct Increment<S>(S);
impl<S: DoubleEndedStream<Item = i32> + Unpin> Stream for Increment<S> {
type Item = S::Item;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.0).poll_next(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(item)) => Poll::Ready(Some(item + 1)),
}
}
}
impl<S: DoubleEndedStream<Item = i32> + Unpin> DoubleEndedStream for Increment<S> {
fn poll_next_back(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.0).poll_next_back(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(item)) => Poll::Ready(Some(item + 1)),
}
}
}
Increment(s)
}
let mut s = increment(stream::once(7));
assert_eq!(s.next_back().await, Some(8));
assert_eq!(s.next_back().await, None);
#
# }) }
```
"#]
fn poll_next_back(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;

#[doc = r#"
Advances the stream and returns the next value.
Returns [`None`] when iteration is finished. Individual stream implementations may
choose to resume iteration, and so calling `next()` again may or may not eventually
start returning more values.
[`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
# Examples
```
# fn main() { async_std::task::block_on(async {
#
use async_std::stream::double_ended_stream::{self, DoubleEndedStream};
let mut s = double_ended_stream::from_iter(vec![7u8]);
assert_eq!(s.next_back().await, Some(7));
assert_eq!(s.next_back().await, None);
#
# }) }
```
"#]
fn next_back(&mut self) -> NextBackFuture<'_, Self>
where
Self: Unpin,
{
NextBackFuture { stream: self }
}

#[doc = r#"
Returns the nth element from the back of the stream.
# Examples
Basic usage:
```
# fn main() { async_std::task::block_on(async {
#
use async_std::stream::double_ended_stream::{self, DoubleEndedStream};
let mut s = double_ended_stream::from_iter(vec![1u8, 2, 3, 4, 5]);
let second = s.nth_back(1).await;
assert_eq!(second, Some(4));
#
# }) }
```
"#]
fn nth_back(&mut self, n: usize) -> NthBackFuture<'_, Self>
where
Self: Unpin + Sized,
{
NthBackFuture::new(self, n)
}

#[doc = r#"
Returns the the frist element from the right that matches the predicate.
# Examples
Basic usage:
```
# fn main() { async_std::task::block_on(async {
#
use async_std::stream::double_ended_stream::{self, DoubleEndedStream};
let mut s = double_ended_stream::from_iter(vec![1u8, 2, 3, 4, 5]);
let second = s.rfind(|v| v % 2 == 0).await;
assert_eq!(second, Some(4));
#
# }) }
```
"#]
fn rfind<P>(&mut self, p: P) -> RFindFuture<'_, Self, P>
where
Self: Unpin + Sized,
P: FnMut(&Self::Item) -> bool,
{
RFindFuture::new(self, p)
}

#[doc = r#"
# Examples
Basic usage:
```
# fn main() { async_std::task::block_on(async {
#
use async_std::stream::double_ended_stream::{self, DoubleEndedStream};
let s = double_ended_stream::from_iter(vec![1u8, 2, 3, 4, 5]);
let second = s.rfold(0, |acc, v| v + acc).await;
assert_eq!(second, 15);
#
# }) }
```
"#]
fn rfold<B, F>(self, accum: B, f: F) -> RFoldFuture<Self, F, B>
where
Self: Sized,
F: FnMut(B, Self::Item) -> B,
{
RFoldFuture::new(self, accum, f)
}

#[doc = r#"
A combinator that applies a function as long as it returns successfully, producing a single, final value.
Immediately returns the error when the function returns unsuccessfully.
# Examples
Basic usage:
```
# fn main() { async_std::task::block_on(async {
#
use async_std::stream::double_ended_stream::{self, DoubleEndedStream};
let s = double_ended_stream::from_iter(vec![1u8, 2, 3, 4, 5]);
let sum = s.try_rfold(0, |acc, v| {
if (acc+v) % 2 == 1 {
Ok(v+3)
} else {
Err("fail")
}
}).await;
assert_eq!(sum, Err("fail"));
#
# }) }
```
"#]
fn try_rfold<B, F, E>(self, accum: B, f: F) -> TryRFoldFuture<Self, F, B>
where
Self: Sized,
F: FnMut(B, Self::Item) -> Result<B, E>,
{
TryRFoldFuture::new(self, accum, f)
}
}
19 changes: 19 additions & 0 deletions src/stream/double_ended_stream/next_back.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use std::pin::Pin;
use std::future::Future;

use crate::stream::DoubleEndedStream;
use crate::task::{Context, Poll};

#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct NextBackFuture<'a, T: Unpin + ?Sized> {
pub(crate) stream: &'a mut T,
}

impl<T: DoubleEndedStream + Unpin + ?Sized> Future for NextBackFuture<'_, T> {
type Output = Option<T::Item>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut *self.stream).poll_next_back(cx)
}
}
41 changes: 41 additions & 0 deletions src/stream/double_ended_stream/nth_back.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use crate::stream::DoubleEndedStream;

#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct NthBackFuture<'a, S> {
stream: &'a mut S,
n: usize,
}

impl<'a, S> NthBackFuture<'a, S> {
pub(crate) fn new(stream: &'a mut S, n: usize) -> Self {
NthBackFuture { stream, n }
}
}

impl<'a, S> Future for NthBackFuture<'a, S>
where
S: DoubleEndedStream + Sized + Unpin,
{
type Output = Option<S::Item>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let next = futures_core::ready!(Pin::new(&mut *self.stream).poll_next_back(cx));
match next {
Some(v) => match self.n {
0 => Poll::Ready(Some(v)),
_ => {
self.n -= 1;
cx.waker().wake_by_ref();
Poll::Pending
}
},
None => Poll::Ready(None),
}
}
}

41 changes: 41 additions & 0 deletions src/stream/double_ended_stream/rfind.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use std::task::{Context, Poll};
use std::future::Future;
use std::pin::Pin;

use crate::stream::DoubleEndedStream;

#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct RFindFuture<'a, S, P> {
stream: &'a mut S,
p: P,
}

impl<'a, S, P> RFindFuture<'a, S, P> {
pub(super) fn new(stream: &'a mut S, p: P) -> Self {
RFindFuture { stream, p }
}
}

impl<S: Unpin, P> Unpin for RFindFuture<'_, S, P> {}

impl<'a, S, P> Future for RFindFuture<'a, S, P>
where
S: DoubleEndedStream + Unpin + Sized,
P: FnMut(&S::Item) -> bool,
{
type Output = Option<S::Item>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let item = futures_core::ready!(Pin::new(&mut *self.stream).poll_next_back(cx));

match item {
Some(v) if (&mut self.p)(&v) => Poll::Ready(Some(v)),
Some(_) => {
cx.waker().wake_by_ref();
Poll::Pending
}
None => Poll::Ready(None),
}
}
}
Loading

0 comments on commit 3d3bf91

Please sign in to comment.