From 1db1b2353ce8b2f5aa8928d85b4e19585da97128 Mon Sep 17 00:00:00 2001 From: Yosh Date: Thu, 14 Mar 2024 14:27:23 +0100 Subject: [PATCH 1/3] add `Future::wait_until` --- Cargo.toml | 1 + src/future/futures_ext.rs | 35 ++++++++++++++++++++++ src/future/mod.rs | 2 ++ src/future/wait_until.rs | 61 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 99 insertions(+) create mode 100644 src/future/wait_until.rs diff --git a/Cargo.toml b/Cargo.toml index 93d2898..7433f52 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ slab = { version = "0.4.8", optional = true } smallvec = { version = "1.11.0", optional = true } [dev-dependencies] +async-io = "2.3.2" async-std = { version = "1.12.0", features = ["attributes"] } criterion = { version = "0.3", features = [ "async", diff --git a/src/future/futures_ext.rs b/src/future/futures_ext.rs index 2dd3e73..3fc9fb2 100644 --- a/src/future/futures_ext.rs +++ b/src/future/futures_ext.rs @@ -5,6 +5,7 @@ use futures_core::Future; use super::join::tuple::Join2; use super::race::tuple::Race2; +use super::WaitUntil; /// An extension trait for the `Future` trait. pub trait FutureExt: Future { @@ -19,6 +20,40 @@ pub trait FutureExt: Future { where Self: Future + Sized, S2: IntoFuture; + + /// Delay resolving the future until the given deadline. + /// + /// The underlying future will not be polled until the deadline has expired. In addition + /// to using a time source as a deadline, any future can be used as a + /// deadline too. When used in combination with a multi-consumer channel, + /// this method can be used to synchronize the start of multiple futures and streams. + /// + /// # Example + /// + /// ``` + /// use async_io::Timer; + /// use futures_concurrency::prelude::*; + /// use futures_lite::future::block_on; + /// use std::time::{Duration, Instant}; + /// + /// block_on(async { + /// let now = Instant::now(); + /// let duration = Duration::from_millis(100); + /// + /// async { "meow" } + /// .wait_until(Timer::after(duration)) + /// .await; + /// + /// assert!(now.elapsed() >= duration); + /// }); + /// ``` + fn wait_until(self, deadline: D) -> WaitUntil + where + Self: Sized, + D: IntoFuture, + { + WaitUntil::new(self, deadline.into_future()) + } } impl FutureExt for F1 diff --git a/src/future/mod.rs b/src/future/mod.rs index 2936a23..c035d81 100644 --- a/src/future/mod.rs +++ b/src/future/mod.rs @@ -76,6 +76,7 @@ pub use join::Join; pub use race::Race; pub use race_ok::RaceOk; pub use try_join::TryJoin; +pub use wait_until::WaitUntil; /// A growable group of futures which act as a single unit. #[cfg(feature = "alloc")] @@ -86,3 +87,4 @@ pub(crate) mod join; pub(crate) mod race; pub(crate) mod race_ok; pub(crate) mod try_join; +pub(crate) mod wait_until; diff --git a/src/future/wait_until.rs b/src/future/wait_until.rs new file mode 100644 index 0000000..d41b2d3 --- /dev/null +++ b/src/future/wait_until.rs @@ -0,0 +1,61 @@ +use core::future::Future; +use core::pin::Pin; +use core::task::{ready, Context, Poll}; + +/// Suspends a future until the specified deadline. +/// +/// This `struct` is created by the [`wait_until`] method on [`FutureExt`]. See its +/// documentation for more. +/// +/// [`wait_until`]: crate::future::FutureExt::wait_until +/// [`FutureExt`]: crate::future::FutureExt +#[derive(Debug)] +#[pin_project::pin_project] +#[must_use = "futures do nothing unless polled or .awaited"] +pub struct WaitUntil { + #[pin] + future: F, + #[pin] + deadline: D, + state: State, +} + +/// The internal state +#[derive(Debug)] +enum State { + Started, + PollFuture, + Completed, +} + +impl WaitUntil { + pub(super) fn new(future: F, deadline: D) -> Self { + Self { + future, + deadline, + state: State::Started, + } + } +} + +impl Future for WaitUntil { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + loop { + match this.state { + State::Started => { + ready!(this.deadline.as_mut().poll(cx)); + *this.state = State::PollFuture; + } + State::PollFuture => { + let value = ready!(this.future.as_mut().poll(cx)); + *this.state = State::Completed; + return Poll::Ready(value); + } + State::Completed => panic!("future polled after completing"), + } + } + } +} From bdb873a6e979541b6fab3b94695d9d3b63c3280d Mon Sep 17 00:00:00 2001 From: Yosh Date: Thu, 14 Mar 2024 14:50:09 +0100 Subject: [PATCH 2/3] add `Stream::wait_until` --- src/stream/mod.rs | 2 ++ src/stream/stream_ext.rs | 39 ++++++++++++++++++++++++- src/stream/wait_until.rs | 63 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 103 insertions(+), 1 deletion(-) create mode 100644 src/stream/wait_until.rs diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 98c6bd6..fdbf0fd 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -54,6 +54,7 @@ pub use stream_ext::StreamExt; #[doc(inline)] #[cfg(feature = "alloc")] pub use stream_group::StreamGroup; +pub use wait_until::WaitUntil; pub use zip::Zip; /// A growable group of streams which act as a single unit. @@ -64,4 +65,5 @@ pub(crate) mod chain; mod into_stream; pub(crate) mod merge; mod stream_ext; +pub(crate) mod wait_until; pub(crate) mod zip; diff --git a/src/stream/stream_ext.rs b/src/stream/stream_ext.rs index 6118fb8..15265cf 100644 --- a/src/stream/stream_ext.rs +++ b/src/stream/stream_ext.rs @@ -1,10 +1,12 @@ +use core::future::IntoFuture; + use crate::stream::{IntoStream, Merge}; use futures_core::Stream; #[cfg(feature = "alloc")] use crate::concurrent_stream::FromStream; -use super::{chain::tuple::Chain2, merge::tuple::Merge2, zip::tuple::Zip2, Chain, Zip}; +use super::{chain::tuple::Chain2, merge::tuple::Merge2, zip::tuple::Zip2, Chain, WaitUntil, Zip}; /// An extension trait for the `Stream` trait. pub trait StreamExt: Stream { @@ -34,6 +36,41 @@ pub trait StreamExt: Stream { { FromStream::new(self) } + + /// Delay the yielding of items from the stream until the given deadline. + /// + /// The underlying stream will not be polled until the deadline has expired. In addition + /// to using a time source as a deadline, any future can be used as a + /// deadline too. When used in combination with a multi-consumer channel, + /// this method can be used to synchronize the start of multiple streams and futures. + /// + /// # Example + /// ``` + /// use async_io::Timer; + /// use futures_concurrency::prelude::*; + /// use futures_lite::{future::block_on, stream}; + /// use futures_lite::prelude::*; + /// use std::time::{Duration, Instant}; + /// + /// block_on(async { + /// let now = Instant::now(); + /// let duration = Duration::from_millis(100); + /// + /// stream::once("meow") + /// .wait_until(Timer::after(duration)) + /// .next() + /// .await; + /// + /// assert!(now.elapsed() >= duration); + /// }); + /// ``` + fn wait_until(self, deadline: D) -> WaitUntil + where + Self: Sized, + D: IntoFuture, + { + WaitUntil::new(self, deadline.into_future()) + } } impl StreamExt for S1 diff --git a/src/stream/wait_until.rs b/src/stream/wait_until.rs new file mode 100644 index 0000000..346fbc4 --- /dev/null +++ b/src/stream/wait_until.rs @@ -0,0 +1,63 @@ +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; + +use futures_core::stream::Stream; +use pin_project::pin_project; + +/// Delay execution of a stream once for the specified duration. +/// +/// This `struct` is created by the [`wait_until`] method on [`StreamExt`]. See its +/// documentation for more. +/// +/// [`wait_until`]: crate::stream::StreamExt::wait_until +/// [`StreamExt`]: crate::stream::StreamExt +#[derive(Debug)] +#[must_use = "streams do nothing unless polled or .awaited"] +#[pin_project] +pub struct WaitUntil { + #[pin] + stream: S, + #[pin] + deadline: D, + state: State, +} + +#[derive(Debug)] +enum State { + Timer, + Streaming, +} + +impl WaitUntil { + pub(crate) fn new(stream: S, deadline: D) -> Self { + WaitUntil { + stream, + deadline, + state: State::Timer, + } + } +} + +impl Stream for WaitUntil +where + S: Stream, + D: Future, +{ + type Item = S::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + + match this.state { + State::Timer => match this.deadline.poll(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(_) => { + *this.state = State::Streaming; + this.stream.poll_next(cx) + } + }, + State::Streaming => this.stream.poll_next(cx), + } + } +} From 0b0eb0203686aaaf3fc213d86e7e4ea28cbccb47 Mon Sep 17 00:00:00 2001 From: Yosh Date: Thu, 14 Mar 2024 15:40:19 +0100 Subject: [PATCH 3/3] fix CI --- src/future/futures_ext.rs | 4 ++++ src/stream/stream_ext.rs | 4 ++++ src/stream/wait_until.rs | 2 +- 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/src/future/futures_ext.rs b/src/future/futures_ext.rs index 3fc9fb2..329fea9 100644 --- a/src/future/futures_ext.rs +++ b/src/future/futures_ext.rs @@ -31,6 +31,9 @@ pub trait FutureExt: Future { /// # Example /// /// ``` + /// # #[cfg(miri)]fn main() {} + /// # #[cfg(not(miri))] + /// # fn main() { /// use async_io::Timer; /// use futures_concurrency::prelude::*; /// use futures_lite::future::block_on; @@ -46,6 +49,7 @@ pub trait FutureExt: Future { /// /// assert!(now.elapsed() >= duration); /// }); + /// # } /// ``` fn wait_until(self, deadline: D) -> WaitUntil where diff --git a/src/stream/stream_ext.rs b/src/stream/stream_ext.rs index 15265cf..35d8397 100644 --- a/src/stream/stream_ext.rs +++ b/src/stream/stream_ext.rs @@ -46,6 +46,9 @@ pub trait StreamExt: Stream { /// /// # Example /// ``` + /// # #[cfg(miri)] fn main() {} + /// # #[cfg(not(miri))] + /// # fn main() { /// use async_io::Timer; /// use futures_concurrency::prelude::*; /// use futures_lite::{future::block_on, stream}; @@ -63,6 +66,7 @@ pub trait StreamExt: Stream { /// /// assert!(now.elapsed() >= duration); /// }); + /// # } /// ``` fn wait_until(self, deadline: D) -> WaitUntil where diff --git a/src/stream/wait_until.rs b/src/stream/wait_until.rs index 346fbc4..8731145 100644 --- a/src/stream/wait_until.rs +++ b/src/stream/wait_until.rs @@ -51,7 +51,7 @@ where match this.state { State::Timer => match this.deadline.poll(cx) { - Poll::Pending => return Poll::Pending, + Poll::Pending => Poll::Pending, Poll::Ready(_) => { *this.state = State::Streaming; this.stream.poll_next(cx)