Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

coop: expose coop as a public module #7116

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tokio/src/io/util/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl CopyBuffer {
feature = "time",
))]
// Keep track of task budget
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
let coop = ready!(crate::task::coop::poll_proceed(cx));
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
loop {
// If there is some space left in our buffer, then we try to read some
// data to continue, thus maximizing the chances of a large write.
Expand Down
6 changes: 3 additions & 3 deletions tokio/src/io/util/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ impl AsyncRead for SimplexStream {
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
ready!(crate::trace::trace_leaf(cx));
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
let coop = ready!(crate::task::coop::poll_proceed(cx));

let ret = self.poll_read_internal(cx, buf);
if ret.is_ready() {
Expand Down Expand Up @@ -362,7 +362,7 @@ impl AsyncWrite for SimplexStream {
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
ready!(crate::trace::trace_leaf(cx));
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
let coop = ready!(crate::task::coop::poll_proceed(cx));

let ret = self.poll_write_internal(cx, buf);
if ret.is_ready() {
Expand Down Expand Up @@ -390,7 +390,7 @@ impl AsyncWrite for SimplexStream {
bufs: &[std::io::IoSlice<'_>],
) -> Poll<Result<usize, std::io::Error>> {
ready!(crate::trace::trace_leaf(cx));
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
let coop = ready!(crate::task::coop::poll_proceed(cx));

let ret = self.poll_write_vectored_internal(cx, bufs);
if ret.is_ready() {
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/io/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ cfg_io_util! {

cfg_coop! {
fn poll_proceed_and_make_progress(cx: &mut std::task::Context<'_>) -> std::task::Poll<()> {
let coop = std::task::ready!(crate::runtime::coop::poll_proceed(cx));
let coop = std::task::ready!(crate::task::coop::poll_proceed(cx));
coop.made_progress();
std::task::Poll::Ready(())
}
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/process/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1048,7 +1048,7 @@ where
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
ready!(crate::trace::trace_leaf(cx));
// Keep track of task budget
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
let coop = ready!(crate::task::coop::poll_proceed(cx));

let ret = Pin::new(&mut self.inner).poll(cx);

Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/blocking/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ where
// currently goes through Task::poll(), and so is subject to budgeting. That isn't really
// what we want; a blocking task may itself want to run tasks (it might be a Worker!), so
// we want it to start without any budgeting.
crate::runtime::coop::stop();
crate::task::coop::stop();

Poll::Ready(func())
}
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/runtime/context.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::loom::thread::AccessError;
use crate::runtime::coop;
use crate::task::coop;

use std::cell::Cell;

Expand Down Expand Up @@ -135,7 +135,7 @@ pub(crate) fn thread_rng_n(n: u32) -> u32 {
})
}

pub(super) fn budget<R>(f: impl FnOnce(&Cell<coop::Budget>) -> R) -> Result<R, AccessError> {
pub(crate) fn budget<R>(f: impl FnOnce(&Cell<coop::Budget>) -> R) -> Result<R, AccessError> {
CONTEXT.try_with(|ctx| f(&ctx.budget))
}

Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/context/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl BlockingRegionGuard {
let when = Instant::now() + timeout;

loop {
if let Ready(v) = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx)) {
if let Ready(v) = crate::task::coop::budget(|| f.as_mut().poll(&mut cx)) {
return Ok(v);
}

Expand Down
4 changes: 2 additions & 2 deletions tokio/src/runtime/io/registration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ impl Registration {
) -> Poll<io::Result<ReadyEvent>> {
ready!(crate::trace::trace_leaf(cx));
// Keep track of task budget
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
let coop = ready!(crate::task::coop::poll_proceed(cx));
let ev = ready!(self.shared.poll_readiness(cx, direction));

if ev.is_shutdown {
Expand Down Expand Up @@ -219,7 +219,7 @@ impl Registration {
loop {
let event = self.readiness(interest).await?;

let coop = std::future::poll_fn(crate::runtime::coop::poll_proceed).await;
let coop = std::future::poll_fn(crate::task::coop::poll_proceed).await;

match f() {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
Expand Down
2 changes: 0 additions & 2 deletions tokio/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,6 @@ mod tests;

pub(crate) mod context;

pub(crate) mod coop;

pub(crate) mod park;

mod driver;
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/park.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ impl CachedParkThread {
pin!(f);

loop {
if let Ready(v) = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx)) {
if let Ready(v) = crate::task::coop::budget(|| f.as_mut().poll(&mut cx)) {
return Ok(v);
}

Expand Down
4 changes: 2 additions & 2 deletions tokio/src/runtime/scheduler/current_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ impl Context {
/// thread-local context.
fn run_task<R>(&self, mut core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
core.metrics.start_poll();
let mut ret = self.enter(core, || crate::runtime::coop::budget(f));
let mut ret = self.enter(core, || crate::task::coop::budget(f));
ret.0.metrics.end_poll();
ret
}
Expand Down Expand Up @@ -726,7 +726,7 @@ impl CoreGuard<'_> {

if handle.reset_woken() {
let (c, res) = context.enter(core, || {
crate::runtime::coop::budget(|| future.as_mut().poll(&mut cx))
crate::task::coop::budget(|| future.as_mut().poll(&mut cx))
});

core = c;
Expand Down
5 changes: 2 additions & 3 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,9 @@ use crate::runtime::scheduler::multi_thread::{
};
use crate::runtime::scheduler::{inject, Defer, Lock};
use crate::runtime::task::{OwnedTasks, TaskHarnessScheduleHooks};
use crate::runtime::{
blocking, coop, driver, scheduler, task, Config, SchedulerMetrics, WorkerMetrics,
};
use crate::runtime::{blocking, driver, scheduler, task, Config, SchedulerMetrics, WorkerMetrics};
use crate::runtime::{context, TaskHooks};
use crate::task::coop;
use crate::util::atomic_cell::AtomicCell;
use crate::util::rand::{FastRand, RngSeedGenerator};

Expand Down
3 changes: 2 additions & 1 deletion tokio/src/runtime/scheduler/multi_thread_alt/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ use crate::runtime::scheduler::multi_thread_alt::{
};
use crate::runtime::scheduler::{self, inject, Lock};
use crate::runtime::task::{OwnedTasks, TaskHarnessScheduleHooks};
use crate::runtime::{blocking, coop, driver, task, Config, SchedulerMetrics, WorkerMetrics};
use crate::runtime::{blocking, driver, task, Config, SchedulerMetrics, WorkerMetrics};
use crate::runtime::{context, TaskHooks};
use crate::task::coop;
use crate::util::atomic_cell::AtomicCell;
use crate::util::rand::{FastRand, RngSeedGenerator};

Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/task/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ impl<T> Future for JoinHandle<T> {
let mut ret = Poll::Pending;

// Keep track of task budget
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
let coop = ready!(crate::task::coop::poll_proceed(cx));

// Try to read the task output. If the task is not yet complete, the
// waker is stored and is notified once the task does complete.
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/sync/batch_semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -591,11 +591,11 @@ impl Future for Acquire<'_> {
#[cfg(all(tokio_unstable, feature = "tracing"))]
let coop = ready!(trace_poll_op!(
"poll_acquire",
crate::runtime::coop::poll_proceed(cx),
crate::task::coop::poll_proceed(cx),
));

#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
let coop = ready!(crate::task::coop::poll_proceed(cx));

let result = match semaphore.poll_acquire(cx, needed, node, *queued) {
Poll::Pending => {
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/sync/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::{AtomicBool, AtomicUsize};
use crate::loom::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard};
use crate::runtime::coop::cooperative;
use crate::task::coop::cooperative;
use crate::util::linked_list::{self, GuardedLinkedList, LinkedList};
use crate::util::WakeList;

Expand Down
4 changes: 2 additions & 2 deletions tokio/src/sync/mpsc/chan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ impl<T, S: Semaphore> Rx<T, S> {
ready!(crate::trace::trace_leaf(cx));

// Keep track of task budget
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
let coop = ready!(crate::task::coop::poll_proceed(cx));

self.inner.rx_fields.with_mut(|rx_fields_ptr| {
let rx_fields = unsafe { &mut *rx_fields_ptr };
Expand Down Expand Up @@ -354,7 +354,7 @@ impl<T, S: Semaphore> Rx<T, S> {
ready!(crate::trace::trace_leaf(cx));

// Keep track of task budget
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
let coop = ready!(crate::task::coop::poll_proceed(cx));

if limit == 0 {
coop.made_progress();
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/sync/oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,7 @@ impl<T> Sender<T> {
ready!(crate::trace::trace_leaf(cx));

// Keep track of task budget
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
let coop = ready!(crate::task::coop::poll_proceed(cx));

let inner = self.inner.as_ref().unwrap();

Expand Down Expand Up @@ -1142,7 +1142,7 @@ impl<T> Inner<T> {
fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
ready!(crate::trace::trace_leaf(cx));
// Keep track of task budget
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
let coop = ready!(crate::task::coop::poll_proceed(cx));

// Load the state
let mut state = State::load(&self.state, Acquire);
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/sync/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@
//! [`Sender::closed`]: crate::sync::watch::Sender::closed
//! [`Sender::subscribe()`]: crate::sync::watch::Sender::subscribe
use crate::runtime::coop::cooperative;
use crate::sync::notify::Notify;
use crate::task::coop::cooperative;

use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::atomic::Ordering::{AcqRel, Relaxed};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::task::{ready, Poll};

/// Consumes a unit of budget and returns the execution back to the Tokio
/// runtime *if* the task's coop budget was exhausted.
///
Expand All @@ -25,14 +23,14 @@ use std::task::{ready, Poll};
/// ```
#[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
pub async fn consume_budget() {
let mut status = Poll::Pending;
let mut status = std::task::Poll::Pending;

std::future::poll_fn(move |cx| {
ready!(crate::trace::trace_leaf(cx));
std::task::ready!(crate::trace::trace_leaf(cx));
if status.is_ready() {
return status;
}
status = crate::runtime::coop::poll_proceed(cx).map(|restore| {
status = crate::task::coop::poll_proceed(cx).map(|restore| {
restore.made_progress();
});
status
Expand Down
Loading
Loading