diff --git a/lib/saluki-context/src/tags/mod.rs b/lib/saluki-context/src/tags/mod.rs index e5ff13e9..efd5ed9a 100644 --- a/lib/saluki-context/src/tags/mod.rs +++ b/lib/saluki-context/src/tags/mod.rs @@ -368,6 +368,22 @@ impl Tagged for TagSet { } } +impl fmt::Display for TagSet { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "[")?; + + for (i, tag) in self.0.iter().enumerate() { + if i > 0 { + write!(f, ",")?; + } + + write!(f, "{}", tag.as_str())?; + } + + write!(f, "]") + } +} + /// A shared, read-only set of tags. #[derive(Clone, Debug)] pub struct SharedTagSet(Arc); diff --git a/lib/saluki-event/src/metric/value/histogram.rs b/lib/saluki-event/src/metric/value/histogram.rs index 65fa6070..b0a19882 100644 --- a/lib/saluki-event/src/metric/value/histogram.rs +++ b/lib/saluki-event/src/metric/value/histogram.rs @@ -1,4 +1,4 @@ -use std::num::NonZeroU64; +use std::{fmt, num::NonZeroU64}; use ordered_float::OrderedFloat; use smallvec::SmallVec; @@ -225,6 +225,15 @@ impl From for HistogramPoints { } } +impl From<(u64, f64)> for HistogramPoints { + fn from((ts, value): (u64, f64)) -> Self { + let mut histogram = Histogram::default(); + histogram.insert(value, SampleRate::unsampled()); + + Self(TimestampedValue::from((ts, histogram)).into()) + } +} + impl From<[f64; N]> for HistogramPoints { fn from(values: [f64; N]) -> Self { let mut histogram = Histogram::default(); @@ -236,6 +245,33 @@ impl From<[f64; N]> for HistogramPoints { } } +impl From<(u64, [f64; N])> for HistogramPoints { + fn from((ts, values): (u64, [f64; N])) -> Self { + let mut histogram = Histogram::default(); + for value in values { + histogram.insert(value, SampleRate::unsampled()); + } + + Self(TimestampedValue::from((ts, histogram)).into()) + } +} + +impl From<[(u64, f64); N]> for HistogramPoints { + fn from(values: [(u64, f64); N]) -> Self { + Self( + values + .into_iter() + .map(|(ts, value)| { + let mut histogram = Histogram::default(); + histogram.insert(value, 
SampleRate::unsampled()); + + (ts, histogram) + }) + .into(), + ) + } +} + impl<'a> From<&'a [f64]> for HistogramPoints { fn from(values: &'a [f64]) -> Self { let mut histogram = Histogram::default(); @@ -291,6 +327,28 @@ impl<'a> IntoIterator for &'a mut HistogramPoints { } } +impl fmt::Display for HistogramPoints { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "[")?; + for (i, point) in self.0.values.iter().enumerate() { + if i > 0 { + write!(f, ",")?; + } + + let ts = point.timestamp.map(|ts| ts.get()).unwrap_or_default(); + write!(f, "({}, [", ts)?; + for (j, sample) in point.value.samples().iter().enumerate() { + if j > 0 { + write!(f, ",")?; + } + write!(f, "{{{} * {}}}", sample.value, sample.weight)?; + } + write!(f, "])")?; + } + write!(f, "]") + } +} + pub struct HistogramIter { inner: smallvec::IntoIter<[TimestampedValue; 1]>, } diff --git a/lib/saluki-event/src/metric/value/mod.rs b/lib/saluki-event/src/metric/value/mod.rs index 47fa4c39..fd662808 100644 --- a/lib/saluki-event/src/metric/value/mod.rs +++ b/lib/saluki-event/src/metric/value/mod.rs @@ -1,6 +1,6 @@ mod iter; -use std::{collections::HashSet, num::NonZeroU64, time::Duration}; +use std::{collections::HashSet, fmt, num::NonZeroU64, time::Duration}; use ddsketch_agent::DDSketch; use ordered_float::OrderedFloat; @@ -409,24 +409,16 @@ impl MetricValues { pub fn collapse_non_timestamped(&mut self, timestamp: u64) { match self { // Collapse by summing. - Self::Counter(points) => points - .inner_mut() - .collapse_non_timestamped(timestamp, collapse_scalar_merge), - Self::Rate(points, _) => points - .inner_mut() - .collapse_non_timestamped(timestamp, collapse_scalar_merge), + Self::Counter(points) => points.inner_mut().collapse_non_timestamped(timestamp, merge_scalar_sum), + Self::Rate(points, _) => points.inner_mut().collapse_non_timestamped(timestamp, merge_scalar_sum), // Collapse by keeping the last value. 
Self::Gauge(points) => points .inner_mut() - .collapse_non_timestamped(timestamp, collapse_scalar_latest), + .collapse_non_timestamped(timestamp, merge_scalar_latest), // Collapse by merging. - Self::Set(points) => points.inner_mut().collapse_non_timestamped(timestamp, collapse_set), - Self::Histogram(points) => points - .inner_mut() - .collapse_non_timestamped(timestamp, collapse_histogram), - Self::Distribution(sketches) => sketches - .inner_mut() - .collapse_non_timestamped(timestamp, collapse_sketch), + Self::Set(points) => points.inner_mut().collapse_non_timestamped(timestamp, merge_set), + Self::Histogram(points) => points.inner_mut().collapse_non_timestamped(timestamp, merge_histogram), + Self::Distribution(sketches) => sketches.inner_mut().collapse_non_timestamped(timestamp, merge_sketch), } } @@ -440,16 +432,16 @@ impl MetricValues { /// existing value. pub fn merge(&mut self, other: Self) { match (self, other) { - (Self::Counter(a), Self::Counter(b)) => a.merge(b), + (Self::Counter(a), Self::Counter(b)) => a.merge(b, merge_scalar_sum), (Self::Rate(a_points, a_interval), Self::Rate(b_points, b_interval)) => { if *a_interval != b_interval { *a_points = b_points; *a_interval = b_interval; } else { - a_points.merge(b_points); + a_points.merge(b_points, merge_scalar_sum); } } - (Self::Gauge(a), Self::Gauge(b)) => *a = b, + (Self::Gauge(a), Self::Gauge(b)) => a.merge(b, merge_scalar_latest), (Self::Set(a), Self::Set(b)) => a.merge(b), (Self::Histogram(a), Self::Histogram(b)) => a.merge(b), (Self::Distribution(a), Self::Distribution(b)) => a.merge(b), @@ -472,22 +464,319 @@ impl MetricValues { } } -fn collapse_scalar_merge(dest: &mut OrderedFloat, src: &mut OrderedFloat) { +impl fmt::Display for MetricValues { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Counter(points) => write!(f, "{}", points), + Self::Rate(points, interval) => write!(f, "{} over {:?}", points, interval), + Self::Gauge(points) => write!(f, "{}", points), 
+ Self::Set(points) => write!(f, "{}", points), + Self::Histogram(points) => write!(f, "{}", points), + Self::Distribution(points) => write!(f, "{}", points), + } + } +} + +fn merge_scalar_sum(dest: &mut OrderedFloat, src: &mut OrderedFloat) { *dest += *src; } -fn collapse_scalar_latest(dest: &mut OrderedFloat, src: &mut OrderedFloat) { +fn merge_scalar_latest(dest: &mut OrderedFloat, src: &mut OrderedFloat) { *dest = *src; } -fn collapse_set(dest: &mut HashSet, src: &mut HashSet) { +fn merge_set(dest: &mut HashSet, src: &mut HashSet) { dest.extend(src.drain()); } -fn collapse_histogram(dest: &mut Histogram, src: &mut Histogram) { +fn merge_histogram(dest: &mut Histogram, src: &mut Histogram) { dest.merge(src); } -fn collapse_sketch(dest: &mut DDSketch, src: &mut DDSketch) { +fn merge_sketch(dest: &mut DDSketch, src: &mut DDSketch) { dest.merge(src); } + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use super::{HistogramPoints, MetricValues, SetPoints, SketchPoints}; + use crate::metric::ScalarPoints; + + #[test] + fn merge_counters() { + let cases = [ + // Both A and B have single point with an identical timestamp, so the points should be merged. + ( + ScalarPoints::from((1, 1.0)), + ScalarPoints::from((1, 2.0)), + ScalarPoints::from((1, 3.0)), + ), + // A has a single point with a timestamp, B has a single point without a timestamp, so both points should be kept. + ( + ScalarPoints::from((1, 1.0)), + ScalarPoints::from(2.0), + ScalarPoints::from([(0, 2.0), (1, 1.0)]), + ), + // Both A and B have single point without a timestamp, so the points should be merged. 
+ ( + ScalarPoints::from(5.0), + ScalarPoints::from(6.0), + ScalarPoints::from(11.0), + ), + ]; + + for (a, b, expected) in cases { + let mut merged = MetricValues::Counter(a.clone()); + merged.merge(MetricValues::Counter(b.clone())); + + assert_eq!( + merged, + MetricValues::Counter(expected.clone()), + "merged {} with {}, expected {} but got {}", + a, + b, + expected, + merged + ); + } + } + + #[test] + fn merge_gauges() { + let cases = [ + // Both A and B have single point with an identical timestamp, so B's point value should override A's point value. + ( + ScalarPoints::from((1, 1.0)), + ScalarPoints::from((1, 2.0)), + ScalarPoints::from((1, 2.0)), + ), + // A has a single point with a timestamp, B has a single point without a timestamp, so both points should be kept. + ( + ScalarPoints::from((1, 1.0)), + ScalarPoints::from(2.0), + ScalarPoints::from([(0, 2.0), (1, 1.0)]), + ), + // Both A and B have single point without a timestamp, so B's point value should override A's point value. + ( + ScalarPoints::from(5.0), + ScalarPoints::from(6.0), + ScalarPoints::from(6.0), + ), + ]; + + for (a, b, expected) in cases { + let mut merged = MetricValues::Gauge(a.clone()); + merged.merge(MetricValues::Gauge(b.clone())); + + assert_eq!( + merged, + MetricValues::Gauge(expected.clone()), + "merged {} with {}, expected {} but got {}", + a, + b, + expected, + merged + ); + } + } + + #[test] + fn merge_rates() { + const FIVE_SECS: Duration = Duration::from_secs(5); + const TEN_SECS: Duration = Duration::from_secs(10); + + let cases = [ + // Both A and B have single point with an identical timestamp, and identical intervals, so the points should be merged. + ( + ScalarPoints::from((1, 1.0)), + FIVE_SECS, + ScalarPoints::from((1, 2.0)), + FIVE_SECS, + ScalarPoints::from((1, 3.0)), + FIVE_SECS, + ), + // A has a single point with a timestamp, B has a single point without a timestamp, and identical intervals, so both points should be kept. 
+ ( + ScalarPoints::from((1, 1.0)), + FIVE_SECS, + ScalarPoints::from(2.0), + FIVE_SECS, + ScalarPoints::from([(0, 2.0), (1, 1.0)]), + FIVE_SECS, + ), + // Both A and B have single point without a timestamp, and identical intervals, so the points should be merged. + ( + ScalarPoints::from(5.0), + FIVE_SECS, + ScalarPoints::from(6.0), + FIVE_SECS, + ScalarPoints::from(11.0), + FIVE_SECS, + ), + // We do three permutations here -- identical timestamped point, differing timestamped point, + // non-timestamped point -- but always with differing intervals, which should lead to B overriding A + // entirely. + ( + ScalarPoints::from((1, 1.0)), + FIVE_SECS, + ScalarPoints::from((1, 2.0)), + TEN_SECS, + ScalarPoints::from((1, 2.0)), + TEN_SECS, + ), + ( + ScalarPoints::from((1, 3.0)), + FIVE_SECS, + ScalarPoints::from(4.0), + TEN_SECS, + ScalarPoints::from((0, 4.0)), + TEN_SECS, + ), + ( + ScalarPoints::from(7.0), + TEN_SECS, + ScalarPoints::from(9.0), + FIVE_SECS, + ScalarPoints::from(9.0), + FIVE_SECS, + ), + ]; + + for (a, a_interval, b, b_interval, expected, expected_interval) in cases { + let mut merged = MetricValues::Rate(a.clone(), a_interval); + merged.merge(MetricValues::Rate(b.clone(), b_interval)); + + assert_eq!( + merged, + MetricValues::Rate(expected.clone(), expected_interval), + "merged {}/{:?} with {}/{:?}, expected {} but got {}", + a, + a_interval, + b, + b_interval, + expected, + merged + ); + } + } + + #[test] + fn merge_sets() { + let cases = [ + // Both A and B have single point with an identical timestamp, so the values should be merged. + ( + SetPoints::from((1, "foo")), + SetPoints::from((1, "bar")), + SetPoints::from((1, ["foo", "bar"])), + ), + // A has a single point with a timestamp, B has a single point without a timestamp, so both points should be + // kept. 
+ ( + SetPoints::from((1, "foo")), + SetPoints::from("bar"), + SetPoints::from([(0, "bar"), (1, "foo")]), + ), + // Both A and B have single point without a timestamp, so the values should be merged. + ( + SetPoints::from("foo"), + SetPoints::from("bar"), + SetPoints::from(["foo", "bar"]), + ), + ]; + + for (a, b, expected) in cases { + let mut merged = MetricValues::Set(a.clone()); + merged.merge(MetricValues::Set(b.clone())); + + assert_eq!( + merged, + MetricValues::Set(expected.clone()), + "merged {} with {}, expected {} but got {}", + a, + b, + expected, + merged + ); + } + } + + #[test] + fn merge_histograms() { + let cases = [ + // Both A and B have single point with an identical timestamp, so the samples should be merged. + ( + HistogramPoints::from((1, 1.0)), + HistogramPoints::from((1, 2.0)), + HistogramPoints::from((1, [1.0, 2.0])), + ), + // A has a single point with a timestamp, B has a single point without a timestamp, so both points should be kept. + ( + HistogramPoints::from((1, 1.0)), + HistogramPoints::from(2.0), + HistogramPoints::from([(0, 2.0), (1, 1.0)]), + ), + // Both A and B have single point without a timestamp, so the samples should be merged. + ( + HistogramPoints::from(5.0), + HistogramPoints::from(6.0), + HistogramPoints::from([5.0, 6.0]), + ), + ]; + + for (a, b, expected) in cases { + let mut merged = MetricValues::Histogram(a.clone()); + merged.merge(MetricValues::Histogram(b.clone())); + + assert_eq!( + merged, + MetricValues::Histogram(expected.clone()), + "merged {} with {}, expected {} but got {}", + a, + b, + expected, + merged + ); + } + } + + #[test] + fn merge_distributions() { + let cases = [ + // Both A and B have single point with an identical timestamp, so the sketches should be merged. + ( + SketchPoints::from((1, 1.0)), + SketchPoints::from((1, 2.0)), + SketchPoints::from((1, [1.0, 2.0])), + ), + // A has a single point with a timestamp, B has a single point without a timestamp, so both points should be kept. 
+ ( + SketchPoints::from((1, 1.0)), + SketchPoints::from(2.0), + SketchPoints::from([(0, 2.0), (1, 1.0)]), + ), + // Both A and B have single point without a timestamp, so the sketches should be merged. + ( + SketchPoints::from(5.0), + SketchPoints::from(6.0), + SketchPoints::from([5.0, 6.0]), + ), + ]; + + for (a, b, expected) in cases { + let mut merged = MetricValues::Distribution(a.clone()); + merged.merge(MetricValues::Distribution(b.clone())); + + assert_eq!( + merged, + MetricValues::Distribution(expected.clone()), + "merged {} with {}, expected {} but got {}", + a, + b, + expected, + merged + ); + } + } +} diff --git a/lib/saluki-event/src/metric/value/scalar.rs b/lib/saluki-event/src/metric/value/scalar.rs index b1f9cbfb..25e93cd8 100644 --- a/lib/saluki-event/src/metric/value/scalar.rs +++ b/lib/saluki-event/src/metric/value/scalar.rs @@ -1,4 +1,4 @@ -use std::num::NonZeroU64; +use std::{fmt, num::NonZeroU64}; use ordered_float::OrderedFloat; @@ -57,7 +57,10 @@ impl ScalarPoints { /// - /// If a point with the same timestamp exists in both sets, the values will be added together. Otherwise, the points - /// will appended to the end of the set. + /// If a point with the same timestamp exists in both sets, `merge` is called with the existing value and the + /// incoming value to combine them. Otherwise, the incoming point is added to the set.
- pub fn merge(&mut self, other: Self) { + pub fn merge(&mut self, other: Self, merge: F) + where + F: Fn(&mut OrderedFloat, &mut OrderedFloat), + { let mut needs_sort = false; for other_value in other.0.values { if let Some(existing_value) = self @@ -66,7 +69,8 @@ impl ScalarPoints { .iter_mut() .find(|value| value.timestamp == other_value.timestamp) { - existing_value.value += other_value.value; + let mut other = other_value.value; + merge(&mut existing_value.value, &mut other); } else { self.0.values.push(other_value); needs_sort = true; @@ -145,3 +149,18 @@ impl<'a> IntoIterator for &'a ScalarPoints { PointsIterRef::scalar(self.0.values.iter()) } } + +impl fmt::Display for ScalarPoints { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "[")?; + for (i, (timestamp, value)) in self.into_iter().enumerate() { + if i > 0 { + write!(f, ",")?; + } + + let ts = timestamp.map(|ts| ts.get()).unwrap_or_default(); + write!(f, "({}, {})", ts, value)?; + } + write!(f, "]") + } +} diff --git a/lib/saluki-event/src/metric/value/set.rs b/lib/saluki-event/src/metric/value/set.rs index 472e16a1..f0d7559b 100644 --- a/lib/saluki-event/src/metric/value/set.rs +++ b/lib/saluki-event/src/metric/value/set.rs @@ -1,4 +1,4 @@ -use std::{collections::HashSet, num::NonZeroU64}; +use std::{collections::HashSet, fmt, num::NonZeroU64}; use super::{ iter::{PointsIter, PointsIterRef}, @@ -80,6 +80,46 @@ impl<'a> From<&'a str> for SetPoints { } } +impl<'a> From<(u64, &'a str)> for SetPoints { + fn from((ts, value): (u64, &'a str)) -> Self { + Self(TimestampedValue::from((ts, HashSet::from([value.to_string()]))).into()) + } +} + +impl<'a, const N: usize> From<[&'a str; N]> for SetPoints { + fn from(values: [&'a str; N]) -> Self { + Self(TimestampedValue::from(HashSet::from_iter(values.into_iter().map(|s| s.to_string()))).into()) + } +} + +impl<'a, const N: usize> From<(u64, [&'a str; N])> for SetPoints { + fn from((ts, values): (u64, [&'a str; N])) -> Self { + 
Self(TimestampedValue::from((ts, values.into_iter().map(|s| s.to_string()).collect())).into()) + } +} + +impl<'a, const N: usize> From<[(u64, &'a str); N]> for SetPoints { + fn from(values: [(u64, &'a str); N]) -> Self { + Self( + values + .iter() + .map(|(ts, value)| TimestampedValue::from((*ts, HashSet::from([value.to_string()])))) + .into(), + ) + } +} + +impl<'a, const N: usize> From<[(u64, &'a [&'a str]); N]> for SetPoints { + fn from(values: [(u64, &'a [&'a str]); N]) -> Self { + Self( + values + .iter() + .map(|(ts, values)| TimestampedValue::from((*ts, values.iter().map(|s| s.to_string()).collect()))) + .into(), + ) + } +} + impl IntoIterator for SetPoints { type Item = (Option, f64); type IntoIter = PointsIter; @@ -97,3 +137,25 @@ impl<'a> IntoIterator for &'a SetPoints { PointsIterRef::set(self.0.values.iter()) } } + +impl fmt::Display for SetPoints { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "[")?; + for (i, point) in self.0.values.iter().enumerate() { + if i > 0 { + write!(f, ",")?; + } + + let ts = point.timestamp.map(|ts| ts.get()).unwrap_or_default(); + write!(f, "({}, [", ts)?; + for (j, value) in point.value.iter().enumerate() { + if j > 0 { + write!(f, ",")?; + } + write!(f, "{}", value)?; + } + write!(f, "])")?; + } + write!(f, "]") + } +} diff --git a/lib/saluki-event/src/metric/value/sketch.rs b/lib/saluki-event/src/metric/value/sketch.rs index ad01fe62..ee13e96b 100644 --- a/lib/saluki-event/src/metric/value/sketch.rs +++ b/lib/saluki-event/src/metric/value/sketch.rs @@ -1,4 +1,4 @@ -use std::num::NonZeroU64; +use std::{fmt, num::NonZeroU64}; use ddsketch_agent::DDSketch; @@ -115,6 +115,15 @@ impl From<(u64, f64)> for SketchPoints { } } +impl From<(u64, [f64; N])> for SketchPoints { + fn from((ts, values): (u64, [f64; N])) -> Self { + let mut sketch = DDSketch::default(); + sketch.insert_many(&values[..]); + + Self(TimestampedValue::from((ts, sketch)).into()) + } +} + impl<'a> From<(u64, &'a [f64])> for 
SketchPoints { fn from((ts, values): (u64, &'a [f64])) -> Self { let mut sketch = DDSketch::default(); @@ -135,6 +144,17 @@ impl<'a> From<&'a [(u64, &'a [f64])]> for SketchPoints { } } +impl From<[(u64, f64); N]> for SketchPoints { + fn from(values: [(u64, f64); N]) -> Self { + Self(TimestampedValues::from(values.iter().map(|(ts, value)| { + let mut sketch = DDSketch::default(); + sketch.insert(*value); + + (*ts, sketch) + }))) + } +} + impl<'a, const N: usize> From<[(u64, &'a [f64]); N]> for SketchPoints { fn from(values: [(u64, &'a [f64]); N]) -> Self { Self(TimestampedValues::from(values.iter().map(|(ts, values)| { @@ -168,6 +188,32 @@ impl<'a> IntoIterator for &'a SketchPoints { } } +impl fmt::Display for SketchPoints { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "[")?; + for (i, point) in self.0.values.iter().enumerate() { + if i > 0 { + write!(f, ",")?; + } + + let ts = point.timestamp.map(|ts| ts.get()).unwrap_or_default(); + let sketch = &point.value; + write!( + f, + "({}, {{cnt={} min={} max={} avg={} sum={} bin_count={}}})", + ts, + sketch.count(), + sketch.min().unwrap_or(0.0), + sketch.max().unwrap_or(0.0), + sketch.avg().unwrap_or(0.0), + sketch.sum().unwrap_or(0.0), + sketch.bin_count(), + )?; + } + write!(f, "]") + } +} + pub struct SketchesIter { inner: smallvec::IntoIter<[TimestampedValue; 1]>, }