diff --git a/src/stream/chain/array.rs b/src/stream/chain/array.rs index e69de29..39f92d2 100644 --- a/src/stream/chain/array.rs +++ b/src/stream/chain/array.rs @@ -0,0 +1,101 @@ +use core::fmt; +use core::pin::Pin; +use core::task::{Context, Poll}; + +use futures_core::Stream; +use pin_project::pin_project; + +use crate::utils; + +use super::Chain as ChainTrait; + +/// A stream that chains multiple streams one after another. +/// +/// This `struct` is created by the [`chain`] method on the [`Chain`] trait. See its +/// documentation for more. +/// +/// [`chain`]: trait.Chain.html#method.merge +/// [`Chain`]: trait.Chain.html +#[pin_project] +pub struct Chain { + #[pin] + streams: [S; N], + index: usize, + len: usize, + done: bool, +} + +impl Stream for Chain { + type Item = S::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + + assert!(!*this.done, "Stream should not be polled after completion"); + + loop { + if this.index == this.len { + *this.done = true; + return Poll::Ready(None); + } + let stream = utils::iter_pin_mut(this.streams.as_mut()) + .nth(*this.index) + .unwrap(); + match stream.poll_next(cx) { + Poll::Ready(Some(item)) => return Poll::Ready(Some(item)), + Poll::Ready(None) => { + *this.index += 1; + continue; + } + Poll::Pending => return Poll::Pending, + } + } + } +} + +impl fmt::Debug for Chain +where + S: Stream + fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_list().entries(self.streams.iter()).finish() + } +} + +impl ChainTrait for [S; N] { + type Item = S::Item; + + type Stream = Chain; + + fn chain(self) -> Self::Stream { + Chain { + len: self.len(), + streams: self, + index: 0, + done: false, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures_lite::future::block_on; + use futures_lite::prelude::*; + use futures_lite::stream; + + #[test] + fn chain_3() { + block_on(async { + let a = stream::once(1); + let b = stream::once(2); + let c = stream::once(3); + let mut s = [a, b, c].chain(); + + assert_eq!(s.next().await, Some(1)); + assert_eq!(s.next().await, Some(2)); + assert_eq!(s.next().await, Some(3)); + assert_eq!(s.next().await, None); + }) + } +} diff --git a/src/stream/chain/vec.rs b/src/stream/chain/vec.rs index 85a1c79..76ce0f6 100644 --- a/src/stream/chain/vec.rs +++ b/src/stream/chain/vec.rs @@ -1,3 +1,4 @@ +use core::fmt; use core::pin::Pin; use core::task::{Context, Poll}; @@ -8,15 +9,14 @@ use crate::utils; use super::Chain as ChainTrait; -/// A stream that chains two streams one after another. +/// A stream that chains multiple streams one after another. /// -/// This `struct` is created by the [`chain`] method on [`Stream`]. See its +/// This `struct` is created by the [`chain`] method on the [`Chain`] trait. See its /// documentation for more. /// -/// [`chain`]: trait.Stream.html#method.chain -/// [`Stream`]: trait.Stream.html +/// [`chain`]: trait.Chain.html#method.merge +/// [`Chain`]: trait.Chain.html #[pin_project] -#[derive(Debug)] pub struct Chain { #[pin] streams: Vec, @@ -53,6 +53,15 @@ impl Stream for Chain { } } +impl fmt::Debug for Chain +where + S: Stream + fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_list().entries(self.streams.iter()).finish() + } +} + impl ChainTrait for Vec { type Item = S::Item;