Skip to content

Commit

Permalink
expose coop module
Browse files Browse the repository at this point in the history
  • Loading branch information
maminrayej committed Jan 21, 2025
1 parent 21a13f9 commit 59eca5d
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 10 deletions.
14 changes: 14 additions & 0 deletions tokio/src/macros/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,20 @@ macro_rules! cfg_not_coop {
}
}

macro_rules! cfg_pub_if_rt {
($($(#[$meta:meta])* fn $($inner:tt)*)*) => {
$(
$(#[$meta])*
#[cfg(feature = "rt")]
pub fn $($inner)*

$(#[$meta])*
#[cfg(not(feature = "rt"))]
pub(crate) fn $($inner)*
)*
}
}

macro_rules! cfg_has_atomic_u64 {
($($item:item)*) => {
$(
Expand Down
103 changes: 94 additions & 9 deletions tokio/src/runtime/coop.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
#![cfg_attr(not(feature = "full"), allow(dead_code))]

//! Yield points for improved cooperative scheduling.
//! Utilities for improved cooperative scheduling.
//!
//! Documentation for this can be found in the [`tokio::task`] module.
//!
//! [`tokio::task`]: crate::task.
//! See the "Cooperative scheduling" section in the [task](crate::task#cooperative-scheduling) module.
// ```ignore
// # use tokio_stream::{Stream, StreamExt};
Expand Down Expand Up @@ -107,11 +105,98 @@ fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R {
f()
}

#[inline(always)]
pub(crate) fn has_budget_remaining() -> bool {
// If the current budget cannot be accessed due to the thread-local being
// shutdown, then we assume there is budget remaining.
context::budget(|cell| cell.get().has_remaining()).unwrap_or(true)
cfg_pub_if_rt! {
/// Returns `true` if there is still budget left on the task.
///
/// Futures created by the tokio library functions are budget-aware and yield when there is no more
/// budget left, but not all futures will be budget-aware. Consider future `A` that polls two inner
/// futures `B` and `C`, and returns `Poll::Ready` when one of them is ready. If both inner futures
/// were budget-aware, at some point the budget would be depleted which would cause both futures to
/// return `Poll::Pending` resulting in `A` returning `Poll::Pending` as well. Yielding all the way
/// back to the runtime, the budget would be reset, and `B` and `C` could make progress again.
/// Now let's consider `B` is budget-aware, but `C` is not and is always ready. The budget will be
/// depleted as before, but now since `C` always returns `Poll::Ready`, `A` will always return
/// `Poll::Ready` as well. This way, `A` will not yield back to the runtime which will keep the budget
/// depleted and `B` not making progress.
///
/// In these scenarios you could use [`has_budget_remaining`] to check whether the budget has been depleted
/// or not and act accordingly:
/// ```
/// # use std::future::{poll_fn, Future};
/// # use std::task::{Context, Poll};
/// # use std::pin::{pin, Pin};
/// # use std::sync::atomic::{AtomicBool, Ordering};
/// # use tokio::task::consume_budget;
/// # use tokio::runtime::coop::has_budget_remaining;
/// #
/// # #[tokio::main]
/// # async fn main() {
/// struct Greedy;
/// struct Aware;
/// struct Combined {
/// greedy: Greedy,
/// aware: Aware,
/// }
///
/// impl Future for Greedy {
/// type Output = ();
///
/// fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
/// Poll::Ready(())
/// }
/// }
///
/// impl Future for Aware {
/// type Output = ();
///
/// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
/// pin!(consume_budget()).poll(cx)
/// }
/// }
///
/// impl Future for Combined {
/// type Output = ();
///
/// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
/// let this = Pin::into_inner(self);
///
/// if !has_budget_remaining() {
/// return Poll::Pending;
/// }
///
/// if Pin::new(&mut this.aware).poll(cx).is_ready() {
/// return Poll::Ready(());
/// } else {
/// return Pin::new(&mut this.greedy).poll(cx);
/// }
/// }
/// }
///
/// let did_yield = AtomicBool::new(false);
///
/// while !did_yield.load(Ordering::Relaxed) {
/// poll_fn(|cx| {
/// let combined = pin!(Combined {
/// greedy: Greedy,
/// aware: Aware,
/// });
///
/// if combined.poll(cx).is_pending() {
/// did_yield.store(true, Ordering::Relaxed);
/// }
///
/// Poll::Ready(())
/// })
/// .await;
/// }
/// # }
///```
#[inline(always)]
fn has_budget_remaining() -> bool {
// If the current budget cannot be accessed due to the thread-local being
// shutdown, then we assume there is budget remaining.
context::budget(|cell| cell.get().has_remaining()).unwrap_or(true)
}
}

cfg_rt_multi_thread! {
Expand Down
7 changes: 6 additions & 1 deletion tokio/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,12 @@ mod tests;

pub(crate) mod context;

pub(crate) mod coop;
cfg_rt! {
pub mod coop;
}
cfg_not_rt! {
pub(crate) mod coop;
}

pub(crate) mod park;

Expand Down
17 changes: 17 additions & 0 deletions tokio/tests/coop_budget.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::net::UdpSocket;
use tokio::runtime::coop::has_budget_remaining;
use tokio::task::consume_budget;

/// Ensure that UDP sockets have functional budgeting
///
Expand Down Expand Up @@ -76,3 +78,18 @@ async fn coop_budget_udp_send_recv() {

assert_eq!(N_ITERATIONS / (BUDGET / 2), tracker.load(Ordering::SeqCst));
}

#[tokio::test]
async fn test_has_budget_remaining() {
const BUDGET: usize = 128;

// At the begining budget should be available
assert!(has_budget_remaining());

// Deplete the budget
for _ in 0..BUDGET {
consume_budget().await;
}

assert!(!has_budget_remaining());
}

0 comments on commit 59eca5d

Please sign in to comment.