From 18c0960ad695f06e5aa43b9d947c1ecf94f9618e Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Mon, 21 Oct 2024 11:07:11 -0700 Subject: [PATCH] Add metric-tracing crate This will be used to support metric partitioning in unit tests --- Cargo.toml | 2 +- ipa-metrics-tracing/Cargo.toml | 10 +++ ipa-metrics-tracing/src/layer.rs | 123 +++++++++++++++++++++++++++++++ ipa-metrics-tracing/src/lib.rs | 7 ++ 4 files changed, 141 insertions(+), 1 deletion(-) create mode 100644 ipa-metrics-tracing/Cargo.toml create mode 100644 ipa-metrics-tracing/src/layer.rs create mode 100644 ipa-metrics-tracing/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 1aed2b4b7..deb437919 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [workspace] resolver = "2" -members = ["ipa-core", "ipa-step", "ipa-step-derive", "ipa-step-test", "ipa-metrics"] +members = ["ipa-core", "ipa-step", "ipa-step-derive", "ipa-step-test", "ipa-metrics", "ipa-metrics-tracing"] [profile.release] incremental = true diff --git a/ipa-metrics-tracing/Cargo.toml b/ipa-metrics-tracing/Cargo.toml new file mode 100644 index 000000000..ac7314c19 --- /dev/null +++ b/ipa-metrics-tracing/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "ipa-metrics-tracing" +version = "0.1.0" +edition = "2021" + +[dependencies] +# requires partitions feature because without it, it does not make sense to use +ipa-metrics = { version = "*", path = "../ipa-metrics", features = ["partitions"] } +tracing = "0.1" +tracing-subscriber = "0.3" \ No newline at end of file diff --git a/ipa-metrics-tracing/src/layer.rs b/ipa-metrics-tracing/src/layer.rs new file mode 100644 index 000000000..85d07d910 --- /dev/null +++ b/ipa-metrics-tracing/src/layer.rs @@ -0,0 +1,123 @@ +use std::fmt::Debug; + +use ipa_metrics::{CurrentThreadPartitionContext, MetricPartition, MetricsCurrentThreadContext}; +use tracing::{ + field::{Field, Visit}, + span::{Attributes, Record}, + Id, Subscriber, +}; +use tracing_subscriber::{ + layer::Context, + registry::{Extensions, ExtensionsMut, LookupSpan}, + Layer, +}; + +pub const FIELD: &str = concat!(env!("CARGO_PKG_NAME"), "-", "metrics-partition"); + +/// This layer allows partitioning metric stores. +/// This can be used in tests, where each unit test +/// creates its own unique root span. Upon entering +/// this span, this layer sets a unique partition key +#[derive(Default)] +pub struct MetricsPartitioningLayer; + +impl LookupSpan<'s>> Layer for MetricsPartitioningLayer { + fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) { + #[derive(Default)] + struct MaybeMetricPartition(Option); + + impl Visit for MaybeMetricPartition { + fn record_u64(&mut self, field: &Field, value: u64) { + if field.name() == FIELD { + self.0 = Some(value); + } + } + + fn record_debug(&mut self, _field: &Field, _value: &dyn Debug) { + // not interested in anything else except MetricPartition values. + } + } + + let record = Record::new(attrs.values()); + let mut metric_partition = MaybeMetricPartition::default(); + record.record(&mut metric_partition); + if let Some(v) = metric_partition.0 { + let span = ctx.span(id).expect("Span should exists upon entering"); + span.extensions_mut().insert(MetricPartitionExt { + prev: None, + current: v, + }); + } + } + + fn on_enter(&self, id: &Id, ctx: Context<'_, S>) { + let span = ctx.span(id).expect("Span should exists upon entering"); + MetricPartitionExt::span_enter(span.extensions_mut()); + } + + fn on_exit(&self, id: &Id, ctx: Context<'_, S>) { + let span = ctx.span(id).expect("Span should exists upon exiting"); + MetricPartitionExt::span_exit(span.extensions_mut()); + } + + fn on_close(&self, id: Id, ctx: Context<'_, S>) { + let span = ctx.span(&id).expect("Span should exists before closing it"); + MetricPartitionExt::span_close(&span.extensions()); + } +} + +struct MetricPartitionExt { + // Partition active before span is entered. + prev: Option, + // Partition that must be set when this span is entered. + current: MetricPartition, +} + +impl MetricPartitionExt { + fn span_enter(mut span_ext: ExtensionsMut<'_>) { + if let Some(MetricPartitionExt { current, prev }) = span_ext.get_mut() { + *prev = CurrentThreadPartitionContext::get(); + CurrentThreadPartitionContext::set(*current); + } + } + + fn span_exit(mut span_ext: ExtensionsMut) { + if let Some(MetricPartitionExt { prev, .. }) = span_ext.get_mut() { + CurrentThreadPartitionContext::toggle(prev.take()); + } + } + + fn span_close(span_ext: &Extensions) { + if let Some(MetricPartitionExt { .. }) = span_ext.get() { + MetricsCurrentThreadContext::flush(); + } + } +} + +#[cfg(test)] +mod tests { + use ipa_metrics::CurrentThreadPartitionContext; + use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + + use crate::{layer::FIELD, MetricsPartitioningLayer}; + + #[test] + fn basic() { + CurrentThreadPartitionContext::set(0); + tracing_subscriber::registry() + .with(MetricsPartitioningLayer) + .init(); + let span1 = tracing::info_span!("", { FIELD } = 1_u64); + let span2 = tracing::info_span!("", { FIELD } = 2_u64); + { + let _guard1 = span1.enter(); + assert_eq!(Some(1), CurrentThreadPartitionContext::get()); + { + let _guard2 = span2.enter(); + assert_eq!(Some(2), CurrentThreadPartitionContext::get()); + } + assert_eq!(Some(1), CurrentThreadPartitionContext::get()); + } + assert_eq!(Some(0), CurrentThreadPartitionContext::get()); + } +} diff --git a/ipa-metrics-tracing/src/lib.rs b/ipa-metrics-tracing/src/lib.rs new file mode 100644 index 000000000..c72bb9e54 --- /dev/null +++ b/ipa-metrics-tracing/src/lib.rs @@ -0,0 +1,7 @@ +#![deny(clippy::pedantic)] +#![allow(clippy::similar_names)] +#![allow(clippy::module_name_repetitions)] + +mod layer; + +pub use layer::{MetricsPartitioningLayer, FIELD as PARTITION_FIELD};