stabilize worker_total_busy_duration #6899

Open
wants to merge 19 commits into
base: master
Changes from 12 commits
Commits
8f1fcb4
stabilize worker total busy duration, bring WorkerMetrics, MetricsBat…
Owen-CH-Leung Oct 11, 2024
9b47cf9
Fix rustfmt ci job
Owen-CH-Leung Oct 11, 2024
e2f4f33
Fix various failing CI jobs by adding cfg(target_has_atomic = "64") t…
Owen-CH-Leung Oct 11, 2024
b6974ba
Use cfg_64bit_metrics instead
Owen-CH-Leung Oct 11, 2024
86f019f
Fix formatting and remove brackets
Owen-CH-Leung Oct 11, 2024
64f626d
Creat Mock Histogram, HistogramBatch and HistogramBuilder. Revert cha…
Owen-CH-Leung Oct 16, 2024
489003c
Hide queue_depth and thread_id behind unstable flag
Owen-CH-Leung Oct 16, 2024
8a134a2
Mark most fields of WorkerMetrics as unstable, except for busy_durati…
Owen-CH-Leung Oct 19, 2024
4bc00cf
Merge branch 'master' into stabilize_worker_total_busy_duration
Owen-CH-Leung Oct 19, 2024
7eb6b97
Remove allow dead_code, merge master & fix spellcheck
Owen-CH-Leung Oct 19, 2024
7239af5
Merge branch 'master' into stabilize_worker_total_busy_duration
Owen-CH-Leung Oct 21, 2024
57f6b9b
Add back worker_total_busy_duration test
Owen-CH-Leung Oct 21, 2024
c8b2c7d
Remove mock metricBatch, split MetricBatch implementation into stable…
Owen-CH-Leung Oct 22, 2024
d7333cf
Merge branch 'master' into stabilize_worker_total_busy_duration
Owen-CH-Leung Nov 23, 2024
14543de
Gate code based on cfg_unstable_metrics and cfg_not_unstable_metrics
Owen-CH-Leung Nov 23, 2024
245d75c
Merge branch 'master' into stabilize_worker_total_busy_duration
Owen-CH-Leung Dec 17, 2024
e0d0b84
Merge branch 'master' into stabilize_worker_total_busy_duration
Owen-CH-Leung Jan 11, 2025
b821f92
add new macro cfg_metrics_variant, refactor stable/unstable code
Owen-CH-Leung Jan 11, 2025
64f029a
refactor field ordering, add more stable/unstable code using cfg_metr…
Owen-CH-Leung Jan 13, 2025
66 changes: 44 additions & 22 deletions tokio/src/runtime/metrics/mock.rs
@@ -1,12 +1,31 @@
//! This file contains mocks of the types in src/runtime/metrics

use std::thread::ThreadId;
use crate::runtime::WorkerMetrics;
use std::sync::atomic::Ordering::Relaxed;
use std::time::{Duration, Instant};

pub(crate) struct SchedulerMetrics {}

pub(crate) struct WorkerMetrics {}
/// The `MetricsBatch` struct in this mock implementation provides a minimal,
/// simplified version of `batch::MetricsBatch`. It contains only the basic fields
/// required to track the total busy duration (`busy_duration_total`).
///
/// This mock is used to stabilize the API `worker_total_busy_duration`
/// without relying on the full metrics collection logic. In the real implementation,
/// additional fields provide more detailed tracking of worker activity.
///
/// This mock can be further enriched when stabilizing other worker metrics, such as
/// `worker_thread_id`, `worker_park_count` and so on
///
/// When more worker metrics are stabilized, we can remove this mock and switch back
/// to `batch::MetricsBatch`
pub(crate) struct MetricsBatch {
/// The total busy duration in nanoseconds.
busy_duration_total: u64,

pub(crate) struct MetricsBatch {}
/// Instant at which work last resumed (continued after park).
processing_scheduled_tasks_started_at: Instant,
}

#[derive(Clone, Default)]
pub(crate) struct HistogramBuilder {}
@@ -20,32 +39,31 @@ impl SchedulerMetrics {
pub(crate) fn inc_remote_schedule_count(&self) {}
}

impl WorkerMetrics {
pub(crate) fn new() -> Self {
Self {}
}

pub(crate) fn from_config(config: &crate::runtime::Config) -> Self {
// Prevent the dead-code warning from being triggered
let _ = &config.metrics_poll_count_histogram;
Self::new()
}

pub(crate) fn set_queue_depth(&self, _len: usize) {}
pub(crate) fn set_thread_id(&self, _thread_id: ThreadId) {}
}

impl MetricsBatch {
pub(crate) fn new(_: &WorkerMetrics) -> Self {
Self {}
let now = Instant::now();

MetricsBatch {
busy_duration_total: 0,
processing_scheduled_tasks_started_at: now,
}
}

pub(crate) fn submit(&mut self, _to: &WorkerMetrics, _mean_poll_time: u64) {}
pub(crate) fn submit(&mut self, worker: &WorkerMetrics, _mean_poll_time: u64) {
Review comment from Contributor:

Do I understand correctly that this function duplicates part of the submit function in batch::MetricsBatch?

I think this is a problematic way of gradually stabilizing metrics, as it opens the possibility of diverging implementations if a change is made to the "real" MetricsBatch by someone who doesn't realise that there is another one.

This is additionally confusing because this effectively becomes the "stable" implementation, but it lives in a module called mock.

I would propose that we instead split the metrics::MetricsBatch implementation into a stable part (always compiled) and an unstable part (gated by a cfg option), the same way we've done elsewhere in this PR. As in another comment, we would group all the unstable functions into a single cfg_unstable_metrics! block.

Reply from Contributor Author:

Indeed, splitting metrics::MetricsBatch is a much more viable way of stabilising. I've adopted your suggestion and split it into stable and unstable parts (and grouped the unstable functions into a single unstable block). Thanks a lot for reviewing!

worker
.busy_duration_total
.store(self.busy_duration_total, Relaxed);
}
pub(crate) fn about_to_park(&mut self) {}
pub(crate) fn unparked(&mut self) {}
pub(crate) fn inc_local_schedule_count(&mut self) {}
pub(crate) fn start_processing_scheduled_tasks(&mut self) {}
pub(crate) fn end_processing_scheduled_tasks(&mut self) {}
pub(crate) fn start_processing_scheduled_tasks(&mut self) {
self.processing_scheduled_tasks_started_at = Instant::now();
}
pub(crate) fn end_processing_scheduled_tasks(&mut self) {
let busy_duration = self.processing_scheduled_tasks_started_at.elapsed();
self.busy_duration_total += duration_as_u64(busy_duration);
}
pub(crate) fn start_poll(&mut self) {}
pub(crate) fn end_poll(&mut self) {}
}
@@ -57,3 +75,7 @@ cfg_rt_multi_thread! {
pub(crate) fn incr_overflow_count(&mut self) {}
}
}

fn duration_as_u64(dur: Duration) -> u64 {
u64::try_from(dur.as_nanos()).unwrap_or(u64::MAX)
}
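
The accumulate-then-submit pattern in the mock above can be tried in isolation. What follows is only a minimal sketch using the standard library: `SharedWorkerMetrics`, `LocalBatch` and `add_busy` are hypothetical names for illustration, not tokio types, and the sketch uses `saturating_add` where the diff uses `+=`, which is an assumption about the desired overflow behaviour rather than something the PR does.

```rust
use std::sync::atomic::{AtomicU64, Ordering::Relaxed};
use std::time::{Duration, Instant};

/// Hypothetical shared per-worker counter that any thread may read.
#[derive(Default)]
struct SharedWorkerMetrics {
    busy_duration_total: AtomicU64,
}

/// Hypothetical worker-local batch: plain fields, flushed by `submit`.
struct LocalBatch {
    /// Total busy duration in nanoseconds.
    busy_duration_total: u64,
    /// Instant at which the current busy period started.
    started_at: Instant,
}

impl LocalBatch {
    fn new() -> Self {
        LocalBatch {
            busy_duration_total: 0,
            started_at: Instant::now(),
        }
    }

    fn start_processing(&mut self) {
        self.started_at = Instant::now();
    }

    fn end_processing(&mut self) {
        let busy = self.started_at.elapsed();
        self.add_busy(busy);
    }

    /// Adds one busy period to the local total, saturating at `u64::MAX`
    /// (an assumption of this sketch; the diff uses plain `+=`).
    fn add_busy(&mut self, busy: Duration) {
        self.busy_duration_total = self
            .busy_duration_total
            .saturating_add(duration_as_u64(busy));
    }

    /// Publishes the local total so that other threads can load it.
    fn submit(&self, shared: &SharedWorkerMetrics) {
        shared
            .busy_duration_total
            .store(self.busy_duration_total, Relaxed);
    }
}

/// Same conversion as in the diff: nanoseconds, clamped to `u64::MAX`.
fn duration_as_u64(dur: Duration) -> u64 {
    u64::try_from(dur.as_nanos()).unwrap_or(u64::MAX)
}

fn main() {
    let shared = SharedWorkerMetrics::default();
    let mut batch = LocalBatch::new();

    batch.start_processing();
    std::thread::sleep(Duration::from_millis(5));
    batch.end_processing();
    batch.submit(&shared);

    let total = Duration::from_nanos(shared.busy_duration_total.load(Relaxed));
    println!("busy for {:?}", total);
}
```

Keeping the running total in a plain `u64` and storing it once per submit means the hot path touches no atomics; only the reader and the periodic `submit` do.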
7 changes: 5 additions & 2 deletions tokio/src/runtime/metrics/mod.rs
@@ -17,6 +17,7 @@ cfg_unstable_metrics! {

mod histogram;
pub(crate) use histogram::{Histogram, HistogramBatch, HistogramBuilder};

#[allow(unreachable_pub)] // rust-lang/rust#57411
pub use histogram::{HistogramScale, HistogramConfiguration, LogHistogram, LogHistogramBuilder, InvalidHistogramConfiguration};

@@ -34,7 +35,9 @@ cfg_unstable_metrics! {
}

cfg_not_unstable_metrics! {
mod mock;
mod worker;
pub(crate) use worker::WorkerMetrics;
Review comment from Contributor:

Please move the modules and imports that appear in both macro blocks out above them; we don't need to gate them at all.


pub(crate) use mock::{SchedulerMetrics, WorkerMetrics, MetricsBatch, HistogramBuilder};
mod mock;
pub(crate) use mock::{SchedulerMetrics, MetricsBatch, HistogramBuilder};
}
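
The layout the reviewer asks for (shared modules declared once, only the differing ones gated) can be sketched with two small wrapper macros. This is an illustrative sketch, not tokio's code: `cfg_demo_unstable!`, `cfg_not_demo_unstable!`, the `demo_unstable` cfg name and the `variant` function are all hypothetical stand-ins. Because `demo_unstable` is not a declared cfg, recent compilers may emit an `unexpected_cfgs` warning.

```rust
// Hypothetical stand-ins for a pair of mutually exclusive gating macros.
// Each one applies a `#[cfg(...)]` attribute to every item it wraps.
macro_rules! cfg_demo_unstable {
    ($($item:item)*) => {
        $( #[cfg(demo_unstable)] $item )*
    };
}

macro_rules! cfg_not_demo_unstable {
    ($($item:item)*) => {
        $( #[cfg(not(demo_unstable))] $item )*
    };
}

// Needed in both configurations, so it is declared once and left ungated.
mod worker {
    #[derive(Default)]
    pub struct WorkerMetrics {
        pub busy_duration_total: u64,
    }
}
use worker::WorkerMetrics;

// Only the pieces that differ between configurations are gated.
cfg_demo_unstable! {
    mod batch {
        pub fn variant() -> &'static str {
            "full"
        }
    }
    use batch::variant;
}

cfg_not_demo_unstable! {
    mod mock {
        pub fn variant() -> &'static str {
            "mock"
        }
    }
    use mock::variant;
}

fn main() {
    let metrics = WorkerMetrics::default();
    println!(
        "{} batch, busy nanos = {}",
        variant(),
        metrics.busy_duration_total
    );
}
```

Building with `RUSTFLAGS="--cfg demo_unstable"` would select the `batch` module instead of `mock`, while `worker` is compiled either way.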
104 changes: 54 additions & 50 deletions tokio/src/runtime/metrics/runtime.rs
@@ -1,12 +1,14 @@
use crate::runtime::Handle;
#[allow(unused_imports)]
use std::time::Duration;

cfg_64bit_metrics! {
use std::sync::atomic::Ordering::Relaxed;
}

cfg_unstable_metrics! {
use std::ops::Range;
use std::thread::ThreadId;
cfg_64bit_metrics! {
use std::sync::atomic::Ordering::Relaxed;
}
use std::time::Duration;
}

/// Handle to the runtime's metrics.
@@ -96,6 +98,54 @@ impl RuntimeMetrics {
self.handle.inner.injection_queue_depth()
}

cfg_64bit_metrics! {
/// Returns the amount of time the given worker thread has been busy.
///
/// The worker busy duration starts at zero when the runtime is created and
/// increases whenever the worker is spending time processing work. Using
/// this value can indicate the load of the given worker. If a lot of time
/// is spent busy, then the worker is under load and will check for inbound
/// events less often.
///
/// The timer is monotonically increasing. It is never decremented or reset
/// to zero.
///
/// # Arguments
///
/// `worker` is the index of the worker being queried. The given value must
/// be between 0 and `num_workers()`. The index uniquely identifies a single
/// worker and will continue to identify the worker throughout the lifetime
/// of the runtime instance.
///
/// # Panics
///
/// The method panics when `worker` represents an invalid worker, i.e. is
/// greater than or equal to `num_workers()`.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.worker_total_busy_duration(0);
/// println!("worker 0 was busy for a total of {:?}", n);
/// }
/// ```
pub fn worker_total_busy_duration(&self, worker: usize) -> Duration {
let nanos = self
.handle
.inner
.worker_metrics(worker)
.busy_duration_total
.load(Relaxed);
Duration::from_nanos(nanos)
}
}

cfg_unstable_metrics! {

/// Returns the number of additional threads spawned by the runtime.
@@ -543,52 +593,6 @@ impl RuntimeMetrics {
.load(Relaxed)
}

/// Returns the amount of time the given worker thread has been busy.
///
/// The worker busy duration starts at zero when the runtime is created and
/// increases whenever the worker is spending time processing work. Using
/// this value can indicate the load of the given worker. If a lot of time
/// is spent busy, then the worker is under load and will check for inbound
/// events less often.
///
/// The timer is monotonically increasing. It is never decremented or reset
/// to zero.
///
/// # Arguments
///
/// `worker` is the index of the worker being queried. The given value must
/// be between 0 and `num_workers()`. The index uniquely identifies a single
/// worker and will continue to identify the worker throughout the lifetime
/// of the runtime instance.
///
/// # Panics
///
/// The method panics when `worker` represents an invalid worker, i.e. is
/// greater than or equal to `num_workers()`.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.worker_total_busy_duration(0);
/// println!("worker 0 was busy for a total of {:?}", n);
/// }
/// ```
pub fn worker_total_busy_duration(&self, worker: usize) -> Duration {
let nanos = self
.handle
.inner
.worker_metrics(worker)
.busy_duration_total
.load(Relaxed);
Duration::from_nanos(nanos)
}

/// Returns the number of tasks scheduled from **within** the runtime on the
/// given worker's local queue.
///
Expand Down
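
Since the doc comment above describes the busy duration as a monotonically increasing total, a caller would typically sample it twice and divide the difference by the elapsed wall-clock time. The sketch below shows only that arithmetic with the standard library; `busy_ratio` is a hypothetical helper, and the hard-coded samples stand in for two calls to `worker_total_busy_duration(0)` taken one second apart.

```rust
use std::time::Duration;

/// Fraction of `wall` that a worker spent busy between two samples of a
/// monotonically increasing busy-duration counter.
///
/// Returns `None` if `wall` is zero or if `curr` is smaller than `prev`.
fn busy_ratio(prev: Duration, curr: Duration, wall: Duration) -> Option<f64> {
    if wall.is_zero() {
        return None;
    }
    let delta = curr.checked_sub(prev)?;
    Some(delta.as_secs_f64() / wall.as_secs_f64())
}

fn main() {
    // Assumed sample values; a real program would read these from the
    // runtime metrics handle at the start and end of the interval.
    let prev = Duration::from_millis(1_200);
    let curr = Duration::from_millis(1_700);
    let wall = Duration::from_secs(1);

    match busy_ratio(prev, curr, wall) {
        Some(ratio) => println!("worker busy {:.0}% of the interval", ratio * 100.0),
        None => println!("no usable sample"),
    }
}
```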
59 changes: 47 additions & 12 deletions tokio/src/runtime/metrics/worker.rs
@@ -1,10 +1,13 @@
use crate::runtime::metrics::Histogram;
use crate::runtime::Config;
use crate::util::metric_atomics::{MetricAtomicU64, MetricAtomicUsize};
use std::sync::atomic::Ordering::Relaxed;
use std::sync::Mutex;
use std::thread::ThreadId;

cfg_unstable_metrics! {
use crate::runtime::metrics::Histogram;
}

/// Retrieve runtime worker metrics.
///
/// **Note**: This is an [unstable API][unstable]. The public API of this type
@@ -15,40 +18,60 @@ use std::thread::ThreadId;
#[derive(Debug, Default)]
#[repr(align(128))]
pub(crate) struct WorkerMetrics {
#[cfg(tokio_unstable)]
Review comment from Contributor:

Same here: please move the stable fields to the top.

Reply from Contributor Author:

Thanks. Moved stable fields to top

#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
/// Number of times the worker parked.
Review comment from Contributor:

Is this necessary? Since this isn't a public method, it won't appear in the documentation.

pub(crate) park_count: MetricAtomicU64,

#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
/// Number of times the worker parked and unparked.
pub(crate) park_unpark_count: MetricAtomicU64,

#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
/// Number of times the worker woke then parked again without doing work.
pub(crate) noop_count: MetricAtomicU64,

#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
/// Number of tasks the worker stole.
pub(crate) steal_count: MetricAtomicU64,

#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
/// Number of times the worker stole
pub(crate) steal_operations: MetricAtomicU64,

#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
/// Number of tasks the worker polled.
pub(crate) poll_count: MetricAtomicU64,

#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
/// EWMA task poll time, in nanoseconds.
pub(crate) mean_poll_time: MetricAtomicU64,

/// Amount of time the worker spent doing work vs. parking.
pub(crate) busy_duration_total: MetricAtomicU64,

#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
/// Number of tasks scheduled for execution on the worker's local queue.
pub(crate) local_schedule_count: MetricAtomicU64,

#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
/// Number of tasks moved from the local queue to the global queue to free space.
pub(crate) overflow_count: MetricAtomicU64,

/// Number of tasks currently in the local queue. Used only by the
/// current-thread scheduler.
pub(crate) queue_depth: MetricAtomicUsize,

Review comment from Contributor:

Is this intentionally stabilized?

Reply from Contributor Author (Owen-CH-Leung, Jan 13, 2025):

I believe queue_depth is already stabilised on master and is used by the current-thread scheduler:

https://github.com/tokio-rs/tokio/blob/master/tokio/src/runtime/scheduler/current_thread/mod.rs#L341

#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
/// If `Some`, tracks the number of polls by duration range.
pub(super) poll_count_histogram: Option<Histogram>,

@@ -57,29 +80,41 @@ pub(crate) struct WorkerMetrics {
}

impl WorkerMetrics {
pub(crate) fn from_config(config: &Config) -> WorkerMetrics {
let mut worker_metrics = WorkerMetrics::new();
worker_metrics.poll_count_histogram = config
.metrics_poll_count_histogram
.as_ref()
.map(|histogram_builder| histogram_builder.build());
worker_metrics
cfg_unstable_metrics! {
pub(crate) fn from_config(config: &Config) -> WorkerMetrics {
Review comment from Contributor:

It would be better if we grouped all the unstable functions together at the bottom of the impl block, instead of spreading them out.

let mut worker_metrics = WorkerMetrics::new();
worker_metrics.poll_count_histogram = config
.metrics_poll_count_histogram
.as_ref()
.map(|histogram_builder| histogram_builder.build());
worker_metrics
}
}

cfg_not_unstable_metrics! {
pub(crate) fn from_config(_: &Config) -> WorkerMetrics {
Review comment from Contributor:

Let's move this down to be right above the cfg_unstable_metrics! block so that we keep the conditionally compiled implementations together.

WorkerMetrics::new()
}
}

pub(crate) fn new() -> WorkerMetrics {
WorkerMetrics::default()
}

pub(crate) fn queue_depth(&self) -> usize {
self.queue_depth.load(Relaxed)
cfg_unstable_metrics! {
pub(crate) fn queue_depth(&self) -> usize {
self.queue_depth.load(Relaxed)
}
}

pub(crate) fn set_queue_depth(&self, len: usize) {
self.queue_depth.store(len, Relaxed);
}

pub(crate) fn thread_id(&self) -> Option<ThreadId> {
*self.thread_id.lock().unwrap()
cfg_unstable_metrics! {
pub(crate) fn thread_id(&self) -> Option<ThreadId> {
*self.thread_id.lock().unwrap()
}
}

pub(crate) fn set_thread_id(&self, thread_id: ThreadId) {
Expand Down
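
The ordering the reviewers ask for in this file (stable fields and methods first, the two `from_config` variants kept adjacent, unstable items grouped at the bottom) might look roughly like the sketch below. It is an assumption-laden illustration, not the PR's final code: `ConfigSketch`, `WorkerMetricsSketch`, `histogram_buckets` and the `demo_unstable` cfg are hypothetical, and plain `#[cfg]` attributes are used in place of tokio's gating macros. Recent compilers may warn that `demo_unstable` is an unexpected cfg name.

```rust
use std::sync::atomic::{AtomicU64, Ordering::Relaxed};

/// Hypothetical runtime configuration; it only carries data when the
/// illustrative `demo_unstable` cfg is set.
#[derive(Default)]
struct ConfigSketch {
    #[cfg(demo_unstable)]
    histogram_buckets: Option<usize>,
}

#[derive(Default)]
struct WorkerMetricsSketch {
    // Stable fields come first and are always compiled.
    busy_duration_total: AtomicU64,

    // Unstable fields follow, each gated by the cfg.
    #[cfg(demo_unstable)]
    park_count: AtomicU64,
    #[cfg(demo_unstable)]
    histogram_buckets: Option<usize>,
}

impl WorkerMetricsSketch {
    // Stable methods first.
    fn new() -> Self {
        Self::default()
    }

    fn busy_duration_total(&self) -> u64 {
        self.busy_duration_total.load(Relaxed)
    }

    // The two conditionally compiled variants sit next to each other.
    #[cfg(not(demo_unstable))]
    fn from_config(_: &ConfigSketch) -> Self {
        Self::new()
    }

    #[cfg(demo_unstable)]
    fn from_config(config: &ConfigSketch) -> Self {
        let mut metrics = Self::new();
        metrics.histogram_buckets = config.histogram_buckets;
        metrics
    }

    // Remaining unstable-only accessors are grouped at the bottom.
    #[cfg(demo_unstable)]
    fn park_count(&self) -> u64 {
        self.park_count.load(Relaxed)
    }
}

fn main() {
    let metrics = WorkerMetricsSketch::from_config(&ConfigSketch::default());
    metrics.busy_duration_total.store(1_500, Relaxed);
    println!("busy nanos: {}", metrics.busy_duration_total());
}
```

Callers see the same `from_config` signature in both configurations, which is what lets the rest of the runtime stay ungated.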
10 changes: 5 additions & 5 deletions tokio/src/runtime/scheduler/current_thread/mod.rs
@@ -566,6 +566,11 @@ impl Handle {
pub(crate) fn injection_queue_depth(&self) -> usize {
self.shared.inject.len()
}

pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
assert_eq!(0, worker);
&self.shared.worker_metrics
}
}

cfg_unstable_metrics! {
@@ -574,11 +579,6 @@ cfg_unstable_metrics! {
&self.shared.scheduler_metrics
}

pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
assert_eq!(0, worker);
&self.shared.worker_metrics
}

pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
self.worker_metrics(worker).queue_depth()
}
Expand Down
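
The `assert_eq!(0, worker)` in the hunk above means a single-worker handle accepts only index 0 and panics otherwise, which matches the "Panics" section of the `worker_total_busy_duration` documentation. A small self-contained sketch of that behaviour, with the hypothetical types `SingleWorkerHandle` and `WorkerMetricsSketch` standing in for the real ones:

```rust
use std::panic;

/// Hypothetical stand-in for per-worker metrics.
struct WorkerMetricsSketch {
    busy_nanos: u64,
}

/// Hypothetical handle for a scheduler that has exactly one worker.
struct SingleWorkerHandle {
    worker_metrics: WorkerMetricsSketch,
}

impl SingleWorkerHandle {
    /// Returns the metrics of the only worker; panics for any other index.
    fn worker_metrics(&self, worker: usize) -> &WorkerMetricsSketch {
        assert_eq!(0, worker);
        &self.worker_metrics
    }
}

fn main() {
    let handle = SingleWorkerHandle {
        worker_metrics: WorkerMetricsSketch { busy_nanos: 42 },
    };

    println!("worker 0 busy nanos: {}", handle.worker_metrics(0).busy_nanos);

    // An out-of-range index panics; `catch_unwind` is used here only to
    // observe that without aborting the demo.
    let result = panic::catch_unwind(|| handle.worker_metrics(1).busy_nanos);
    println!("index 1 panicked: {}", result.is_err());
}
```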