Skip to content

Commit

Permalink
Merge pull request #506 from stjepang/cleanup-stream
Browse files Browse the repository at this point in the history
Cleanup stream module
  • Loading branch information
yoshuawuyts authored Nov 11, 2019
2 parents 352f18b + 5438258 commit 76c5ffe
Show file tree
Hide file tree
Showing 31 changed files with 315 additions and 428 deletions.
18 changes: 9 additions & 9 deletions src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
//! [`File`]s:
//!
//! ```no_run
//! use async_std::prelude::*;
//! use async_std::fs::File;
//! use async_std::prelude::*;
//!
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
//! #
Expand All @@ -47,9 +47,9 @@
//! coming from:
//!
//! ```no_run
//! use async_std::io::prelude::*;
//! use async_std::io::SeekFrom;
//! use async_std::fs::File;
//! use async_std::io::SeekFrom;
//! use async_std::prelude::*;
//!
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
//! #
Expand Down Expand Up @@ -82,9 +82,9 @@
//! methods to any reader:
//!
//! ```no_run
//! use async_std::io::prelude::*;
//! use async_std::io::BufReader;
//! use async_std::fs::File;
//! use async_std::io::BufReader;
//! use async_std::prelude::*;
//!
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
//! #
Expand All @@ -104,9 +104,9 @@
//! to [`write`][`Write::write`]:
//!
//! ```no_run
//! use async_std::io::prelude::*;
//! use async_std::io::BufWriter;
//! use async_std::fs::File;
//! use async_std::io::BufWriter;
//! use async_std::io::prelude::*;
//!
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
//! #
Expand Down Expand Up @@ -179,9 +179,9 @@
//! lines:
//!
//! ```no_run
//! use async_std::prelude::*;
//! use async_std::io::BufReader;
//! use async_std::fs::File;
//! use async_std::io::BufReader;
//! use async_std::prelude::*;
//!
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
//! #
Expand Down
6 changes: 3 additions & 3 deletions src/stream/extend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ pub trait Extend<A> {
/// ```
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub async fn extend<'a, C, A, T>(collection: &mut C, stream: T)
pub async fn extend<'a, C, T, S>(collection: &mut C, stream: S)
where
C: Extend<A>,
T: IntoStream<Item = A> + 'a,
C: Extend<T>,
S: IntoStream<Item = T> + 'a,
{
Extend::extend(collection, stream).await
}
80 changes: 25 additions & 55 deletions src/stream/from_fn.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,21 @@
use std::marker::PhantomData;
use std::pin::Pin;
use std::future::Future;

use pin_project_lite::pin_project;

use crate::stream::Stream;
use crate::task::{Context, Poll};

pin_project! {
/// A stream that yields elements by calling a closure.
///
/// This stream is created by the [`from_fn`] function. See its
/// documentation for more.
///
/// [`from_fn`]: fn.from_fn.html
#[derive(Debug)]
pub struct FromFn<F, Fut, T> {
f: F,
#[pin]
future: Option<Fut>,
__t: PhantomData<T>,
}
/// A stream that yields elements by calling a closure.
///
/// This stream is created by the [`from_fn`] function. See its
/// documentation for more.
///
/// [`from_fn`]: fn.from_fn.html
#[derive(Clone, Debug)]
pub struct FromFn<F> {
f: F,
}

impl<F> Unpin for FromFn<F> {}

/// Creates a new stream where to produce each new element a provided closure is called.
///
/// This allows creating a custom stream with any behaviour without using the more verbose
Expand All @@ -34,21 +27,15 @@ pin_project! {
/// # async_std::task::block_on(async {
/// #
/// use async_std::prelude::*;
/// use async_std::sync::{Arc, Mutex};
/// use async_std::stream;
///
/// let count = Arc::new(Mutex::new(0u8));
/// let mut count = 0u8;
/// let s = stream::from_fn(|| {
/// let count = Arc::clone(&count);
///
/// async move {
/// *count.lock().await += 1;
///
/// if *count.lock().await > 3 {
/// None
/// } else {
/// Some(*count.lock().await)
/// }
/// count += 1;
/// if count > 3 {
/// None
/// } else {
/// Some(count)
/// }
/// });
///
Expand All @@ -60,38 +47,21 @@ pin_project! {
/// #
/// # })
/// ```
pub fn from_fn<T, F, Fut>(f: F) -> FromFn<F, Fut, T>
pub fn from_fn<T, F>(f: F) -> FromFn<F>
where
F: FnMut() -> Fut,
Fut: Future<Output = Option<T>>,
F: FnMut() -> Option<T>,
{
FromFn {
f,
future: None,
__t: PhantomData,
}
FromFn { f }
}

impl<F, Fut, T> Stream for FromFn<F, Fut, T>
impl<T, F> Stream for FromFn<F>
where
F: FnMut() -> Fut,
Fut: Future<Output = Option<T>>,
F: FnMut() -> Option<T>,
{
type Item = T;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
if this.future.is_some() {
let next =
futures_core::ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx));
this.future.set(None);

return Poll::Ready(next);
} else {
let fut = (this.f)();
this.future.set(Some(fut));
}
}
fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let item = (&mut self.f)();
Poll::Ready(item)
}
}
2 changes: 1 addition & 1 deletion src/stream/from_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::stream::Stream;
use crate::task::{Context, Poll};

