diff --git a/tokio/src/io/util/copy.rs b/tokio/src/io/util/copy.rs index 87592608bed..f2dbd706752 100644 --- a/tokio/src/io/util/copy.rs +++ b/tokio/src/io/util/copy.rs @@ -94,7 +94,7 @@ impl CopyBuffer { feature = "time", ))] // Keep track of task budget - let coop = ready!(crate::runtime::coop::poll_proceed(cx)); + let coop = ready!(crate::task::coop::poll_proceed(cx)); loop { // If there is some space left in our buffer, then we try to read some // data to continue, thus maximizing the chances of a large write. diff --git a/tokio/src/io/util/mem.rs b/tokio/src/io/util/mem.rs index bb5ab1e39e2..26d753878d2 100644 --- a/tokio/src/io/util/mem.rs +++ b/tokio/src/io/util/mem.rs @@ -332,7 +332,7 @@ impl AsyncRead for SimplexStream { buf: &mut ReadBuf<'_>, ) -> Poll> { ready!(crate::trace::trace_leaf(cx)); - let coop = ready!(crate::runtime::coop::poll_proceed(cx)); + let coop = ready!(crate::task::coop::poll_proceed(cx)); let ret = self.poll_read_internal(cx, buf); if ret.is_ready() { @@ -362,7 +362,7 @@ impl AsyncWrite for SimplexStream { buf: &[u8], ) -> Poll> { ready!(crate::trace::trace_leaf(cx)); - let coop = ready!(crate::runtime::coop::poll_proceed(cx)); + let coop = ready!(crate::task::coop::poll_proceed(cx)); let ret = self.poll_write_internal(cx, buf); if ret.is_ready() { @@ -390,7 +390,7 @@ impl AsyncWrite for SimplexStream { bufs: &[std::io::IoSlice<'_>], ) -> Poll> { ready!(crate::trace::trace_leaf(cx)); - let coop = ready!(crate::runtime::coop::poll_proceed(cx)); + let coop = ready!(crate::task::coop::poll_proceed(cx)); let ret = self.poll_write_vectored_internal(cx, bufs); if ret.is_ready() { diff --git a/tokio/src/io/util/mod.rs b/tokio/src/io/util/mod.rs index 44556e867ca..e8658b4326b 100644 --- a/tokio/src/io/util/mod.rs +++ b/tokio/src/io/util/mod.rs @@ -88,7 +88,7 @@ cfg_io_util! { cfg_coop! { fn poll_proceed_and_make_progress(cx: &mut std::task::Context<'_>) -> std::task::Poll<()> { - let coop = std::task::ready!(crate::runtime::coop::poll_proceed(cx)); + let coop = std::task::ready!(crate::task::coop::poll_proceed(cx)); coop.made_progress(); std::task::Poll::Ready(()) } diff --git a/tokio/src/process/mod.rs b/tokio/src/process/mod.rs index 565795ac4e6..5a2aa8c9040 100644 --- a/tokio/src/process/mod.rs +++ b/tokio/src/process/mod.rs @@ -1048,7 +1048,7 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { ready!(crate::trace::trace_leaf(cx)); // Keep track of task budget - let coop = ready!(crate::runtime::coop::poll_proceed(cx)); + let coop = ready!(crate::task::coop::poll_proceed(cx)); let ret = Pin::new(&mut self.inner).poll(cx); diff --git a/tokio/src/runtime/blocking/task.rs b/tokio/src/runtime/blocking/task.rs index c4461754005..7af08865592 100644 --- a/tokio/src/runtime/blocking/task.rs +++ b/tokio/src/runtime/blocking/task.rs @@ -37,7 +37,7 @@ where // currently goes through Task::poll(), and so is subject to budgeting. That isn't really // what we want; a blocking task may itself want to run tasks (it might be a Worker!), so // we want it to start without any budgeting. - crate::runtime::coop::stop(); + crate::task::coop::stop(); Poll::Ready(func()) } diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs index 12f6edf1321..e8f17bb374a 100644 --- a/tokio/src/runtime/context.rs +++ b/tokio/src/runtime/context.rs @@ -1,5 +1,5 @@ use crate::loom::thread::AccessError; -use crate::runtime::coop; +use crate::task::coop; use std::cell::Cell; @@ -135,7 +135,7 @@ pub(crate) fn thread_rng_n(n: u32) -> u32 { }) } -pub(super) fn budget(f: impl FnOnce(&Cell) -> R) -> Result { +pub(crate) fn budget(f: impl FnOnce(&Cell) -> R) -> Result { CONTEXT.try_with(|ctx| f(&ctx.budget)) } diff --git a/tokio/src/runtime/context/blocking.rs b/tokio/src/runtime/context/blocking.rs index 39e14937ef7..a4d6ce73de3 100644 --- a/tokio/src/runtime/context/blocking.rs +++ b/tokio/src/runtime/context/blocking.rs @@ -87,7 +87,7 @@ impl BlockingRegionGuard { let when = Instant::now() + timeout; loop { - if let Ready(v) = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx)) { + if let Ready(v) = crate::task::coop::budget(|| f.as_mut().poll(&mut cx)) { return Ok(v); } diff --git a/tokio/src/runtime/io/registration.rs b/tokio/src/runtime/io/registration.rs index 16e79e82515..c6e4e32cb71 100644 --- a/tokio/src/runtime/io/registration.rs +++ b/tokio/src/runtime/io/registration.rs @@ -148,7 +148,7 @@ impl Registration { ) -> Poll> { ready!(crate::trace::trace_leaf(cx)); // Keep track of task budget - let coop = ready!(crate::runtime::coop::poll_proceed(cx)); + let coop = ready!(crate::task::coop::poll_proceed(cx)); let ev = ready!(self.shared.poll_readiness(cx, direction)); if ev.is_shutdown { @@ -219,7 +219,7 @@ impl Registration { loop { let event = self.readiness(interest).await?; - let coop = std::future::poll_fn(crate::runtime::coop::poll_proceed).await; + let coop = std::future::poll_fn(crate::task::coop::poll_proceed).await; match f() { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index bc00bc0810d..78a0114f48e 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -310,7 +310,7 @@ //! [`event_interval`]: crate::runtime::Builder::event_interval //! [`disable_lifo_slot`]: crate::runtime::Builder::disable_lifo_slot //! [the lifo slot optimization]: crate::runtime::Builder::disable_lifo_slot -//! [coop budget]: crate::task#cooperative-scheduling +//! [coop budget]: crate::task::coop#cooperative-scheduling //! [`worker_mean_poll_time`]: crate::runtime::RuntimeMetrics::worker_mean_poll_time // At the top due to macros @@ -321,8 +321,6 @@ mod tests; pub(crate) mod context; -pub(crate) mod coop; - pub(crate) mod park; mod driver; diff --git a/tokio/src/runtime/park.rs b/tokio/src/runtime/park.rs index c5c8b1307d0..08d3e719bc4 100644 --- a/tokio/src/runtime/park.rs +++ b/tokio/src/runtime/park.rs @@ -281,7 +281,7 @@ impl CachedParkThread { pin!(f); loop { - if let Ready(v) = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx)) { + if let Ready(v) = crate::task::coop::budget(|| f.as_mut().poll(&mut cx)) { return Ok(v); } diff --git a/tokio/src/runtime/scheduler/current_thread/mod.rs b/tokio/src/runtime/scheduler/current_thread/mod.rs index c66635e7bd6..b34ff6a7712 100644 --- a/tokio/src/runtime/scheduler/current_thread/mod.rs +++ b/tokio/src/runtime/scheduler/current_thread/mod.rs @@ -361,7 +361,7 @@ impl Context { /// thread-local context. fn run_task(&self, mut core: Box, f: impl FnOnce() -> R) -> (Box, R) { core.metrics.start_poll(); - let mut ret = self.enter(core, || crate::runtime::coop::budget(f)); + let mut ret = self.enter(core, || crate::task::coop::budget(f)); ret.0.metrics.end_poll(); ret } @@ -726,7 +726,7 @@ impl CoreGuard<'_> { if handle.reset_woken() { let (c, res) = context.enter(core, || { - crate::runtime::coop::budget(|| future.as_mut().poll(&mut cx)) + crate::task::coop::budget(|| future.as_mut().poll(&mut cx)) }); core = c; diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index ec15106fe1a..80db1eba2bf 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -63,10 +63,9 @@ use crate::runtime::scheduler::multi_thread::{ }; use crate::runtime::scheduler::{inject, Defer, Lock}; use crate::runtime::task::{OwnedTasks, TaskHarnessScheduleHooks}; -use crate::runtime::{ - blocking, coop, driver, scheduler, task, Config, SchedulerMetrics, WorkerMetrics, -}; +use crate::runtime::{blocking, driver, scheduler, task, Config, SchedulerMetrics, WorkerMetrics}; use crate::runtime::{context, TaskHooks}; +use crate::task::coop; use crate::util::atomic_cell::AtomicCell; use crate::util::rand::{FastRand, RngSeedGenerator}; diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs index d88eb5e893c..58c9b74552c 100644 --- a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs @@ -64,8 +64,9 @@ use crate::runtime::scheduler::multi_thread_alt::{ }; use crate::runtime::scheduler::{self, inject, Lock}; use crate::runtime::task::{OwnedTasks, TaskHarnessScheduleHooks}; -use crate::runtime::{blocking, coop, driver, task, Config, SchedulerMetrics, WorkerMetrics}; +use crate::runtime::{blocking, driver, task, Config, SchedulerMetrics, WorkerMetrics}; use crate::runtime::{context, TaskHooks}; +use crate::task::coop; use crate::util::atomic_cell::AtomicCell; use crate::util::rand::{FastRand, RngSeedGenerator}; diff --git a/tokio/src/runtime/task/join.rs b/tokio/src/runtime/task/join.rs index 086637a582d..db1df2eb6cb 100644 --- a/tokio/src/runtime/task/join.rs +++ b/tokio/src/runtime/task/join.rs @@ -322,7 +322,7 @@ impl Future for JoinHandle { let mut ret = Poll::Pending; // Keep track of task budget - let coop = ready!(crate::runtime::coop::poll_proceed(cx)); + let coop = ready!(crate::task::coop::poll_proceed(cx)); // Try to read the task output. If the task is not yet complete, the // waker is stored and is notified once the task does complete. diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs index aabee0f5c0e..07120d63411 100644 --- a/tokio/src/sync/batch_semaphore.rs +++ b/tokio/src/sync/batch_semaphore.rs @@ -591,11 +591,11 @@ impl Future for Acquire<'_> { #[cfg(all(tokio_unstable, feature = "tracing"))] let coop = ready!(trace_poll_op!( "poll_acquire", - crate::runtime::coop::poll_proceed(cx), + crate::task::coop::poll_proceed(cx), )); #[cfg(not(all(tokio_unstable, feature = "tracing")))] - let coop = ready!(crate::runtime::coop::poll_proceed(cx)); + let coop = ready!(crate::task::coop::poll_proceed(cx)); let result = match semaphore.poll_acquire(cx, needed, node, *queued) { Poll::Pending => { diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index e48925b497e..895a6e99404 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -119,7 +119,7 @@ use crate::loom::cell::UnsafeCell; use crate::loom::sync::atomic::{AtomicBool, AtomicUsize}; use crate::loom::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard}; -use crate::runtime::coop::cooperative; +use crate::task::coop::cooperative; use crate::util::linked_list::{self, GuardedLinkedList, LinkedList}; use crate::util::WakeList; diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs index ddf99644270..3a6c8b8270c 100644 --- a/tokio/src/sync/mod.rs +++ b/tokio/src/sync/mod.rs @@ -439,7 +439,7 @@ //! or even use them from non-Tokio runtimes. //! //! When used in a Tokio runtime, the synchronization primitives participate in -//! [cooperative scheduling](crate::task#cooperative-scheduling) to avoid +//! [cooperative scheduling](crate::task::coop#cooperative-scheduling) to avoid //! starvation. This feature does not apply when used from non-Tokio runtimes. //! //! As an exception, methods ending in `_timeout` are not runtime agnostic diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index 1e6eaab1798..0bf760aa811 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -292,7 +292,7 @@ impl Rx { ready!(crate::trace::trace_leaf(cx)); // Keep track of task budget - let coop = ready!(crate::runtime::coop::poll_proceed(cx)); + let coop = ready!(crate::task::coop::poll_proceed(cx)); self.inner.rx_fields.with_mut(|rx_fields_ptr| { let rx_fields = unsafe { &mut *rx_fields_ptr }; @@ -354,7 +354,7 @@ impl Rx { ready!(crate::trace::trace_leaf(cx)); // Keep track of task budget - let coop = ready!(crate::runtime::coop::poll_proceed(cx)); + let coop = ready!(crate::task::coop::poll_proceed(cx)); if limit == 0 { coop.made_progress(); diff --git a/tokio/src/sync/mpsc/mod.rs b/tokio/src/sync/mpsc/mod.rs index a90a35d366b..3df612ca4f7 100644 --- a/tokio/src/sync/mpsc/mod.rs +++ b/tokio/src/sync/mpsc/mod.rs @@ -75,7 +75,7 @@ //! runtimes. //! //! When used in a Tokio runtime, it participates in -//! [cooperative scheduling](crate::task#cooperative-scheduling) to avoid +//! [cooperative scheduling](crate::task::coop#cooperative-scheduling) to avoid //! starvation. This feature does not apply when used from non-Tokio runtimes. //! //! As an exception, methods ending in `_timeout` are not runtime agnostic diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs index 2b346eae81c..6201464c546 100644 --- a/tokio/src/sync/oneshot.rs +++ b/tokio/src/sync/oneshot.rs @@ -794,7 +794,7 @@ impl Sender { ready!(crate::trace::trace_leaf(cx)); // Keep track of task budget - let coop = ready!(crate::runtime::coop::poll_proceed(cx)); + let coop = ready!(crate::task::coop::poll_proceed(cx)); let inner = self.inner.as_ref().unwrap(); @@ -1142,7 +1142,7 @@ impl Inner { fn poll_recv(&self, cx: &mut Context<'_>) -> Poll> { ready!(crate::trace::trace_leaf(cx)); // Keep track of task budget - let coop = ready!(crate::runtime::coop::poll_proceed(cx)); + let coop = ready!(crate::task::coop::poll_proceed(cx)); // Load the state let mut state = State::load(&self.state, Acquire); diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 0f3bafff889..884efffa30c 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -111,8 +111,8 @@ //! [`Sender::closed`]: crate::sync::watch::Sender::closed //! [`Sender::subscribe()`]: crate::sync::watch::Sender::subscribe -use crate::runtime::coop::cooperative; use crate::sync::notify::Notify; +use crate::task::coop::cooperative; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::atomic::Ordering::{AcqRel, Relaxed}; diff --git a/tokio/src/task/consume_budget.rs b/tokio/src/task/coop/consume_budget.rs similarity index 85% rename from tokio/src/task/consume_budget.rs rename to tokio/src/task/coop/consume_budget.rs index 85ef1bfb2d2..4133facb602 100644 --- a/tokio/src/task/consume_budget.rs +++ b/tokio/src/task/coop/consume_budget.rs @@ -1,5 +1,3 @@ -use std::task::{ready, Poll}; - /// Consumes a unit of budget and returns the execution back to the Tokio /// runtime *if* the task's coop budget was exhausted. /// @@ -25,14 +23,14 @@ use std::task::{ready, Poll}; /// ``` #[cfg_attr(docsrs, doc(cfg(feature = "rt")))] pub async fn consume_budget() { - let mut status = Poll::Pending; + let mut status = std::task::Poll::Pending; std::future::poll_fn(move |cx| { - ready!(crate::trace::trace_leaf(cx)); + std::task::ready!(crate::trace::trace_leaf(cx)); if status.is_ready() { return status; } - status = crate::runtime::coop::poll_proceed(cx).map(|restore| { + status = crate::task::coop::poll_proceed(cx).map(|restore| { restore.made_progress(); }); status diff --git a/tokio/src/runtime/coop.rs b/tokio/src/task/coop/mod.rs similarity index 68% rename from tokio/src/runtime/coop.rs rename to tokio/src/task/coop/mod.rs index c01e5e3de8b..85c6a5f9840 100644 --- a/tokio/src/runtime/coop.rs +++ b/tokio/src/task/coop/mod.rs @@ -1,10 +1,70 @@ #![cfg_attr(not(feature = "full"), allow(dead_code))] +#![cfg_attr(not(feature = "rt"), allow(unreachable_pub))] -//! Yield points for improved cooperative scheduling. +//! Utilities for improved cooperative scheduling. //! -//! Documentation for this can be found in the [`tokio::task`] module. +//! ### Cooperative scheduling //! -//! [`tokio::task`]: crate::task. +//! A single call to [`poll`] on a top-level task may potentially do a lot of +//! work before it returns `Poll::Pending`. If a task runs for a long period of +//! time without yielding back to the executor, it can starve other tasks +//! waiting on that executor to execute them, or drive underlying resources. +//! Since Rust does not have a runtime, it is difficult to forcibly preempt a +//! long-running task. Instead, this module provides an opt-in mechanism for +//! futures to collaborate with the executor to avoid starvation. +//! +//! Consider a future like this one: +//! +//! ``` +//! # use tokio_stream::{Stream, StreamExt}; +//! async fn drop_all(mut input: I) { +//! while let Some(_) = input.next().await {} +//! } +//! ``` +//! +//! It may look harmless, but consider what happens under heavy load if the +//! input stream is _always_ ready. If we spawn `drop_all`, the task will never +//! yield, and will starve other tasks and resources on the same executor. +//! +//! To account for this, Tokio has explicit yield points in a number of library +//! functions, which force tasks to return to the executor periodically. +//! +//! +//! #### unconstrained +//! +//! If necessary, [`task::unconstrained`] lets you opt a future out of Tokio's cooperative +//! scheduling. When a future is wrapped with `unconstrained`, it will never be forced to yield to +//! Tokio. For example: +//! +//! ``` +//! # #[tokio::main] +//! # async fn main() { +//! use tokio::{task, sync::mpsc}; +//! +//! let fut = async { +//! let (tx, mut rx) = mpsc::unbounded_channel(); +//! +//! for i in 0..1000 { +//! let _ = tx.send(()); +//! // This will always be ready. If coop was in effect, this code would be forced to yield +//! // periodically. However, if left unconstrained, then this code will never yield. +//! rx.recv().await; +//! } +//! }; +//! +//! task::coop::unconstrained(fut).await; +//! # } +//! ``` +//! [`poll`]: method@std::future::Future::poll +//! [`task::unconstrained`]: crate::task::unconstrained() + +cfg_rt! { + mod consume_budget; + pub use consume_budget::consume_budget; + + mod unconstrained; + pub use unconstrained::{unconstrained, Unconstrained}; +} // ```ignore // # use tokio_stream::{Stream, StreamExt}; @@ -57,7 +117,7 @@ impl Budget { } /// Returns an unconstrained budget. Operations will not be limited. - pub(super) const fn unconstrained() -> Budget { + pub(crate) const fn unconstrained() -> Budget { Budget(None) } @@ -107,8 +167,93 @@ fn with_budget(budget: Budget, f: impl FnOnce() -> R) -> R { f() } +/// Returns `true` if there is still budget left on the task. +/// +/// Futures created by the tokio library functions are budget-aware and yield when there is no more +/// budget left, but not all futures will be budget-aware. Consider future `A` that polls two inner +/// futures `B` and `C`, and returns `Poll::Ready` when one of them is ready. If both inner futures +/// were budget-aware, at some point the budget would be depleted which would cause both futures to +/// return `Poll::Pending` resulting in `A` returning `Poll::Pending` as well. Yielding all the way +/// back to the runtime, the budget would be reset, and `B` and `C` could make progress again. +/// Now let's consider `B` is budget-aware, but `C` is not and is always ready. The budget will be +/// depleted as before, but now since `C` always returns `Poll::Ready`, `A` will always return +/// `Poll::Ready` as well. This way, `A` will not yield back to the runtime which will keep the budget +/// depleted and `B` not making progress. +/// +/// In these scenarios you could use [`has_budget_remaining`] to check whether the budget has been depleted +/// or not and act accordingly: +/// ``` +/// # use std::future::{poll_fn, Future}; +/// # use std::task::{Context, Poll}; +/// # use std::pin::{pin, Pin}; +/// # use std::sync::atomic::{AtomicBool, Ordering}; +/// # use tokio::task::coop::{consume_budget, has_budget_remaining}; +/// # +/// # #[tokio::main] +/// # async fn main() { +/// struct Greedy; +/// struct Aware; +/// struct Combined { +/// greedy: Greedy, +/// aware: Aware, +/// } +/// +/// impl Future for Greedy { +/// type Output = (); +/// +/// fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll { +/// Poll::Ready(()) +/// } +/// } +/// +/// impl Future for Aware { +/// type Output = (); +/// +/// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { +/// pin!(consume_budget()).poll(cx) +/// } +/// } +/// +/// impl Future for Combined { +/// type Output = (); +/// +/// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { +/// let this = Pin::into_inner(self); +/// +/// if !has_budget_remaining() { +/// return Poll::Pending; +/// } +/// +/// if Pin::new(&mut this.aware).poll(cx).is_ready() { +/// return Poll::Ready(()); +/// } else { +/// return Pin::new(&mut this.greedy).poll(cx); +/// } +/// } +/// } +/// +/// let did_yield = AtomicBool::new(false); +/// +/// while !did_yield.load(Ordering::Relaxed) { +/// poll_fn(|cx| { +/// let combined = pin!(Combined { +/// greedy: Greedy, +/// aware: Aware, +/// }); +/// +/// if combined.poll(cx).is_pending() { +/// did_yield.store(true, Ordering::Relaxed); +/// } +/// +/// Poll::Ready(()) +/// }) +/// .await; +/// } +/// # } +///``` #[inline(always)] -pub(crate) fn has_budget_remaining() -> bool { +#[cfg_attr(docsrs, doc(cfg(feature = "rt")))] +pub fn has_budget_remaining() -> bool { // If the current budget cannot be accessed due to the thread-local being // shutdown, then we assume there is budget remaining. context::budget(|cell| cell.get().has_remaining()).unwrap_or(true) diff --git a/tokio/src/task/unconstrained.rs b/tokio/src/task/coop/unconstrained.rs similarity index 94% rename from tokio/src/task/unconstrained.rs rename to tokio/src/task/coop/unconstrained.rs index 40384c8709e..349468e92ae 100644 --- a/tokio/src/task/unconstrained.rs +++ b/tokio/src/task/coop/unconstrained.rs @@ -22,7 +22,7 @@ where cfg_coop! { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let inner = self.project().inner; - crate::runtime::coop::with_unconstrained(|| inner.poll(cx)) + crate::task::coop::with_unconstrained(|| inner.poll(cx)) } } diff --git a/tokio/src/task/join_set.rs b/tokio/src/task/join_set.rs index ed2777ccf86..a156719a067 100644 --- a/tokio/src/task/join_set.rs +++ b/tokio/src/task/join_set.rs @@ -482,7 +482,7 @@ impl JoinSet { /// Note that this method may return `Poll::Pending` even if one of the tasks has completed. /// This can happen if the [coop budget] is reached. /// - /// [coop budget]: crate::task#cooperative-scheduling + /// [coop budget]: crate::task::coop#cooperative-scheduling pub fn poll_join_next(&mut self, cx: &mut Context<'_>) -> Poll>> { // The call to `pop_notified` moves the entry to the `idle` list. It is moved back to // the `notified` list if the waker is notified in the `poll` call below. @@ -537,7 +537,7 @@ impl JoinSet { /// Note that this method may return `Poll::Pending` even if one of the tasks has completed. /// This can happen if the [coop budget] is reached. /// - /// [coop budget]: crate::task#cooperative-scheduling + /// [coop budget]: crate::task::coop#cooperative-scheduling /// [task ID]: crate::task::Id pub fn poll_join_next_with_id( &mut self, diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index edd02acbac0..95bd6404bec 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -736,7 +736,7 @@ impl LocalSet { // task initially. Because `LocalSet` itself is `!Send`, and // `spawn_local` spawns into the `LocalSet` on the current // thread, the invariant is maintained. - Some(task) => crate::runtime::coop::budget(|| task.run()), + Some(task) => crate::task::coop::budget(|| task.run()), // We have fully drained the queue of notified tasks, so the // local future doesn't need to be notified again — it can wait // until something else wakes a task in the local set. diff --git a/tokio/src/task/mod.rs b/tokio/src/task/mod.rs index 86bc0eaae9d..7795a253f4b 100644 --- a/tokio/src/task/mod.rs +++ b/tokio/src/task/mod.rs @@ -260,66 +260,11 @@ //! # } //! ``` //! -//! ### Cooperative scheduling -//! -//! A single call to [`poll`] on a top-level task may potentially do a lot of -//! work before it returns `Poll::Pending`. If a task runs for a long period of -//! time without yielding back to the executor, it can starve other tasks -//! waiting on that executor to execute them, or drive underlying resources. -//! Since Rust does not have a runtime, it is difficult to forcibly preempt a -//! long-running task. Instead, this module provides an opt-in mechanism for -//! futures to collaborate with the executor to avoid starvation. -//! -//! Consider a future like this one: -//! -//! ``` -//! # use tokio_stream::{Stream, StreamExt}; -//! async fn drop_all(mut input: I) { -//! while let Some(_) = input.next().await {} -//! } -//! ``` -//! -//! It may look harmless, but consider what happens under heavy load if the -//! input stream is _always_ ready. If we spawn `drop_all`, the task will never -//! yield, and will starve other tasks and resources on the same executor. -//! -//! To account for this, Tokio has explicit yield points in a number of library -//! functions, which force tasks to return to the executor periodically. -//! -//! -//! #### unconstrained -//! -//! If necessary, [`task::unconstrained`] lets you opt a future out of Tokio's cooperative -//! scheduling. When a future is wrapped with `unconstrained`, it will never be forced to yield to -//! Tokio. For example: -//! -//! ``` -//! # #[tokio::main] -//! # async fn main() { -//! use tokio::{task, sync::mpsc}; -//! -//! let fut = async { -//! let (tx, mut rx) = mpsc::unbounded_channel(); -//! -//! for i in 0..1000 { -//! let _ = tx.send(()); -//! // This will always be ready. If coop was in effect, this code would be forced to yield -//! // periodically. However, if left unconstrained, then this code will never yield. -//! rx.recv().await; -//! } -//! }; -//! -//! task::unconstrained(fut).await; -//! # } -//! ``` -//! //! [`task::spawn_blocking`]: crate::task::spawn_blocking //! [`task::block_in_place`]: crate::task::block_in_place //! [rt-multi-thread]: ../runtime/index.html#threaded-scheduler //! [`task::yield_now`]: crate::task::yield_now() //! [`thread::yield_now`]: std::thread::yield_now -//! [`task::unconstrained`]: crate::task::unconstrained() -//! [`poll`]: method@std::future::Future::poll cfg_rt! { pub use crate::runtime::task::{JoinError, JoinHandle}; @@ -337,8 +282,10 @@ cfg_rt! { mod yield_now; pub use yield_now::yield_now; - mod consume_budget; - pub use consume_budget::consume_budget; + pub mod coop; + #[doc(hidden)] + #[deprecated = "Moved to tokio::task::coop::consume_budget"] + pub use coop::consume_budget; mod local; pub use local::{spawn_local, LocalSet, LocalEnterGuard}; @@ -346,8 +293,12 @@ cfg_rt! { mod task_local; pub use task_local::LocalKey; - mod unconstrained; - pub use unconstrained::{unconstrained, Unconstrained}; + #[doc(hidden)] + #[deprecated = "Moved to tokio::task::coop::unconstrained"] + pub use coop::unconstrained; + #[doc(hidden)] + #[deprecated = "Moved to tokio::task::coop::Unconstrained"] + pub use coop::Unconstrained; #[doc(inline)] pub use join_set::JoinSet; @@ -371,3 +322,7 @@ cfg_rt! { pub use super::task_local::TaskLocalFuture; } } + +cfg_not_rt! { + pub(crate) mod coop; +} diff --git a/tokio/src/time/sleep.rs b/tokio/src/time/sleep.rs index 6e59f1ff3d6..32eca0e4f41 100644 --- a/tokio/src/time/sleep.rs +++ b/tokio/src/time/sleep.rs @@ -407,11 +407,11 @@ impl Sleep { #[cfg(all(tokio_unstable, feature = "tracing"))] let coop = ready!(trace_poll_op!( "poll_elapsed", - crate::runtime::coop::poll_proceed(cx), + crate::task::coop::poll_proceed(cx), )); #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] - let coop = ready!(crate::runtime::coop::poll_proceed(cx)); + let coop = ready!(crate::task::coop::poll_proceed(cx)); let result = me.entry.poll_elapsed(cx).map(move |r| { coop.made_progress(); diff --git a/tokio/src/time/timeout.rs b/tokio/src/time/timeout.rs index fa93a16910e..e7fbe75dce9 100644 --- a/tokio/src/time/timeout.rs +++ b/tokio/src/time/timeout.rs @@ -5,7 +5,7 @@ //! [`Timeout`]: struct@Timeout use crate::{ - runtime::coop, + task::coop, time::{error::Elapsed, sleep_until, Duration, Instant, Sleep}, util::trace, }; diff --git a/tokio/tests/coop_budget.rs b/tokio/tests/coop_budget.rs index 3aaf9db21d6..fc8dfc93c73 100644 --- a/tokio/tests/coop_budget.rs +++ b/tokio/tests/coop_budget.rs @@ -4,6 +4,9 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use tokio::net::UdpSocket; +use tokio::task::coop::{consume_budget, has_budget_remaining}; + +const BUDGET: usize = 128; /// Ensure that UDP sockets have functional budgeting /// @@ -24,7 +27,6 @@ use tokio::net::UdpSocket; #[tokio::test] #[cfg_attr(miri, ignore)] // No `socket` on miri. async fn coop_budget_udp_send_recv() { - const BUDGET: usize = 128; const N_ITERATIONS: usize = 1024; const PACKET: &[u8] = b"Hello, world"; @@ -76,3 +78,16 @@ async fn coop_budget_udp_send_recv() { assert_eq!(N_ITERATIONS / (BUDGET / 2), tracker.load(Ordering::SeqCst)); } + +#[tokio::test] +async fn test_has_budget_remaining() { + // At the begining budget should be available + assert!(has_budget_remaining()); + + // Deplete the budget + for _ in 0..BUDGET { + consume_budget().await; + } + + assert!(!has_budget_remaining()); +}