Skip to content

Commit

Permalink
add Stream::wait_until
Browse files Browse the repository at this point in the history
  • Loading branch information
yoshuawuyts committed Mar 14, 2024
1 parent 739b4d1 commit 58c1b28
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 1 deletion.
2 changes: 2 additions & 0 deletions src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ pub use stream_ext::StreamExt;
#[doc(inline)]
#[cfg(feature = "alloc")]
pub use stream_group::StreamGroup;
pub use wait_until::WaitUntil;
pub use zip::Zip;

/// A growable group of streams which act as a single unit.
Expand All @@ -64,4 +65,5 @@ pub(crate) mod chain;
mod into_stream;
pub(crate) mod merge;
mod stream_ext;
pub(crate) mod wait_until;
pub(crate) mod zip;
39 changes: 38 additions & 1 deletion src/stream/stream_ext.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use core::future::IntoFuture;

use crate::stream::{IntoStream, Merge};
use futures_core::Stream;

use super::{chain::tuple::Chain2, merge::tuple::Merge2, zip::tuple::Zip2, Chain, Zip};
use super::{chain::tuple::Chain2, merge::tuple::Merge2, zip::tuple::Zip2, Chain, WaitUntil, Zip};

/// An extension trait for the `Stream` trait.
pub trait StreamExt: Stream {
Expand All @@ -22,6 +24,41 @@ pub trait StreamExt: Stream {
where
Self: Stream<Item = T> + Sized,
S2: IntoStream<Item = T>;

/// Delay the yielding of items from the stream until the given deadline.
///
/// The underlying stream will not be polled until the deadline has expired. In addition
/// to using a time source as a deadline, any future can be used as a
/// deadline too. When used in combination with a multi-consumer channel,
/// this method can be used to synchronize the start of multiple streams and futures.
///
/// # Example
/// ```
/// use async_io::Timer;
/// use futures_concurrency::prelude::*;
/// use futures_lite::{future::block_on, stream};
/// use futures_lite::prelude::*;
/// use std::time::{Duration, Instant};
///
/// block_on(async {
/// let now = Instant::now();
/// let duration = Duration::from_millis(100);
///
/// stream::once("meow")
/// .wait_until(Timer::after(duration))
/// .next()
/// .await;
///
/// assert!(now.elapsed() >= duration);
/// });
/// ```
fn wait_until<D>(self, deadline: D) -> WaitUntil<Self, D::IntoFuture>
where
Self: Sized,
D: IntoFuture,
{
WaitUntil::new(self, deadline.into_future())
}
}

impl<S1> StreamExt for S1
Expand Down
63 changes: 63 additions & 0 deletions src/stream/wait_until.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};

use futures_core::stream::Stream;
use pin_project::pin_project;

/// Delay execution of a stream once for the specified duration.
///
/// This `struct` is created by the [`wait_until`] method on [`StreamExt`]. See its
/// documentation for more.
///
/// [`wait_until`]: crate::stream::StreamExt::wait_until
/// [`StreamExt`]: crate::stream::StreamExt
#[derive(Debug)]
#[must_use = "streams do nothing unless polled or .awaited"]
#[pin_project]
pub struct WaitUntil<S, D> {
#[pin]
stream: S,
#[pin]
deadline: D,
state: State,
}

#[derive(Debug)]
enum State {
Timer,
Streaming,
}

impl<S, D> WaitUntil<S, D> {
pub(crate) fn new(stream: S, deadline: D) -> Self {
WaitUntil {
stream,
deadline,
state: State::Timer,
}
}
}

impl<S, D> Stream for WaitUntil<S, D>
where
S: Stream,
D: Future,
{
type Item = S::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();

match this.state {
State::Timer => match this.deadline.poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(_) => {
*this.state = State::Streaming;
this.stream.poll_next(cx)
}
},
State::Streaming => this.stream.poll_next(cx),
}
}
}

0 comments on commit 58c1b28

Please sign in to comment.