pin_project! {
/// A stream that created from iterator
/// A stream that was created from iterator.
///
/// This stream is created by the [`from_iter`] function.
/// See it documentation for more.
Expand Down
4 changes: 1 addition & 3 deletions src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,7 @@ pub use from_iter::{from_iter, FromIter};
pub use once::{once, Once};
pub use repeat::{repeat, Repeat};
pub use repeat_with::{repeat_with, RepeatWith};
pub use stream::{
Chain, Filter, Fuse, Inspect, Scan, Skip, SkipWhile, StepBy, Stream, Take, TakeWhile, Zip,
};
pub use stream::*;

pub(crate) mod stream;

Expand Down
2 changes: 1 addition & 1 deletion src/stream/once.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pin_project! {
/// documentation for more.
///
/// [`once`]: fn.once.html
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct Once<T> {
value: Option<T>,
}
Expand Down
2 changes: 1 addition & 1 deletion src/stream/repeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ where
/// documentation for more.
///
/// [`repeat`]: fn.repeat.html
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct Repeat<T> {
item: T,
}
Expand Down
79 changes: 31 additions & 48 deletions src/stream/repeat_with.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,21 @@
use std::marker::PhantomData;
use std::pin::Pin;
use std::future::Future;

use pin_project_lite::pin_project;

use crate::stream::Stream;
use crate::task::{Context, Poll};

pin_project! {
/// A stream that repeats elements of type `T` endlessly by applying a provided closure.
///
/// This stream is created by the [`repeat_with`] function. See its
/// documentation for more.
///
/// [`repeat_with`]: fn.repeat_with.html
#[derive(Debug)]
pub struct RepeatWith<F, Fut, A> {
f: F,
#[pin]
future: Option<Fut>,
__a: PhantomData<A>,
}
/// A stream that repeats elements of type `T` endlessly by applying a provided closure.
///
/// This stream is created by the [`repeat_with`] function. See its
/// documentation for more.
///
/// [`repeat_with`]: fn.repeat_with.html
#[derive(Clone, Debug)]
pub struct RepeatWith<F> {
f: F,
}

impl<F> Unpin for RepeatWith<F> {}

/// Creates a new stream that repeats elements of type `A` endlessly by applying the provided closure.
///
/// # Examples
Expand All @@ -35,7 +28,7 @@ pin_project! {
/// use async_std::prelude::*;
/// use async_std::stream;
///
/// let s = stream::repeat_with(|| async { 1 });
/// let s = stream::repeat_with(|| 1);
///
/// pin_utils::pin_mut!(s);
///
Expand All @@ -54,48 +47,38 @@ pin_project! {
/// use async_std::prelude::*;
/// use async_std::stream;
///
/// let s = stream::repeat_with(|| async { 1u8 }).take(2);
/// let mut n = 1;
/// let s = stream::repeat_with(|| {
/// let item = n;
/// n *= 2;
/// item
/// })
/// .take(4);
///
/// pin_utils::pin_mut!(s);
///
/// assert_eq!(s.next().await, Some(1));
/// assert_eq!(s.next().await, Some(1));
/// assert_eq!(s.next().await, Some(2));
/// assert_eq!(s.next().await, Some(4));
/// assert_eq!(s.next().await, Some(8));
/// assert_eq!(s.next().await, None);
/// # })
/// ```
pub fn repeat_with<F, Fut, A>(repeater: F) -> RepeatWith<F, Fut, A>
pub fn repeat_with<T, F>(repeater: F) -> RepeatWith<F>
where
F: FnMut() -> Fut,
Fut: Future<Output = A>,
F: FnMut() -> T,
{
RepeatWith {
f: repeater,
future: None,
__a: PhantomData,
}
RepeatWith { f: repeater }
}

impl<F, Fut, A> Stream for RepeatWith<F, Fut, A>
impl<T, F> Stream for RepeatWith<F>
where
F: FnMut() -> Fut,
Fut: Future<Output = A>,
F: FnMut() -> T,
{
type Item = A;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
if this.future.is_some() {
let res = futures_core::ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx));

this.future.set(None);

return Poll::Ready(Some(res));
} else {
let fut = (this.f)();
type Item = T;

this.future.set(Some(fut));
}
}
fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let item = (&mut self.f)();
Poll::Ready(Some(item))
}
}
2 changes: 1 addition & 1 deletion src/stream/stream/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::prelude::*;
use crate::task::{Context, Poll};

pin_project! {
/// Chains two streams one after another.
/// A stream that chains two streams one after another.
///
/// This `struct` is created by the [`chain`] method on [`Stream`]. See its
/// documentation for more.
Expand Down
1 change: 1 addition & 0 deletions src/stream/stream/cloned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use pin_project_lite::pin_project;
use std::pin::Pin;

pin_project! {
/// A stream that clones the elements of an underlying stream.
#[derive(Debug)]
pub struct Cloned<S> {
#[pin]
Expand Down
4 changes: 2 additions & 2 deletions src/stream/stream/copied.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use pin_project_lite::pin_project;
use std::pin::Pin;

pin_project! {
#[doc(hidden)]
#[allow(missing_debug_implementations)]
/// A stream that copies the elements of an underlying stream.
#[derive(Debug)]
pub struct Copied<S> {
#[pin]
stream: S,
Expand Down
Loading

0 comments on commit 76c5ffe

Please sign in to comment.