Skip to content

Commit

Permalink
Add tokio_unstable to task poll callback implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Jason Gin committed Jan 22, 2025
1 parent 7e60dfe commit de3c63b
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 2 deletions.
11 changes: 11 additions & 0 deletions tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,11 @@ pub struct Builder {
pub(super) before_spawn: Option<TaskCallback>,

/// To run before each poll
#[cfg(tokio_unstable)]
pub(super) before_poll: Option<TaskCallback>,

/// To run after each poll
#[cfg(tokio_unstable)]
pub(super) after_poll: Option<TaskCallback>,

/// To run after each task is terminated.
Expand Down Expand Up @@ -311,7 +314,9 @@ impl Builder {
before_spawn: None,
after_termination: None,

#[cfg(tokio_unstable)]
before_poll: None,
#[cfg(tokio_unstable)]
after_poll: None,

keep_alive: None,
Expand Down Expand Up @@ -1436,7 +1441,9 @@ impl Builder {
before_park: self.before_park.clone(),
after_unpark: self.after_unpark.clone(),
before_spawn: self.before_spawn.clone(),
#[cfg(tokio_unstable)]
before_poll: self.before_poll.clone(),
#[cfg(tokio_unstable)]
after_poll: self.after_poll.clone(),
after_termination: self.after_termination.clone(),
global_queue_interval: self.global_queue_interval,
Expand Down Expand Up @@ -1588,7 +1595,9 @@ cfg_rt_multi_thread! {
before_park: self.before_park.clone(),
after_unpark: self.after_unpark.clone(),
before_spawn: self.before_spawn.clone(),
#[cfg(tokio_unstable)]
before_poll: self.before_poll.clone(),
#[cfg(tokio_unstable)]
after_poll: self.after_poll.clone(),
after_termination: self.after_termination.clone(),
global_queue_interval: self.global_queue_interval,
Expand Down Expand Up @@ -1640,7 +1649,9 @@ cfg_rt_multi_thread! {
after_unpark: self.after_unpark.clone(),
before_spawn: self.before_spawn.clone(),
after_termination: self.after_termination.clone(),
#[cfg(tokio_unstable)]
before_poll: self.before_poll.clone(),
#[cfg(tokio_unstable)]
after_poll: self.after_poll.clone(),
global_queue_interval: self.global_queue_interval,
event_interval: self.event_interval,
Expand Down
2 changes: 2 additions & 0 deletions tokio/src/runtime/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ pub(crate) struct Config {
pub(crate) after_termination: Option<TaskCallback>,

/// To run before each poll
#[cfg(tokio_unstable)]
pub(crate) before_poll: Option<TaskCallback>,

/// To run after each poll
#[cfg(tokio_unstable)]
pub(crate) after_poll: Option<TaskCallback>,

/// The multi-threaded scheduler includes a per-worker LIFO slot used to
Expand Down
2 changes: 2 additions & 0 deletions tokio/src/runtime/scheduler/current_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,9 @@ impl CurrentThread {
task_hooks: TaskHooks {
task_spawn_callback: config.before_spawn.clone(),
task_terminate_callback: config.after_termination.clone(),
#[cfg(tokio_unstable)]
before_poll_callback: config.before_poll.clone(),
#[cfg(tokio_unstable)]
after_poll_callback: config.after_poll.clone(),
},
shared: Shared {
Expand Down
7 changes: 5 additions & 2 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
//! leak.
use crate::loom::sync::{Arc, Mutex};
use crate::runtime;
use crate::runtime::scheduler::multi_thread::{
idle, queue, Counters, Handle, Idle, Overflow, Parker, Stats, TraceStatus, Unparker,
};
Expand All @@ -74,8 +75,6 @@ use std::task::Waker;
use std::thread;
use std::time::Duration;

use self::task::Id;

mod metrics;

cfg_taskdump! {
Expand Down Expand Up @@ -572,7 +571,9 @@ impl Context {
}

fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult {
#[cfg(tokio_unstable)]
let task_id = task.task_id();

let task = self.worker.handle.shared.owned.assert_owner(task);

// Make sure the worker is not in the **searching** state. This enables
Expand Down Expand Up @@ -664,6 +665,8 @@ impl Context {
// Run the LIFO task, then loop
*self.core.borrow_mut() = Some(core);
let task = self.worker.handle.shared.owned.assert_owner(task);

#[cfg(tokio_unstable)]
let task_id = task.task_id();

#[cfg(tokio_unstable)]
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/task_hooks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ impl TaskHooks {
Self {
task_spawn_callback: config.before_spawn.clone(),
task_terminate_callback: config.after_termination.clone(),
#[cfg(tokio_unstable)]
before_poll_callback: config.before_poll.clone(),
#[cfg(tokio_unstable)]
after_poll_callback: config.after_poll.clone(),
}
}
Expand Down Expand Up @@ -45,7 +47,9 @@ impl TaskHooks {
pub(crate) struct TaskHooks {
pub(crate) task_spawn_callback: Option<TaskCallback>,
pub(crate) task_terminate_callback: Option<TaskCallback>,
#[cfg(tokio_unstable)]
pub(crate) before_poll_callback: Option<TaskCallback>,
#[cfg(tokio_unstable)]
pub(crate) after_poll_callback: Option<TaskCallback>,
}

Expand Down

0 comments on commit de3c63b

Please sign in to comment.