Skip to content

Commit

Permalink
fix: make select budget-aware
Browse files Browse the repository at this point in the history
  • Loading branch information
maminrayej committed Jan 18, 2025
1 parent a82bdee commit 762526e
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 4 deletions.
4 changes: 4 additions & 0 deletions tokio/src/macros/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,10 @@ doc! {macro_rules! select {
// supplied.
let start = $start;

// Return `Pending` when the task budget is depleted since budget-aware futures
// are going to yield anyway and other futures will not cooperate.
::std::task::ready!($crate::task::poll_budget_available(cx));

for i in 0..BRANCHES {
let branch;
#[allow(clippy::modulo_one)]
Expand Down
20 changes: 19 additions & 1 deletion tokio/src/task/consume_budget.rs → tokio/src/task/budget.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::task::{ready, Poll};
use std::task::{ready, Context, Poll};

/// Consumes a unit of budget and returns the execution back to the Tokio
/// runtime *if* the task's coop budget was exhausted.
Expand Down Expand Up @@ -39,3 +39,21 @@ pub async fn consume_budget() {
})
.await
}

/// Polls to see if any budget is available or not.
///
/// See also the usage example in the [task module](index.html#poll_budget_available).
///
/// This method returns:
/// - `Poll::Pending` if the budget is depleted
/// - `Poll::Ready(())` if there is still budget left
#[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
pub fn poll_budget_available(cx: &mut Context<'_>) -> Poll<()> {
ready!(crate::trace::trace_leaf(cx));
if crate::runtime::coop::has_budget_remaining() {
Poll::Ready(())
} else {
cx.waker().wake_by_ref();
Poll::Pending
}
}
88 changes: 86 additions & 2 deletions tokio/src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,10 +313,94 @@
//! # }
//! ```
//!
//! #### `poll_budget_available`
//!
//! Futures created by the tokio library functions are budget-aware and yield when there is no more
//! budget left, but not all futures will be budget-aware. Consider future `A` that polls two inner
//! futures `B` and `C`, and returns `Poll::Ready` when one of them is ready. If both inner futures
//! were budget-aware, at some point the budget would be depleted which would cause both futures to
//! return `Poll::Pending` resulting in `A` returning `Poll::Pending` as well. Yielding all the way
//! back to the runtime, the budget would be reset, and `B` and `C` could make progress again.
//! Now let's consider `B` is budget-aware, but `C` is not and is always ready. The budget will be
//! depleted as before, but now since `C` always returns `Poll::Ready`, `A` will always return
//! `Poll::Ready` as well. This way, `A` will not yield back to the runtime which will cause the budget
//! to stay depleted and `B` not making progress.
//!
//! In these scenarios you could use [`task::poll_budget_available`] to check whether the budget has been depleted
//! or not and act accordingly:
//! ```
//! # use std::future::{poll_fn, Future};
//! # use std::task::{ready, Context, Poll};
//! # use std::pin::{pin, Pin};
//! # use std::sync::atomic::{AtomicBool, Ordering};
//! # use tokio::task::{consume_budget, poll_budget_available};
//! #
//! # #[tokio::main]
//! # async fn main() {
//! struct Greedy;
//! struct Aware;
//! struct Combined {
//! greedy: Greedy,
//! aware: Aware,
//! }
//!
//! impl Future for Greedy {
//! type Output = ();
//!
//! fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
//! Poll::Ready(())
//! }
//! }
//!
//! impl Future for Aware {
//! type Output = ();
//!
//! fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
//! pin!(consume_budget()).poll(cx)
//! }
//! }
//!
//! impl Future for Combined {
//! type Output = ();
//!
//! fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
//! let this = Pin::into_inner(self);
//!
//! ready!(poll_budget_available(cx));
//!
//! if Pin::new(&mut this.aware).poll(cx).is_ready() {
//! return Poll::Ready(());
//! } else {
//! return Pin::new(&mut this.greedy).poll(cx);
//! }
//! }
//! }
//!
//! let did_yield = AtomicBool::new(false);
//!
//! while !did_yield.load(Ordering::Relaxed) {
//! poll_fn(|cx| {
//! let combined = pin!(Combined {
//! greedy: Greedy,
//! aware: Aware,
//! });
//!
//! if combined.poll(cx).is_pending() {
//! did_yield.store(true, Ordering::Relaxed);
//! }
//!
//! Poll::Ready(())
//! })
//! .await;
//! }
//! # }
//!```
//!
//! [`task::spawn_blocking`]: crate::task::spawn_blocking
//! [`task::block_in_place`]: crate::task::block_in_place
//! [rt-multi-thread]: ../runtime/index.html#threaded-scheduler
//! [`task::yield_now`]: crate::task::yield_now()
//! [`task::poll_budget_available`]: crate::task::poll_budget_available()
//! [`thread::yield_now`]: std::thread::yield_now
//! [`task::unconstrained`]: crate::task::unconstrained()
//! [`poll`]: method@std::future::Future::poll
Expand All @@ -337,8 +421,8 @@ cfg_rt! {
mod yield_now;
pub use yield_now::yield_now;

mod consume_budget;
pub use consume_budget::consume_budget;
mod budget;
pub use budget::{consume_budget, poll_budget_available};

mod local;
pub use local::{spawn_local, LocalSet, LocalEnterGuard};
Expand Down
28 changes: 28 additions & 0 deletions tokio/tests/coop_budget.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
#![warn(rust_2018_idioms)]
#![cfg(all(feature = "full", target_os = "linux"))]

use std::future::poll_fn;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::Poll;
use tokio::net::UdpSocket;
use tokio::task::{consume_budget, poll_budget_available};

/// Ensure that UDP sockets have functional budgeting
///
Expand Down Expand Up @@ -76,3 +79,28 @@ async fn coop_budget_udp_send_recv() {

assert_eq!(N_ITERATIONS / (BUDGET / 2), tracker.load(Ordering::SeqCst));
}

#[tokio::test]
async fn test_poll_budget_available() {
const BUDGET: usize = 128;

// At the begining budget should be available
poll_fn(|cx| {
assert!(poll_budget_available(cx).is_ready());

Poll::Ready(())
})
.await;

// Deplete the budget
for _ in 0..BUDGET {
consume_budget().await;
}

poll_fn(|cx| {
assert!(poll_budget_available(cx).is_pending());

Poll::Ready(())
})
.await;
}
38 changes: 37 additions & 1 deletion tokio/tests/macros_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use wasm_bindgen_test::wasm_bindgen_test as maybe_tokio_test;
#[cfg(not(all(target_family = "wasm", not(target_os = "wasi"))))]
use tokio::test as maybe_tokio_test;

use tokio::sync::oneshot;
use tokio::sync::{mpsc, oneshot};
use tokio_test::{assert_ok, assert_pending, assert_ready};

use std::future::poll_fn;
Expand Down Expand Up @@ -630,6 +630,42 @@ async fn mut_ref_patterns() {
};
}

#[maybe_tokio_test]
async fn select_is_budget_aware() {
const BUDGET: usize = 128;

let message_sent = BUDGET + 1;
let (sender, mut receiver) = mpsc::channel(message_sent);

let jh = tokio::spawn(async move {
for n in 0usize..message_sent {
sender.send(n).await.unwrap();
}
});

// Wait for all messages to be sent
jh.await.unwrap();

let mut message_received = 0;
loop {
tokio::select! {
// Poll the `receiver` first in order to deplete the budget
biased;

msg = receiver.recv() => {
if msg.is_some() {
message_received += 1
} else {
break;
}
},
() = std::future::ready(()) => {}
}
}

assert_eq!(message_sent, message_received);
}

#[cfg(tokio_unstable)]
mod unstable {
use tokio::runtime::RngSeed;
Expand Down

0 comments on commit 762526e

Please sign in to comment.