Skip to content

Commit

Permalink
Rewrote Stream::with_flat_map() to work with streams
Browse files Browse the repository at this point in the history
  • Loading branch information
Jacob Hughes committed Aug 1, 2017
1 parent 946d4e1 commit c1bb81b
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 35 deletions.
29 changes: 24 additions & 5 deletions src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,15 +322,34 @@ pub trait Sink {
/// This adapter produces a new sink that passes each value through the
/// given function `f` before sending it to `self`.
///
/// To process each value, `f` produces an *iterator*, each value of which
/// To process each value, `f` produces a *stream*, of which each value
/// is passed to the underlying sink. A new value will not be accepted until
/// the iterator has been drained
/// the stream has been drained
///
/// Note that this function consumes the given sink, returning a wrapped
/// version, much like `Iterator::flat_map`.
fn with_flat_map<U, F, I>(self, f: F) -> WithFlatMap<Self, U, F, I>
where F: FnMut(U) -> I,
I: IntoIterator<Item = Self::SinkItem>,
///
/// # Examples
/// ---
/// Using this function with an iterator through use of the `stream::iter()`
/// function
///
/// ```
/// use futures::{Sink,Future,Stream};
/// use futures::stream;
/// use futures::sync::mpsc;
///
/// let (tx, rx) = mpsc::channel::<i32>(5);
///
/// let tx = tx.with_flat_map(|x| {
/// stream::iter(vec![42; x].into_iter().map(|y|Ok(y)))
/// });
/// tx.send(5).wait().unwrap();
/// assert_eq!(rx.collect().wait(), Ok(vec![42, 42, 42, 42, 42]))
/// ```
fn with_flat_map<U, F, St>(self, f: F) -> WithFlatMap<Self, U, F, St>
where F: FnMut(U) -> St,
St: Stream<Item = Self::SinkItem, Error=Self::SinkError>,
Self: Sized
{
with_flat_map::new(self, f)
Expand Down
58 changes: 29 additions & 29 deletions src/sink/with_flat_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,39 +8,39 @@ use stream::Stream;
/// to run prior to pushing a value into the underlying sink
#[derive(Debug)]
#[must_use = "sinks do nothing unless polled"]
pub struct WithFlatMap<S, U, F, I>
pub struct WithFlatMap<S, U, F, St>
where
S: Sink,
F: FnMut(U) -> I,
I: IntoIterator<Item = S::SinkItem>,
F: FnMut(U) -> St,
St: Stream<Item = S::SinkItem, Error=S::SinkError>,
{
sink: S,
f: F,
iter: Option<I::IntoIter>,
stream: Option<St>,
buffer: Option<S::SinkItem>,
_phantom: PhantomData<fn(U)>,
}

pub fn new<S, U, F, I>(sink: S, f: F) -> WithFlatMap<S, U, F, I>
pub fn new<S, U, F, St>(sink: S, f: F) -> WithFlatMap<S, U, F, St>
where
S: Sink,
F: FnMut(U) -> I,
I: IntoIterator<Item = S::SinkItem>,
F: FnMut(U) -> St,
St: Stream<Item = S::SinkItem, Error=S::SinkError>,
{
WithFlatMap {
sink: sink,
f: f,
iter: None,
stream: None,
buffer: None,
_phantom: PhantomData
_phantom: PhantomData,
}
}

impl<S, U, F, I> WithFlatMap<S, U, F, I>
impl<S, U, F, St> WithFlatMap<S, U, F, St>
where
S: Sink,
F: FnMut(U) -> I,
I: IntoIterator<Item = S::SinkItem>,
F: FnMut(U) -> St,
St: Stream<Item = S::SinkItem, Error=S::SinkError>,
{
/// Get a shared reference to the inner sink.
pub fn get_ref(&self) -> &S {
Expand All @@ -60,17 +60,17 @@ where
self.sink
}

fn try_empty_iter(&mut self) -> Poll<(), S::SinkError> {
fn try_empty_stream(&mut self) -> Poll<(), S::SinkError> {
if let Some(x) = self.buffer.take() {
if let AsyncSink::NotReady(x) = try!(self.sink.start_send(x)) {
self.buffer = Some(x);
return Ok(Async::NotReady);
}
}
if let Some(mut iter) = self.iter.take() {
while let Some(x) = iter.next() {
if let Some(mut stream) = self.stream.take() {
while let Some(x) = try_ready!(stream.poll()) {
if let AsyncSink::NotReady(x) = try!(self.sink.start_send(x)) {
self.iter = Some(iter);
self.stream = Some(stream);
self.buffer = Some(x);
return Ok(Async::NotReady);
}
Expand All @@ -80,11 +80,11 @@ where
}
}

impl<S, U, F, I> Stream for WithFlatMap<S, U, F, I>
impl<S, U, F, St> Stream for WithFlatMap<S, U, F, St>
where
S: Stream + Sink,
F: FnMut(U) -> I,
I: IntoIterator<Item = S::SinkItem>,
F: FnMut(U) -> St,
St: Stream<Item = S::SinkItem, Error=S::SinkError>,
{
type Item = S::Item;
type Error = S::Error;
Expand All @@ -93,34 +93,34 @@ where
}
}

impl<S, U, F, I> Sink for WithFlatMap<S, U, F, I>
impl<S, U, F, St> Sink for WithFlatMap<S, U, F, St>
where
S: Sink,
F: FnMut(U) -> I,
I: IntoIterator<Item = S::SinkItem>,
F: FnMut(U) -> St,
St: Stream<Item = S::SinkItem, Error=S::SinkError>,
{
type SinkItem = U;
type SinkError = S::SinkError;
fn start_send(&mut self, i: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
if try!(self.try_empty_iter()).is_not_ready() {
if try!(self.try_empty_stream()).is_not_ready() {
return Ok(AsyncSink::NotReady(i));
}
assert!(self.iter.is_none());
self.iter = Some((self.f)(i).into_iter());
try!(self.try_empty_iter());
assert!(self.stream.is_none());
self.stream = Some((self.f)(i));
try!(self.try_empty_stream());
Ok(AsyncSink::Ready)
}
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
if try!(self.try_empty_iter()).is_not_ready() {
if try!(self.try_empty_stream()).is_not_ready() {
return Ok(Async::NotReady);
}
self.sink.poll_complete()
}
fn close(&mut self) -> Poll<(), Self::SinkError> {
if try!(self.try_empty_iter()).is_not_ready() {
if try!(self.try_empty_stream()).is_not_ready() {
return Ok(Async::NotReady);
}
assert!(self.iter.is_none());
assert!(self.stream.is_none());
self.sink.close()
}
}
2 changes: 1 addition & 1 deletion tests/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ fn with_as_map() {
// test simple use of with_flat_map
fn with_flat_map() {
let sink = Vec::new().with_flat_map(|item| {
vec!(item; item)
stream::iter(vec!(item; item).into_iter().map(Ok))
});
let sink = sink.send(0).wait().unwrap();
let sink = sink.send(1).wait().unwrap();
Expand Down

0 comments on commit c1bb81b

Please sign in to comment.