From c1bb81bc3cf218807130b1061fcaeb251a426922 Mon Sep 17 00:00:00 2001 From: Jacob Hughes Date: Tue, 1 Aug 2017 19:56:25 -0400 Subject: [PATCH] Rewrote `Stream::with_flat_map()` to work with streams --- src/sink/mod.rs | 29 ++++++++++++++++---- src/sink/with_flat_map.rs | 58 +++++++++++++++++++-------------------- tests/sink.rs | 2 +- 3 files changed, 54 insertions(+), 35 deletions(-) diff --git a/src/sink/mod.rs b/src/sink/mod.rs index f9cd2e3183..37e8cf73b8 100644 --- a/src/sink/mod.rs +++ b/src/sink/mod.rs @@ -322,15 +322,34 @@ pub trait Sink { /// This adapter produces a new sink that passes each value through the /// given function `f` before sending it to `self`. /// - /// To process each value, `f` produces an *iterator*, each value of which + /// To process each value, `f` produces a *stream*, of which each value /// is passed to the underlying sink. A new value will not be accepted until - /// the iterator has been drained + /// the stream has been drained /// /// Note that this function consumes the given sink, returning a wrapped /// version, much like `Iterator::flat_map`. - fn with_flat_map(self, f: F) -> WithFlatMap - where F: FnMut(U) -> I, - I: IntoIterator, + /// + /// # Examples + /// --- + /// Using this function with an iterator through use of the `stream::iter()` + /// function + /// + /// ``` + /// use futures::{Sink,Future,Stream}; + /// use futures::stream; + /// use futures::sync::mpsc; + /// + /// let (tx, rx) = mpsc::channel::(5); + /// + /// let tx = tx.with_flat_map(|x| { + /// stream::iter(vec![42; x].into_iter().map(|y|Ok(y))) + /// }); + /// tx.send(5).wait().unwrap(); + /// assert_eq!(rx.collect().wait(), Ok(vec![42, 42, 42, 42, 42])) + /// ``` + fn with_flat_map(self, f: F) -> WithFlatMap + where F: FnMut(U) -> St, + St: Stream, Self: Sized { with_flat_map::new(self, f) diff --git a/src/sink/with_flat_map.rs b/src/sink/with_flat_map.rs index 3c87e9a760..ec28358b76 100644 --- a/src/sink/with_flat_map.rs +++ b/src/sink/with_flat_map.rs @@ -8,39 +8,39 @@ use stream::Stream; /// to run prior to pushing a value into the underlying sink #[derive(Debug)] #[must_use = "sinks do nothing unless polled"] -pub struct WithFlatMap +pub struct WithFlatMap where S: Sink, - F: FnMut(U) -> I, - I: IntoIterator, + F: FnMut(U) -> St, + St: Stream, { sink: S, f: F, - iter: Option, + stream: Option, buffer: Option, _phantom: PhantomData, } -pub fn new(sink: S, f: F) -> WithFlatMap +pub fn new(sink: S, f: F) -> WithFlatMap where S: Sink, - F: FnMut(U) -> I, - I: IntoIterator, + F: FnMut(U) -> St, + St: Stream, { WithFlatMap { sink: sink, f: f, - iter: None, + stream: None, buffer: None, - _phantom: PhantomData + _phantom: PhantomData, } } -impl WithFlatMap +impl WithFlatMap where S: Sink, - F: FnMut(U) -> I, - I: IntoIterator, + F: FnMut(U) -> St, + St: Stream, { /// Get a shared reference to the inner sink. pub fn get_ref(&self) -> &S { @@ -60,17 +60,17 @@ where self.sink } - fn try_empty_iter(&mut self) -> Poll<(), S::SinkError> { + fn try_empty_stream(&mut self) -> Poll<(), S::SinkError> { if let Some(x) = self.buffer.take() { if let AsyncSink::NotReady(x) = try!(self.sink.start_send(x)) { self.buffer = Some(x); return Ok(Async::NotReady); } } - if let Some(mut iter) = self.iter.take() { - while let Some(x) = iter.next() { + if let Some(mut stream) = self.stream.take() { + while let Some(x) = try_ready!(stream.poll()) { if let AsyncSink::NotReady(x) = try!(self.sink.start_send(x)) { - self.iter = Some(iter); + self.stream = Some(stream); self.buffer = Some(x); return Ok(Async::NotReady); } @@ -80,11 +80,11 @@ where } } -impl Stream for WithFlatMap +impl Stream for WithFlatMap where S: Stream + Sink, - F: FnMut(U) -> I, - I: IntoIterator, + F: FnMut(U) -> St, + St: Stream, { type Item = S::Item; type Error = S::Error; @@ -93,34 +93,34 @@ where } } -impl Sink for WithFlatMap +impl Sink for WithFlatMap where S: Sink, - F: FnMut(U) -> I, - I: IntoIterator, + F: FnMut(U) -> St, + St: Stream, { type SinkItem = U; type SinkError = S::SinkError; fn start_send(&mut self, i: Self::SinkItem) -> StartSend { - if try!(self.try_empty_iter()).is_not_ready() { + if try!(self.try_empty_stream()).is_not_ready() { return Ok(AsyncSink::NotReady(i)); } - assert!(self.iter.is_none()); - self.iter = Some((self.f)(i).into_iter()); - try!(self.try_empty_iter()); + assert!(self.stream.is_none()); + self.stream = Some((self.f)(i)); + try!(self.try_empty_stream()); Ok(AsyncSink::Ready) } fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - if try!(self.try_empty_iter()).is_not_ready() { + if try!(self.try_empty_stream()).is_not_ready() { return Ok(Async::NotReady); } self.sink.poll_complete() } fn close(&mut self) -> Poll<(), Self::SinkError> { - if try!(self.try_empty_iter()).is_not_ready() { + if try!(self.try_empty_stream()).is_not_ready() { return Ok(Async::NotReady); } - assert!(self.iter.is_none()); + assert!(self.stream.is_none()); self.sink.close() } } diff --git a/tests/sink.rs b/tests/sink.rs index 7383fd8038..ad34fbdbff 100644 --- a/tests/sink.rs +++ b/tests/sink.rs @@ -171,7 +171,7 @@ fn with_as_map() { // test simple use of with_flat_map fn with_flat_map() { let sink = Vec::new().with_flat_map(|item| { - vec!(item; item) + stream::iter(vec!(item; item).into_iter().map(Ok)) }); let sink = sink.send(0).wait().unwrap(); let sink = sink.send(1).wait().unwrap();