Skip to content

Commit

Permalink
Merge branch 'main' into rayz/dogstatsd-mapper
Browse files Browse the repository at this point in the history
Signed-off-by: Raymond Zhao <[email protected]>
  • Loading branch information
rayz committed Feb 6, 2025
2 parents b69a5c3 + b8af303 commit 2425037
Show file tree
Hide file tree
Showing 10 changed files with 283 additions and 23 deletions.
23 changes: 23 additions & 0 deletions bin/agent-data-plane/src/components/remapper/rules/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,34 @@ pub fn get_aggregation_remappings() -> Vec<RemapperRule> {
&["resolver_id:dogstatsd"],
"aggregator.dogstatsd_contexts",
),
RemapperRule::by_name_and_tags(
"adp.aggregate_active_contexts_by_type",
&["component_id:dsd_agg"],
"aggregator.dogstatsd_contexts_by_mtype",
)
.with_original_tags(["metric_type"]),
RemapperRule::by_name_and_tags(
"adp.aggregate_active_contexts_bytes_by_type",
&["component_id:dsd_agg"],
"aggregator.dogstatsd_contexts_bytes_by_mtype",
)
.with_original_tags(["metric_type"]),
RemapperRule::by_name_and_tags(
"adp.component_events_received_total",
&["component_id:dsd_agg"],
"aggregator.processed",
)
.with_additional_tags(["data_type:dogstatsd_metrics"]),
RemapperRule::by_name_and_tags(
"adp.aggregate_passthrough_metrics_total",
&["component_id:dsd_agg"],
"no_aggregation.processed",
)
.with_additional_tags(["state:ok"]),
RemapperRule::by_name_and_tags(
"adp.aggregate_passthrough_flushes_total",
&["component_id:dsd_agg"],
"no_aggregation.flush",
),
]
}
4 changes: 2 additions & 2 deletions lib/saluki-components/src/transforms/aggregate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ impl AggregationState {
aggregated.values.merge(values);
}
Entry::Vacant(entry) => {
self.telemetry.increment_contexts(&values);
self.telemetry.increment_contexts(entry.key(), &values);

entry.insert(AggregatedMetric {
values,
Expand Down Expand Up @@ -705,7 +705,7 @@ impl AggregationState {
}

if am.values.is_empty() && should_expire_if_empty {
self.telemetry.decrement_contexts(&am.values);
self.telemetry.decrement_contexts(context, &am.values);
self.contexts_remove_buf.push(context.clone());
}
}
Expand Down
14 changes: 12 additions & 2 deletions lib/saluki-components/src/transforms/aggregate/telemetry.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::time::Duration;

use metrics::{Counter, Gauge, Histogram};
use saluki_context::Context;
use saluki_event::metric::MetricValues;
use saluki_metrics::MetricsBuilder;

Expand Down Expand Up @@ -54,6 +55,7 @@ impl MetricTypedGauge {
pub struct Telemetry {
active_contexts: Gauge,
active_contexts_by_type: MetricTypedGauge,
active_contexts_bytes_by_type: MetricTypedGauge,
events_dropped: Counter,
flushes: Counter,
passthrough_metrics: Counter,
Expand All @@ -66,6 +68,7 @@ impl Telemetry {
Self {
active_contexts: builder.register_debug_gauge("aggregate_active_contexts"),
active_contexts_by_type: MetricTypedGauge::new(builder, "aggregate_active_contexts_by_type"),
active_contexts_bytes_by_type: MetricTypedGauge::new(builder, "aggregate_active_contexts_bytes_by_type"),
events_dropped: builder
.register_debug_counter_with_tags("component_events_dropped_total", ["intentional:true"]),
flushes: builder.register_debug_counter("aggregate_flushes_total"),
Expand All @@ -80,6 +83,7 @@ impl Telemetry {
Self {
active_contexts: Gauge::noop(),
active_contexts_by_type: MetricTypedGauge::noop(),
active_contexts_bytes_by_type: MetricTypedGauge::noop(),
events_dropped: Counter::noop(),
flushes: Counter::noop(),
passthrough_metrics: Counter::noop(),
Expand All @@ -88,14 +92,20 @@ impl Telemetry {
}
}

pub fn increment_contexts(&self, values: &MetricValues) {
pub fn increment_contexts(&self, context: &Context, values: &MetricValues) {
self.active_contexts.increment(1);
self.active_contexts_by_type.for_values(values).increment(1);
self.active_contexts_bytes_by_type
.for_values(values)
.increment(context.size_of() as f64);
}

pub fn decrement_contexts(&self, values: &MetricValues) {
pub fn decrement_contexts(&self, context: &Context, values: &MetricValues) {
self.active_contexts.decrement(1);
self.active_contexts_by_type.for_values(values).decrement(1);
self.active_contexts_bytes_by_type
.for_values(values)
.decrement(context.size_of() as f64);
}

pub fn increment_events_dropped(&self) {
Expand Down
4 changes: 2 additions & 2 deletions lib/saluki-components/src/transforms/chained/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl Transform for Chained {

// We have to re-associate each subtransform with their allocation group token here, as we don't have access to
// it when the bounds are initially defined.
let subtransforms = self
let mut subtransforms = self
.subtransforms
.into_iter()
.map(|(subtransform_id, subtransform)| {
Expand All @@ -106,7 +106,7 @@ impl Transform for Chained {
_ = health.live() => continue,
maybe_events = context.event_stream().next() => match maybe_events {
Some(mut event_buffer) => {
for (allocation_token, transform) in &subtransforms {
for (allocation_token, transform) in &mut subtransforms {
let _guard = allocation_token.enter();
transform.transform_buffer(&mut event_buffer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl HostEnrichment {
}

impl SynchronousTransform for HostEnrichment {
fn transform_buffer(&self, event_buffer: &mut FixedSizeEventBuffer) {
fn transform_buffer(&mut self, event_buffer: &mut FixedSizeEventBuffer) {
for event in event_buffer {
if let Some(metric) = event.try_as_metric_mut() {
self.enrich_metric(metric)
Expand Down
Loading

0 comments on commit 2425037

Please sign in to comment.