From 40620ef4970b20be9e439f740996bb7fb1ae9864 Mon Sep 17 00:00:00 2001
From: Toby Lawrence
Date: Thu, 27 Feb 2025 11:36:49 -0500
Subject: [PATCH] add support to the prometheus destination for histograms
 with two-tier exponential buckets

---
Review notes (free text between the "---" line and the first "diff --git"
line is not part of the commit):

* The From: header in this copy carries no e-mail address; supply the
  author's address before feeding the patch to `git am`.
* histogram_buckets(): with the constants added here, the largest lower-range
  bound is 1e-7 * 2.25^19 (about 0.49), which is below
  HISTOGRAM_UPPER_RANGE_START, so the early `break` is not taken and the upper
  range starts at index 20. Note that upper_range_start_idx is always >= 1:
  if the `break` ever fired on the very first iteration, bucket 0 would keep
  its placeholder value (0.0, "").
* PrometheusHistogram::new() pre-fills one entry per HISTOGRAM_BUCKETS
  element, so for values built through new() the length checks in merge() and
  add_sample() never push additional buckets.
* add_sample() increments every bucket whose upper bound is >= the value,
  i.e. the stored counts are already cumulative, which is what the `le`
  series written by write_metrics() expects.

 .../src/destinations/prometheus/mod.rs        | 196 +++++++++++++++++-
 lib/saluki-event/src/metric/mod.rs            |   4 +-
 lib/saluki-event/src/metric/value/mod.rs      |   3 +-
 3 files changed, 198 insertions(+), 5 deletions(-)

diff --git a/lib/saluki-components/src/destinations/prometheus/mod.rs b/lib/saluki-components/src/destinations/prometheus/mod.rs
index 1a8e65d7..1e101374 100644
--- a/lib/saluki-components/src/destinations/prometheus/mod.rs
+++ b/lib/saluki-components/src/destinations/prometheus/mod.rs
@@ -1,4 +1,9 @@
-use std::{convert::Infallible, fmt::Write as _, num::NonZeroUsize, sync::Arc};
+use std::{
+    convert::Infallible,
+    fmt::Write as _,
+    num::NonZeroUsize,
+    sync::{Arc, LazyLock},
+};
 
 use async_trait::async_trait;
 use ddsketch_agent::DDSketch;
