From 59eca5d746c370d113c20cbeeaba0c335c8be8e0 Mon Sep 17 00:00:00 2001 From: "M.Amin Rayej" Date: Wed, 22 Jan 2025 02:36:49 +0330 Subject: [PATCH] expose coop module --- tokio/src/macros/cfg.rs | 14 +++++ tokio/src/runtime/coop.rs | 103 +++++++++++++++++++++++++++++++++---- tokio/src/runtime/mod.rs | 7 ++- tokio/tests/coop_budget.rs | 17 ++++++ 4 files changed, 131 insertions(+), 10 deletions(-) diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index 3242d3ce2ea..eadc9b6676e 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -549,6 +549,20 @@ macro_rules! cfg_not_coop { } } +macro_rules! cfg_pub_if_rt { + ($($(#[$meta:meta])* fn $($inner:tt)*)*) => { + $( + $(#[$meta])* + #[cfg(feature = "rt")] + pub fn $($inner)* + + $(#[$meta])* + #[cfg(not(feature = "rt"))] + pub(crate) fn $($inner)* + )* + } +} + macro_rules! cfg_has_atomic_u64 { ($($item:item)*) => { $( diff --git a/tokio/src/runtime/coop.rs b/tokio/src/runtime/coop.rs index c01e5e3de8b..6023bbe7967 100644 --- a/tokio/src/runtime/coop.rs +++ b/tokio/src/runtime/coop.rs @@ -1,10 +1,8 @@ #![cfg_attr(not(feature = "full"), allow(dead_code))] -//! Yield points for improved cooperative scheduling. +//! Utilities for improved cooperative scheduling. //! -//! Documentation for this can be found in the [`tokio::task`] module. -//! -//! [`tokio::task`]: crate::task. +//! See the "Cooperative scheduling" section in the [task](crate::task#cooperative-scheduling) module. // ```ignore // # use tokio_stream::{Stream, StreamExt}; @@ -107,11 +105,98 @@ fn with_budget(budget: Budget, f: impl FnOnce() -> R) -> R { f() } -#[inline(always)] -pub(crate) fn has_budget_remaining() -> bool { - // If the current budget cannot be accessed due to the thread-local being - // shutdown, then we assume there is budget remaining. - context::budget(|cell| cell.get().has_remaining()).unwrap_or(true) +cfg_pub_if_rt! { + /// Returns `true` if there is still budget left on the task. + /// + /// Futures created by the tokio library functions are budget-aware and yield when there is no more + /// budget left, but not all futures will be budget-aware. Consider future `A` that polls two inner + /// futures `B` and `C`, and returns `Poll::Ready` when one of them is ready. If both inner futures + /// were budget-aware, at some point the budget would be depleted which would cause both futures to + /// return `Poll::Pending` resulting in `A` returning `Poll::Pending` as well. Yielding all the way + /// back to the runtime, the budget would be reset, and `B` and `C` could make progress again. + /// Now let's consider `B` is budget-aware, but `C` is not and is always ready. The budget will be + /// depleted as before, but now since `C` always returns `Poll::Ready`, `A` will always return + /// `Poll::Ready` as well. This way, `A` will not yield back to the runtime which will keep the budget + /// depleted and `B` not making progress. + /// + /// In these scenarios you could use [`has_budget_remaining`] to check whether the budget has been depleted + /// or not and act accordingly: + /// ``` + /// # use std::future::{poll_fn, Future}; + /// # use std::task::{Context, Poll}; + /// # use std::pin::{pin, Pin}; + /// # use std::sync::atomic::{AtomicBool, Ordering}; + /// # use tokio::task::consume_budget; + /// # use tokio::runtime::coop::has_budget_remaining; + /// # + /// # #[tokio::main] + /// # async fn main() { + /// struct Greedy; + /// struct Aware; + /// struct Combined { + /// greedy: Greedy, + /// aware: Aware, + /// } + /// + /// impl Future for Greedy { + /// type Output = (); + /// + /// fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll { + /// Poll::Ready(()) + /// } + /// } + /// + /// impl Future for Aware { + /// type Output = (); + /// + /// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + /// pin!(consume_budget()).poll(cx) + /// } + /// } + /// + /// impl Future for Combined { + /// type Output = (); + /// + /// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + /// let this = Pin::into_inner(self); + /// + /// if !has_budget_remaining() { + /// return Poll::Pending; + /// } + /// + /// if Pin::new(&mut this.aware).poll(cx).is_ready() { + /// return Poll::Ready(()); + /// } else { + /// return Pin::new(&mut this.greedy).poll(cx); + /// } + /// } + /// } + /// + /// let did_yield = AtomicBool::new(false); + /// + /// while !did_yield.load(Ordering::Relaxed) { + /// poll_fn(|cx| { + /// let combined = pin!(Combined { + /// greedy: Greedy, + /// aware: Aware, + /// }); + /// + /// if combined.poll(cx).is_pending() { + /// did_yield.store(true, Ordering::Relaxed); + /// } + /// + /// Poll::Ready(()) + /// }) + /// .await; + /// } + /// # } + ///``` + #[inline(always)] + fn has_budget_remaining() -> bool { + // If the current budget cannot be accessed due to the thread-local being + // shutdown, then we assume there is budget remaining. + context::budget(|cell| cell.get().has_remaining()).unwrap_or(true) + } } cfg_rt_multi_thread! { diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index bc00bc0810d..a87299f7089 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -321,7 +321,12 @@ mod tests; pub(crate) mod context; -pub(crate) mod coop; +cfg_rt! { + pub mod coop; +} +cfg_not_rt! { + pub(crate) mod coop; +} pub(crate) mod park; diff --git a/tokio/tests/coop_budget.rs b/tokio/tests/coop_budget.rs index 3aaf9db21d6..04ba753e796 100644 --- a/tokio/tests/coop_budget.rs +++ b/tokio/tests/coop_budget.rs @@ -4,6 +4,8 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use tokio::net::UdpSocket; +use tokio::runtime::coop::has_budget_remaining; +use tokio::task::consume_budget; /// Ensure that UDP sockets have functional budgeting /// @@ -76,3 +78,18 @@ async fn coop_budget_udp_send_recv() { assert_eq!(N_ITERATIONS / (BUDGET / 2), tracker.load(Ordering::SeqCst)); } + +#[tokio::test] +async fn test_has_budget_remaining() { + const BUDGET: usize = 128; + + // At the begining budget should be available + assert!(has_budget_remaining()); + + // Deplete the budget + for _ in 0..BUDGET { + consume_budget().await; + } + + assert!(!has_budget_remaining()); +}