diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 98c6bd6..fdbf0fd 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -54,6 +54,7 @@ pub use stream_ext::StreamExt; #[doc(inline)] #[cfg(feature = "alloc")] pub use stream_group::StreamGroup; +pub use wait_until::WaitUntil; pub use zip::Zip; /// A growable group of streams which act as a single unit. @@ -64,4 +65,5 @@ pub(crate) mod chain; mod into_stream; pub(crate) mod merge; mod stream_ext; +pub(crate) mod wait_until; pub(crate) mod zip; diff --git a/src/stream/stream_ext.rs b/src/stream/stream_ext.rs index 3321735..4f74eb7 100644 --- a/src/stream/stream_ext.rs +++ b/src/stream/stream_ext.rs @@ -1,7 +1,9 @@ +use core::future::IntoFuture; + use crate::stream::{IntoStream, Merge}; use futures_core::Stream; -use super::{chain::tuple::Chain2, merge::tuple::Merge2, zip::tuple::Zip2, Chain, Zip}; +use super::{chain::tuple::Chain2, merge::tuple::Merge2, zip::tuple::Zip2, Chain, WaitUntil, Zip}; /// An extension trait for the `Stream` trait. pub trait StreamExt: Stream { @@ -22,6 +24,41 @@ pub trait StreamExt: Stream { where Self: Stream + Sized, S2: IntoStream; + + /// Delay the yielding of items from the stream until the given deadline. + /// + /// The underlying stream will not be polled until the deadline has expired. In addition + /// to using a time source as a deadline, any future can be used as a + /// deadline too. When used in combination with a multi-consumer channel, + /// this method can be used to synchronize the start of multiple streams and futures. + /// + /// # Example + /// ``` + /// use async_io::Timer; + /// use futures_concurrency::prelude::*; + /// use futures_lite::{future::block_on, stream}; + /// use futures_lite::prelude::*; + /// use std::time::{Duration, Instant}; + /// + /// block_on(async { + /// let now = Instant::now(); + /// let duration = Duration::from_millis(100); + /// + /// stream::once("meow") + /// .wait_until(Timer::after(duration)) + /// .next() + /// .await; + /// + /// assert!(now.elapsed() >= duration); + /// }); + /// ``` + fn wait_until(self, deadline: D) -> WaitUntil + where + Self: Sized, + D: IntoFuture, + { + WaitUntil::new(self, deadline.into_future()) + } } impl StreamExt for S1 diff --git a/src/stream/wait_until.rs b/src/stream/wait_until.rs new file mode 100644 index 0000000..346fbc4 --- /dev/null +++ b/src/stream/wait_until.rs @@ -0,0 +1,63 @@ +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; + +use futures_core::stream::Stream; +use pin_project::pin_project; + +/// Delay execution of a stream once for the specified duration. +/// +/// This `struct` is created by the [`wait_until`] method on [`StreamExt`]. See its +/// documentation for more. +/// +/// [`wait_until`]: crate::stream::StreamExt::wait_until +/// [`StreamExt`]: crate::stream::StreamExt +#[derive(Debug)] +#[must_use = "streams do nothing unless polled or .awaited"] +#[pin_project] +pub struct WaitUntil { + #[pin] + stream: S, + #[pin] + deadline: D, + state: State, +} + +#[derive(Debug)] +enum State { + Timer, + Streaming, +} + +impl WaitUntil { + pub(crate) fn new(stream: S, deadline: D) -> Self { + WaitUntil { + stream, + deadline, + state: State::Timer, + } + } +} + +impl Stream for WaitUntil +where + S: Stream, + D: Future, +{ + type Item = S::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + + match this.state { + State::Timer => match this.deadline.poll(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(_) => { + *this.state = State::Streaming; + this.stream.poll_next(cx) + } + }, + State::Streaming => this.stream.poll_next(cx), + } + } +}