@@ -11,7 +16,7 @@ use saluki_context::{tags::Tagged as _, Context};
 use saluki_core::components::{destinations::*, ComponentContext};
 use saluki_error::GenericError;
 use saluki_event::{
-    metric::{Metric, MetricValues},
+    metric::{Histogram, Metric, MetricValues},
     DataType,
 };
 use saluki_io::net::{
@@ -30,6 +35,16 @@ const PAYLOAD_BUFFER_SIZE_LIMIT_BYTES: usize = 16384;
 const TAGS_BUFFER_SIZE_LIMIT_BYTES: usize = 1024;
 const NAME_NORMALIZATION_BUFFER_SIZE: usize = 512;
 
+// Histogram-related constants and pre-calculated buckets.
+const HISTOGRAM_LOWER_RANGE_START: f64 = 0.0000001;
+const HISTOGRAM_LOWER_RANGE_BUCKETS: usize = 20;
+const HISTOGRAM_LOWER_RANGE_GROWTH_FACTOR: f64 = 2.25;
+const HISTOGRAM_UPPER_RANGE_START: f64 = 1.0;
+const HISTOGRAM_UPPER_RANGE_BUCKETS: usize = 10;
+const HISTOGRAM_UPPER_RANGE_GROWTH_FACTOR: f64 = 2.0;
+const HISTOGRAM_MAX_BUCKETS: usize = HISTOGRAM_LOWER_RANGE_BUCKETS + HISTOGRAM_UPPER_RANGE_BUCKETS;
+static HISTOGRAM_BUCKETS: LazyLock<[(f64, &'static str); HISTOGRAM_MAX_BUCKETS]> = LazyLock::new(histogram_buckets);
+
 // SAFETY: This is obviously not zero.
 const METRIC_NAME_STRING_INTERNER_BYTES: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(65536) };
 
@@ -291,6 +306,42 @@ fn write_metrics(
                 }
                 writeln!(payload_buffer, " {}", value).unwrap();
             }
+            PrometheusValue::Histogram(histogram) => {
+                // Write the histogram buckets.
+                for (upper_bound_str, count) in histogram.buckets() {
+                    if count != 0 {
+                        write!(payload_buffer, "{}_bucket{{{}", &prom_context.metric_name, tags_buffer).unwrap();
+                        if !tags_buffer.is_empty() {
+                            payload_buffer.push(',');
+                        }
+                        writeln!(payload_buffer, "le=\"{}\"}} {}", upper_bound_str, count).unwrap();
+                    }
+                }
+
+                // Write the final bucket -- the +Inf bucket -- which is just equal to the count of the histogram.
+                write!(payload_buffer, "{}_bucket{{{}", &prom_context.metric_name, tags_buffer).unwrap();
+                if !tags_buffer.is_empty() {
+                    payload_buffer.push(',');
+                }
+                writeln!(payload_buffer, "le=\"+Inf\"}} {}", histogram.count).unwrap();
+
+                // Write the histogram sum and count.
+                write!(payload_buffer, "{}_sum", &prom_context.metric_name).unwrap();
+                if !tags_buffer.is_empty() {
+                    payload_buffer.push('{');
+                    payload_buffer.push_str(tags_buffer);
+                    payload_buffer.push('}');
+                }
+                writeln!(payload_buffer, " {}", histogram.sum).unwrap();
+
+                write!(payload_buffer, "{}_count", &prom_context.metric_name).unwrap();
+                if !tags_buffer.is_empty() {
+                    payload_buffer.push('{');
+                    payload_buffer.push_str(tags_buffer);
+                    payload_buffer.push('}');
+                }
+                writeln!(payload_buffer, " {}", histogram.count).unwrap();
+            }
             PrometheusValue::Summary(sketch) => {
                 // We take a fixed set of quantiles from the sketch, which is hard-coded but should generally represent
                 // the quantiles people generally care about.
@@ -374,6 +425,7 @@ fn format_tags(tags_buffer: &mut String, context: &Context) -> bool {
 enum PrometheusType {
     Counter,
     Gauge,
+    Histogram,
     Summary,
 }
 
@@ -382,6 +434,7 @@ impl PrometheusType {
         match self {
             Self::Counter => "counter",
             Self::Gauge => "gauge",
+            Self::Histogram => "histogram",
             Self::Summary => "summary",
         }
     }
@@ -396,6 +449,7 @@ struct PrometheusContext {
 enum PrometheusValue {
     Counter(f64),
     Gauge(f64),
+    Histogram(PrometheusHistogram),
     Summary(DDSketch),
 }
 
@@ -404,6 +458,7 @@ impl PrometheusValue {
         match (self, other) {
             (Self::Counter(a), Self::Counter(b)) => *a += b,
             (Self::Gauge(a), Self::Gauge(b)) => *a = b,
+            (Self::Histogram(a), Self::Histogram(b)) => a.merge(&b),
             (Self::Summary(a), Self::Summary(b)) => a.merge(&b),
             _ => unreachable!(),
         }
@@ -452,6 +507,15 @@ fn into_prometheus_metric(
                 PrometheusValue::Gauge(latest_value.unwrap_or_default()),
             )
         }
+        MetricValues::Histogram(histograms) => {
+            let prom_hist = histograms
+                .into_iter()
+                .fold(PrometheusHistogram::new(), |mut acc, (_, hist)| {
+                    acc.merge_histogram(&hist);
+                    acc
+                });
+            (PrometheusType::Histogram, PrometheusValue::Histogram(prom_hist))
+        }
         MetricValues::Distribution(sketches) => {
             let sketch = sketches.into_iter().fold(DDSketch::default(), |mut acc, (_, sketch)| {
                 acc.merge(&sketch);
@@ -504,3 +568,131 @@ fn is_valid_name_char(c: char) -> bool {
     // Matches a regular expression of [a-zA-Z0-9_:].
     c.is_ascii_alphanumeric() || c == '_' || c == ':'
 }
+
+struct PrometheusHistogram {
+    sum: f64,
+    count: u64,
+    buckets: Vec<(f64, u64)>,
+}
+
+impl PrometheusHistogram {
+    fn new() -> Self {
+        let mut buckets = Vec::with_capacity(HISTOGRAM_BUCKETS.len());
+        for (upper_bound, _) in HISTOGRAM_BUCKETS.iter() {
+            buckets.push((*upper_bound, 0));
+        }
+
+        Self {
+            sum: 0.0,
+            count: 0,
+            buckets,
+        }
+    }
+
+    fn merge(&mut self, other: &Self) {
+        self.sum += other.sum;
+        self.count += other.count;
+
+        // Extend our buckets to match the other buckets, if our bucket count is less than `other`.
+        if self.buckets.len() < other.buckets.len() {
+            for (upper_bound, _) in HISTOGRAM_BUCKETS.iter() {
+                if self.buckets.len() == other.buckets.len() {
+                    break;
+                }
+
+                self.buckets.push((*upper_bound, 0));
+            }
+        }
+
+        // Now just add the counts from `other`, in order, since we know our buckets are always in order and have
+        // identical bounds for the same indices.
+        for (i, (_, other_count)) in other.buckets.iter().enumerate() {
+            self.buckets[i].1 += other_count;
+        }
+    }
+
+    fn merge_histogram(&mut self, histogram: &Histogram) {
+        for sample in histogram.samples() {
+            self.add_sample(sample.value.into_inner(), sample.weight);
+        }
+    }
+
+    fn add_sample(&mut self, value: f64, weight: u64) {
+        self.sum += value * weight as f64;
+        self.count += weight;
+
+        // Add the value to each bucket that it falls into, up to the maximum number of buckets.
+        for (i, (upper_bound, _)) in HISTOGRAM_BUCKETS.iter().enumerate() {
+            if value <= *upper_bound {
+                if self.buckets.len() <= i {
+                    self.buckets.push((*upper_bound, 0));
+                }
+
+                self.buckets[i].1 += weight;
+            }
+        }
+    }
+
+    fn buckets(&self) -> impl Iterator<Item = (&'static str, u64)> + '_ {
+        self.buckets
+            .iter()
+            .zip(HISTOGRAM_BUCKETS.iter())
+            .map(|((_, count), (_, upper_bound_str))| (*upper_bound_str, *count))
+    }
+}
+
+fn histogram_buckets() -> [(f64, &'static str); HISTOGRAM_MAX_BUCKETS] {
+    // We generate two separate bucket ranges, meant to maximize resolution at different scale.
+    //
+    // The lower range is meant to cover values from 0 to 1, which generally includes time-based measurements for values
+    // that potentially reach down into the hundreds of nanoseconds range. We want a lot of granularity here.
+    //
+    // The upper range is meant to extend from 1 to infinity, which generally includes count-based measurements for
+    // things like the number of events in an event buffer: values that are often in the tens or hundreds.
+    //
+    // This is a fairly customized bucket range based on our knowledge of our internal telemetry, but is still decently
+    // generic and should be useful for most use cases.
+
+    let mut buckets = [(0.0, ""); HISTOGRAM_MAX_BUCKETS];
+
+    let mut lower_range_end_idx = 0;
+
+    // Generate the buckets for the lower range (0-1), and stop ourselves if our upper bound would exceed the start of
+    // the upper range.
+    for (i, (bucket_le, bucket_le_str)) in &mut buckets[0..HISTOGRAM_LOWER_RANGE_BUCKETS].iter_mut().enumerate() {
+        let current_le = if i == 0 {
+            HISTOGRAM_LOWER_RANGE_START
+        } else {
+            HISTOGRAM_LOWER_RANGE_START * HISTOGRAM_LOWER_RANGE_GROWTH_FACTOR.powf(i as f64)
+        };
+
+        if current_le > HISTOGRAM_UPPER_RANGE_START {
+            break;
+        }
+
+        let current_le_str = format!("{}", current_le);
+
+        *bucket_le = current_le;
+        *bucket_le_str = current_le_str.leak();
+        lower_range_end_idx = i;
+    }
+
+    // Generate the buckets for the upper range (1 - infinity).
+    //
+    // We start from where the lower range left off, in case we stopped before reaching bucket range midpoint.
+    let upper_range_start_idx = lower_range_end_idx + 1;
+    for (i, (bucket_le, bucket_le_str)) in &mut buckets[upper_range_start_idx..].iter_mut().enumerate() {
+        let current_le = if i == 0 {
+            HISTOGRAM_UPPER_RANGE_START
+        } else {
+            HISTOGRAM_UPPER_RANGE_START * HISTOGRAM_UPPER_RANGE_GROWTH_FACTOR.powf(i as f64)
+        };
+
+        let current_le_str = format!("{}", current_le);
+
+        *bucket_le = current_le;
+        *bucket_le_str = current_le_str.leak();
+    }
+
+    buckets
+}
diff --git a/lib/saluki-event/src/metric/mod.rs b/lib/saluki-event/src/metric/mod.rs
index 27cc4ff4..80db97c5 100644
--- a/lib/saluki-event/src/metric/mod.rs
+++ b/lib/saluki-event/src/metric/mod.rs
@@ -8,7 +8,9 @@ use saluki_context::Context;
 pub use self::metadata::*;
 
 mod value;
-pub use self::value::{HistogramPoints, HistogramSummary, MetricValues, ScalarPoints, SetPoints, SketchPoints};
+pub use self::value::{
+    Histogram, HistogramPoints, HistogramSummary, MetricValues, ScalarPoints, SetPoints, SketchPoints,
+};
 
 /// A metric.
 ///
diff --git a/lib/saluki-event/src/metric/value/mod.rs b/lib/saluki-event/src/metric/value/mod.rs
index 0a61520b..e41bff96 100644
--- a/lib/saluki-event/src/metric/value/mod.rs
+++ b/lib/saluki-event/src/metric/value/mod.rs
@@ -10,8 +10,7 @@ mod sketch;
 pub use self::sketch::SketchPoints;
 
 mod histogram;
-use self::histogram::Histogram;
-pub use self::histogram::{HistogramPoints, HistogramSummary};
+pub use self::histogram::{Histogram, HistogramPoints, HistogramSummary};
 
 mod scalar;
 pub use self::scalar::ScalarPoints;