add support to the prometheus destination for histograms with two-tier exponential buckets
tobz committed Feb 27, 2025
1 parent 4369102 commit 40620ef
Showing 3 changed files with 198 additions and 5 deletions.
196 changes: 194 additions & 2 deletions lib/saluki-components/src/destinations/prometheus/mod.rs
@@ -1,4 +1,9 @@
use std::{convert::Infallible, fmt::Write as _, num::NonZeroUsize, sync::Arc};
use std::{
convert::Infallible,
fmt::Write as _,
num::NonZeroUsize,
sync::{Arc, LazyLock},
};

use async_trait::async_trait;
use ddsketch_agent::DDSketch;
@@ -11,7 +16,7 @@ use saluki_context::{tags::Tagged as _, Context};
use saluki_core::components::{destinations::*, ComponentContext};
use saluki_error::GenericError;
use saluki_event::{
metric::{Metric, MetricValues},
metric::{Histogram, Metric, MetricValues},
DataType,
};
use saluki_io::net::{
@@ -30,6 +35,16 @@ const PAYLOAD_BUFFER_SIZE_LIMIT_BYTES: usize = 16384;
const TAGS_BUFFER_SIZE_LIMIT_BYTES: usize = 1024;
const NAME_NORMALIZATION_BUFFER_SIZE: usize = 512;

// Histogram-related constants and pre-calculated buckets.
const HISTOGRAM_LOWER_RANGE_START: f64 = 0.0000001;
const HISTOGRAM_LOWER_RANGE_BUCKETS: usize = 20;
const HISTOGRAM_LOWER_RANGE_GROWTH_FACTOR: f64 = 2.25;
const HISTOGRAM_UPPER_RANGE_START: f64 = 1.0;
const HISTOGRAM_UPPER_RANGE_BUCKETS: usize = 10;
const HISTOGRAM_UPPER_RANGE_GROWTH_FACTOR: f64 = 2.0;
const HISTOGRAM_MAX_BUCKETS: usize = HISTOGRAM_LOWER_RANGE_BUCKETS + HISTOGRAM_UPPER_RANGE_BUCKETS;
static HISTOGRAM_BUCKETS: LazyLock<[(f64, &'static str); HISTOGRAM_MAX_BUCKETS]> = LazyLock::new(histogram_buckets);
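// For orientation (illustrative arithmetic, not part of this change): with these constants the
// lower tier spans 1e-7 * 2.25^0 through 1e-7 * 2.25^19 (~0.0000001 up to ~0.49), and the upper
// tier spans 1.0 * 2^0 through 1.0 * 2^9 (1 up to 512). Values above 512 contribute only to the
// sum and count, and therefore show up only in the +Inf bucket written at exposition time.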

// SAFETY: This is obviously not zero.
const METRIC_NAME_STRING_INTERNER_BYTES: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(65536) };

@@ -291,6 +306,42 @@ fn write_metrics(
}
writeln!(payload_buffer, " {}", value).unwrap();
}
PrometheusValue::Histogram(histogram) => {
// Write the histogram buckets.
for (upper_bound_str, count) in histogram.buckets() {
if count != 0 {
write!(payload_buffer, "{}_bucket{{{}", &prom_context.metric_name, tags_buffer).unwrap();
if !tags_buffer.is_empty() {
payload_buffer.push(',');
}
writeln!(payload_buffer, "le=\"{}\"}} {}", upper_bound_str, count).unwrap();
}
}

// Write the final bucket -- the +Inf bucket -- which is just equal to the count of the histogram.
write!(payload_buffer, "{}_bucket{{{}", &prom_context.metric_name, tags_buffer).unwrap();
if !tags_buffer.is_empty() {
payload_buffer.push(',');
}
writeln!(payload_buffer, "le=\"+Inf\"}} {}", histogram.count).unwrap();

// Write the histogram sum and count.
write!(payload_buffer, "{}_sum", &prom_context.metric_name).unwrap();
if !tags_buffer.is_empty() {
payload_buffer.push('{');
payload_buffer.push_str(tags_buffer);
payload_buffer.push('}');
}
writeln!(payload_buffer, " {}", histogram.sum).unwrap();

write!(payload_buffer, "{}_count", &prom_context.metric_name).unwrap();
if !tags_buffer.is_empty() {
payload_buffer.push('{');
payload_buffer.push_str(tags_buffer);
payload_buffer.push('}');
}
writeln!(payload_buffer, " {}", histogram.count).unwrap();
}
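// Illustrative only (hypothetical metric name, tags, and values): a histogram whose normalized
// metric name is `request_duration_seconds`, carrying a single `service="web"` tag, 42 samples,
// and a sum of 97.5, would produce exposition lines roughly like:
//
//   request_duration_seconds_bucket{service="web",le="1"} 10
//   request_duration_seconds_bucket{service="web",le="2"} 25
//   request_duration_seconds_bucket{service="web",le="4"} 42
//   (higher finite buckets repeat the cumulative total of 42 and are also written)
//   request_duration_seconds_bucket{service="web",le="+Inf"} 42
//   request_duration_seconds_sum{service="web"} 97.5
//   request_duration_seconds_count{service="web"} 42
//
// Zero-count finite buckets are skipped, while the +Inf bucket, sum, and count are always written.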
PrometheusValue::Summary(sketch) => {
// We take a fixed set of quantiles from the sketch, which is hard-coded but should represent the
// quantiles people generally care about.
Expand Down Expand Up @@ -374,6 +425,7 @@ fn format_tags(tags_buffer: &mut String, context: &Context) -> bool {
enum PrometheusType {
Counter,
Gauge,
Histogram,
Summary,
}

@@ -382,6 +434,7 @@ impl PrometheusType {
match self {
Self::Counter => "counter",
Self::Gauge => "gauge",
Self::Histogram => "histogram",
Self::Summary => "summary",
}
}
@@ -396,6 +449,7 @@ struct PrometheusContext {
enum PrometheusValue {
Counter(f64),
Gauge(f64),
Histogram(PrometheusHistogram),
Summary(DDSketch),
}

@@ -404,6 +458,7 @@ impl PrometheusValue {
match (self, other) {
(Self::Counter(a), Self::Counter(b)) => *a += b,
(Self::Gauge(a), Self::Gauge(b)) => *a = b,
(Self::Histogram(a), Self::Histogram(b)) => a.merge(&b),
(Self::Summary(a), Self::Summary(b)) => a.merge(&b),
_ => unreachable!(),
}
@@ -452,6 +507,15 @@ fn into_prometheus_metric(
PrometheusValue::Gauge(latest_value.unwrap_or_default()),
)
}
MetricValues::Histogram(histograms) => {
let prom_hist = histograms
.into_iter()
.fold(PrometheusHistogram::new(), |mut acc, (_, hist)| {
acc.merge_histogram(&hist);
acc
});
(PrometheusType::Histogram, PrometheusValue::Histogram(prom_hist))
}
MetricValues::Distribution(sketches) => {
let sketch = sketches.into_iter().fold(DDSketch::default(), |mut acc, (_, sketch)| {
acc.merge(&sketch);
@@ -504,3 +568,131 @@ fn is_valid_name_char(c: char) -> bool {
// Matches the character class [a-zA-Z0-9_:].
c.is_ascii_alphanumeric() || c == '_' || c == ':'
}

struct PrometheusHistogram {
sum: f64,
count: u64,
buckets: Vec<(f64, u64)>,
}

impl PrometheusHistogram {
fn new() -> Self {
let mut buckets = Vec::with_capacity(HISTOGRAM_BUCKETS.len());
for (upper_bound, _) in HISTOGRAM_BUCKETS.iter() {
buckets.push((*upper_bound, 0));
}

Self {
sum: 0.0,
count: 0,
buckets,
}
}

fn merge(&mut self, other: &Self) {
self.sum += other.sum;
self.count += other.count;

// Extend our buckets to match `other`'s buckets, if our bucket count is less than `other`'s.
if self.buckets.len() < other.buckets.len() {
for (upper_bound, _) in HISTOGRAM_BUCKETS.iter() {
if self.buckets.len() == other.buckets.len() {
break;
}

self.buckets.push((*upper_bound, 0));
}
}

// Now just add the counts from `other`, in order, since we know our buckets are always in order and have
// identical bounds for the same indices.
for (i, (_, other_count)) in other.buckets.iter().enumerate() {
self.buckets[i].1 += other_count;
}
}

fn merge_histogram(&mut self, histogram: &Histogram) {
for sample in histogram.samples() {
self.add_sample(sample.value.into_inner(), sample.weight);
}
}

fn add_sample(&mut self, value: f64, weight: u64) {
self.sum += value * weight as f64;
self.count += weight;

// Add the sample's weight to every bucket the value falls into -- that is, every bucket whose upper
// bound is at least the value -- so the stored per-bucket counts stay cumulative. Values above the
// largest finite bound contribute only to the sum and count.
for (i, (upper_bound, _)) in HISTOGRAM_BUCKETS.iter().enumerate() {
if value <= *upper_bound {
if self.buckets.len() <= i {
self.buckets.push((*upper_bound, 0));
}

self.buckets[i].1 += weight;
}
}
}
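// Worked example (illustrative values): `add_sample(3.0, 2)` adds 6.0 to the sum, 2 to the count,
// and bumps every bucket whose upper bound is at least 3.0 -- i.e. `le="4"` through `le="512"` --
// by 2, which is what keeps the stored per-bucket counts cumulative in the Prometheus sense.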

fn buckets(&self) -> impl Iterator<Item = (&'static str, u64)> + '_ {
self.buckets
.iter()
.zip(HISTOGRAM_BUCKETS.iter())
.map(|((_, count), (_, upper_bound_str))| (*upper_bound_str, *count))
}
}

fn histogram_buckets() -> [(f64, &'static str); HISTOGRAM_MAX_BUCKETS] {
// We generate two separate bucket ranges, meant to maximize resolution at different scales.
//
// The lower range is meant to cover values from 0 to 1, which generally includes time-based measurements for values
// that potentially reach down into the hundreds of nanoseconds range. We want a lot of granularity here.
//
// The upper range is meant to extend from 1 to infinity, which generally includes count-based measurements for
// things like the number of events in an event buffer: values that are often in the tens or hundreds.
//
// This is a fairly customized bucket range based on our knowledge of our internal telemetry, but is still decently
// generic and should be useful for most use cases.

let mut buckets = [(0.0, ""); HISTOGRAM_MAX_BUCKETS];

let mut lower_range_end_idx = 0;

// Generate the buckets for the lower range (0-1), and stop ourselves if our upper bound would exceed the start of
// the upper range.
for (i, (bucket_le, bucket_le_str)) in &mut buckets[0..HISTOGRAM_LOWER_RANGE_BUCKETS].iter_mut().enumerate() {
let current_le = if i == 0 {
HISTOGRAM_LOWER_RANGE_START
} else {
HISTOGRAM_LOWER_RANGE_START * HISTOGRAM_LOWER_RANGE_GROWTH_FACTOR.powf(i as f64)
};

if current_le > HISTOGRAM_UPPER_RANGE_START {
break;
}

let current_le_str = format!("{}", current_le);

*bucket_le = current_le;
*bucket_le_str = current_le_str.leak();
lower_range_end_idx = i;
}

// Generate the buckets for the upper range (1 - infinity).
//
// We start from where the lower range left off, in case it stopped before filling all of its allotted buckets.
let upper_range_start_idx = lower_range_end_idx + 1;
for (i, (bucket_le, bucket_le_str)) in &mut buckets[upper_range_start_idx..].iter_mut().enumerate() {
let current_le = if i == 0 {
HISTOGRAM_UPPER_RANGE_START
} else {
HISTOGRAM_UPPER_RANGE_START * HISTOGRAM_UPPER_RANGE_GROWTH_FACTOR.powf(i as f64)
};

let current_le_str = format!("{}", current_le);

*bucket_le = current_le;
*bucket_le_str = current_le_str.leak();
}

buckets
}
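
Taken together, the new code boils down to: precompute a fixed, two-tier set of bucket upper bounds once, then count every sample cumulatively against those bounds and render the result in the Prometheus text format. The following standalone sketch is simplified and hypothetical (it drops the interned bound strings, the LazyLock initialization, and the merge paths used by the real code) but shows the core bucketing technique in isolation:

fn bounds() -> Vec<f64> {
    // Lower tier: 20 buckets starting at 1e-7, growing 2.25x per bucket (sub-second timings).
    // Upper tier: 10 buckets starting at 1.0, doubling per bucket (small counts, up to 512).
    let lower = (0..20).map(|i| 1e-7 * 2.25_f64.powi(i));
    let upper = (0..10).map(|i| 2.0_f64.powi(i));
    lower.chain(upper).collect()
}

struct SimpleHistogram {
    sum: f64,
    count: u64,
    buckets: Vec<(f64, u64)>, // (upper bound, cumulative count)
}

impl SimpleHistogram {
    fn new() -> Self {
        let buckets = bounds().into_iter().map(|le| (le, 0)).collect();
        Self { sum: 0.0, count: 0, buckets }
    }

    fn add_sample(&mut self, value: f64, weight: u64) {
        self.sum += value * weight as f64;
        self.count += weight;
        // Increment every bucket whose upper bound admits the value, so the stored
        // counts are already cumulative in the Prometheus sense.
        for (le, count) in self.buckets.iter_mut() {
            if value <= *le {
                *count += weight;
            }
        }
    }
}

fn main() {
    let mut h = SimpleHistogram::new();
    h.add_sample(0.25, 1); // lands near the top of the lower tier and in everything above it
    h.add_sample(3.0, 2);  // lands in le="4" and above
    assert_eq!(h.count, 3);
    assert_eq!(h.buckets.last().unwrap().1, 3); // the le="512" bucket sees every sample
    assert_eq!(h.buckets.iter().find(|(le, _)| *le == 1.0).unwrap().1, 1); // only the 0.25 sample
    println!("sum={} count={}", h.sum, h.count);
}
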
4 changes: 3 additions & 1 deletion lib/saluki-event/src/metric/mod.rs
@@ -8,7 +8,9 @@ use saluki_context::Context;
pub use self::metadata::*;

mod value;
pub use self::value::{HistogramPoints, HistogramSummary, MetricValues, ScalarPoints, SetPoints, SketchPoints};
pub use self::value::{
Histogram, HistogramPoints, HistogramSummary, MetricValues, ScalarPoints, SetPoints, SketchPoints,
};

/// A metric.
///
3 changes: 1 addition & 2 deletions lib/saluki-event/src/metric/value/mod.rs
@@ -10,8 +10,7 @@ mod sketch;
pub use self::sketch::SketchPoints;

mod histogram;
use self::histogram::Histogram;
pub use self::histogram::{HistogramPoints, HistogramSummary};
pub use self::histogram::{Histogram, HistogramPoints, HistogramSummary};

mod scalar;
pub use self::scalar::ScalarPoints;
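
With this re-export in place, downstream code (such as the Prometheus destination above) can name the histogram type directly via `use saluki_event::metric::Histogram;` instead of reaching into the private `value::histogram` module.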