Skip to content

Commit

Permalink
Merge pull request #167 from yoshuawuyts/futures-time
Browse files Browse the repository at this point in the history
Add `{Future,Stream}::wait_until`
  • Loading branch information
yoshuawuyts authored Apr 12, 2024
2 parents 28d7098 + 0b0eb02 commit 929ac6b
Show file tree
Hide file tree
Showing 7 changed files with 210 additions and 1 deletion.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ slab = { version = "0.4.8", optional = true }
smallvec = { version = "1.11.0", optional = true }

[dev-dependencies]
async-io = "2.3.2"
async-std = { version = "1.12.0", features = ["attributes"] }
criterion = { version = "0.3", features = [
"async",
Expand Down
39 changes: 39 additions & 0 deletions src/future/futures_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use futures_core::Future;

use super::join::tuple::Join2;
use super::race::tuple::Race2;
use super::WaitUntil;

/// An extension trait for the `Future` trait.
pub trait FutureExt: Future {
Expand All @@ -19,6 +20,44 @@ pub trait FutureExt: Future {
where
Self: Future<Output = T> + Sized,
S2: IntoFuture<Output = T>;

/// Delay resolving the future until the given deadline.
///
/// The underlying future will not be polled until the deadline has expired. In addition
/// to using a time source as a deadline, any future can be used as a
/// deadline too. When used in combination with a multi-consumer channel,
/// this method can be used to synchronize the start of multiple futures and streams.
///
/// # Example
///
/// ```
/// # #[cfg(miri)]fn main() {}
/// # #[cfg(not(miri))]
/// # fn main() {
/// use async_io::Timer;
/// use futures_concurrency::prelude::*;
/// use futures_lite::future::block_on;
/// use std::time::{Duration, Instant};
///
/// block_on(async {
/// let now = Instant::now();
/// let duration = Duration::from_millis(100);
///
/// async { "meow" }
/// .wait_until(Timer::after(duration))
/// .await;
///
/// assert!(now.elapsed() >= duration);
/// });
/// # }
/// ```
fn wait_until<D>(self, deadline: D) -> WaitUntil<Self, D::IntoFuture>
where
Self: Sized,
D: IntoFuture,
{
WaitUntil::new(self, deadline.into_future())
}
}

impl<F1> FutureExt for F1
Expand Down
2 changes: 2 additions & 0 deletions src/future/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ pub use join::Join;
pub use race::Race;
pub use race_ok::RaceOk;
pub use try_join::TryJoin;
pub use wait_until::WaitUntil;

/// A growable group of futures which act as a single unit.
#[cfg(feature = "alloc")]
Expand All @@ -86,3 +87,4 @@ pub(crate) mod join;
pub(crate) mod race;
pub(crate) mod race_ok;
pub(crate) mod try_join;
pub(crate) mod wait_until;
61 changes: 61 additions & 0 deletions src/future/wait_until.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use core::future::Future;
use core::pin::Pin;
use core::task::{ready, Context, Poll};

/// Suspends a future until the specified deadline.
///
/// This `struct` is created by the [`wait_until`] method on [`FutureExt`]. See its
/// documentation for more.
///
/// [`wait_until`]: crate::future::FutureExt::wait_until
/// [`FutureExt`]: crate::future::FutureExt
#[derive(Debug)]
#[pin_project::pin_project]
#[must_use = "futures do nothing unless polled or .awaited"]
pub struct WaitUntil<F, D> {
#[pin]
future: F,
#[pin]
deadline: D,
state: State,
}

/// The internal state
#[derive(Debug)]
enum State {
Started,
PollFuture,
Completed,
}

impl<F, D> WaitUntil<F, D> {
pub(super) fn new(future: F, deadline: D) -> Self {
Self {
future,
deadline,
state: State::Started,
}
}
}

impl<F: Future, D: Future> Future for WaitUntil<F, D> {
type Output = F::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
loop {
match this.state {
State::Started => {
ready!(this.deadline.as_mut().poll(cx));
*this.state = State::PollFuture;
}
State::PollFuture => {
let value = ready!(this.future.as_mut().poll(cx));
*this.state = State::Completed;
return Poll::Ready(value);
}
State::Completed => panic!("future polled after completing"),
}
}
}
}
2 changes: 2 additions & 0 deletions src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ pub use stream_ext::StreamExt;
#[doc(inline)]
#[cfg(feature = "alloc")]
pub use stream_group::StreamGroup;
pub use wait_until::WaitUntil;
pub use zip::Zip;

/// A growable group of streams which act as a single unit.
Expand All @@ -64,4 +65,5 @@ pub(crate) mod chain;
mod into_stream;
pub(crate) mod merge;
mod stream_ext;
pub(crate) mod wait_until;
pub(crate) mod zip;
43 changes: 42 additions & 1 deletion src/stream/stream_ext.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use core::future::IntoFuture;

use crate::stream::{IntoStream, Merge};
use futures_core::Stream;

#[cfg(feature = "alloc")]
use crate::concurrent_stream::FromStream;

use super::{chain::tuple::Chain2, merge::tuple::Merge2, zip::tuple::Zip2, Chain, Zip};
use super::{chain::tuple::Chain2, merge::tuple::Merge2, zip::tuple::Zip2, Chain, WaitUntil, Zip};

/// An extension trait for the `Stream` trait.
pub trait StreamExt: Stream {
Expand Down Expand Up @@ -34,6 +36,45 @@ pub trait StreamExt: Stream {
{
FromStream::new(self)
}

/// Delay the yielding of items from the stream until the given deadline.
///
/// The underlying stream will not be polled until the deadline has expired. In addition
/// to using a time source as a deadline, any future can be used as a
/// deadline too. When used in combination with a multi-consumer channel,
/// this method can be used to synchronize the start of multiple streams and futures.
///
/// # Example
/// ```
/// # #[cfg(miri)] fn main() {}
/// # #[cfg(not(miri))]
/// # fn main() {
/// use async_io::Timer;
/// use futures_concurrency::prelude::*;
/// use futures_lite::{future::block_on, stream};
/// use futures_lite::prelude::*;
/// use std::time::{Duration, Instant};
///
/// block_on(async {
/// let now = Instant::now();
/// let duration = Duration::from_millis(100);
///
/// stream::once("meow")
/// .wait_until(Timer::after(duration))
/// .next()
/// .await;
///
/// assert!(now.elapsed() >= duration);
/// });
/// # }
/// ```
fn wait_until<D>(self, deadline: D) -> WaitUntil<Self, D::IntoFuture>
where
Self: Sized,
D: IntoFuture,
{
WaitUntil::new(self, deadline.into_future())
}
}

impl<S1> StreamExt for S1
Expand Down
63 changes: 63 additions & 0 deletions src/stream/wait_until.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};

use futures_core::stream::Stream;
use pin_project::pin_project;

/// Delay execution of a stream once for the specified duration.
///
/// This `struct` is created by the [`wait_until`] method on [`StreamExt`]. See its
/// documentation for more.
///
/// [`wait_until`]: crate::stream::StreamExt::wait_until
/// [`StreamExt`]: crate::stream::StreamExt
#[derive(Debug)]
#[must_use = "streams do nothing unless polled or .awaited"]
#[pin_project]
pub struct WaitUntil<S, D> {
#[pin]
stream: S,
#[pin]
deadline: D,
state: State,
}

#[derive(Debug)]
enum State {
Timer,
Streaming,
}

impl<S, D> WaitUntil<S, D> {
pub(crate) fn new(stream: S, deadline: D) -> Self {
WaitUntil {
stream,
deadline,
state: State::Timer,
}
}
}

impl<S, D> Stream for WaitUntil<S, D>
where
S: Stream,
D: Future,
{
type Item = S::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();

match this.state {
State::Timer => match this.deadline.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(_) => {
*this.state = State::Streaming;
this.stream.poll_next(cx)
}
},
State::Streaming => this.stream.poll_next(cx),
}
}
}

0 comments on commit 929ac6b

Please sign in to comment.