Skip to content

Commit

Permalink
chore: introduce first-class origin tags abstraction
Browse files Browse the repository at this point in the history
  • Loading branch information
tobz committed Jan 23, 2025
1 parent d9aba07 commit ec65bf8
Show file tree
Hide file tree
Showing 12 changed files with 477 additions and 372 deletions.
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion bin/agent-data-plane/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ fn create_topology(
// and a Datadog Metrics destination that forwards aggregated buckets to the Datadog Platform.
let dsd_config = DogStatsDConfiguration::from_configuration(configuration)
.error_context("Failed to configure DogStatsD source.")?
.with_workload_provider(env_provider.workload().clone());
.with_origin_tags_resolver(env_provider.workload().clone());
let dsd_agg_config = AggregateConfiguration::from_configuration(configuration)
.error_context("Failed to configure aggregate transform.")?;
let dsd_prefix_filter_configuration = DogstatsDPrefixFilterConfiguration::from_configuration(configuration)?;
Expand Down
66 changes: 44 additions & 22 deletions lib/saluki-components/src/sources/dogstatsd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use bytesize::ByteSize;
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
use metrics::{Counter, Gauge, Histogram};
use saluki_config::GenericConfiguration;
use saluki_context::{ContextResolver, ContextResolverBuilder};
use saluki_context::{
origin::{OriginInfo, OriginTagsResolver},
ContextResolver, ContextResolverBuilder,
};
use saluki_core::{
components::{sources::*, ComponentContext},
observability::ComponentMetricsExt as _,
Expand All @@ -19,7 +22,6 @@ use saluki_core::{
OutputDefinition,
},
};
use saluki_env::WorkloadProvider;
use saluki_error::{generic_error, GenericError};
use saluki_event::metric::{MetricMetadata, MetricOrigin};
use saluki_event::{metric::Metric, DataType, Event};
Expand Down Expand Up @@ -49,9 +51,6 @@ use tracing::{debug, error, info, trace, warn};
mod framer;
use self::framer::{get_framer, DsdFramer};

mod origin;
use self::origin::{origin_info_from_metric_packet, OriginEnrichmentConfiguration};

#[derive(Debug, Snafu)]
#[snafu(context(suffix(false)))]
enum Error {
Expand Down Expand Up @@ -97,12 +96,10 @@ const fn default_dogstatsd_permissive_decoding() -> bool {
///
/// Accepts metrics over TCP, UDP, or Unix Domain Sockets in the StatsD/DogStatsD format.
#[derive(Deserialize)]
pub struct DogStatsDConfiguration {
/// Origin enrichment configuration.
///
/// See [`OriginEnrichmentConfiguration`] for more details.
#[serde(default)]
origin_enrichment: OriginEnrichmentConfiguration,
pub struct DogStatsDConfiguration<R = ()> {
/// Origin tags resolver to use for resolving contexts.
#[serde(skip, default)]
origin_tags_resolver: R,

/// The size of the buffer used to receive messages into, in bytes.
///
Expand Down Expand Up @@ -212,17 +209,26 @@ impl DogStatsDConfiguration {
pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
Ok(config.as_typed()?)
}
}

/// Sets the workload provider for this configuration.
///
/// A workload provider must be set in order for origin enrichment to function.
pub fn with_workload_provider<W>(self, workload_provider: W) -> DogStatsDConfiguration
impl<R> DogStatsDConfiguration<R> {
/// Sets the origin tags resolver to use for resolving contexts.
pub fn with_origin_tags_resolver<R2>(self, origin_tags_resolver: R2) -> DogStatsDConfiguration<R2>
where
W: WorkloadProvider + Send + Sync + Clone + 'static,
R2: OriginTagsResolver + Clone + 'static,
{
Self {
origin_enrichment: self.origin_enrichment.with_workload_provider(workload_provider),
..self
DogStatsDConfiguration {
origin_tags_resolver,
buffer_size: self.buffer_size,
buffer_count: self.buffer_count,
port: self.port,
socket_path: self.socket_path,
socket_stream_path: self.socket_stream_path,
non_local_traffic: self.non_local_traffic,
allow_context_heap_allocations: self.allow_context_heap_allocations,
no_aggregation_pipeline_support: self.no_aggregation_pipeline_support,
context_string_interner_bytes: self.context_string_interner_bytes,
permissive_decoding: self.permissive_decoding,
}
}

Expand Down Expand Up @@ -267,7 +273,10 @@ impl DogStatsDConfiguration {
}

#[async_trait]
impl SourceBuilder for DogStatsDConfiguration {
impl<R> SourceBuilder for DogStatsDConfiguration<R>
where
R: OriginTagsResolver + Clone + 'static,
{
async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
let listeners = self.build_listeners().await?;
if listeners.is_empty() {
Expand All @@ -282,7 +291,7 @@ impl SourceBuilder for DogStatsDConfiguration {
.with_idle_context_expiration(Duration::from_secs(30))
.with_expiration_interval(Duration::from_secs(1))
.with_heap_allocations(self.allow_context_heap_allocations)
.with_origin_enricher(self.origin_enrichment.build())
.with_origin_tags_resolver(self.origin_tags_resolver.clone())
.build();

let codec_config = DogstatsdCodecConfiguration::default()
Expand Down Expand Up @@ -314,7 +323,10 @@ impl SourceBuilder for DogStatsDConfiguration {
}
}

impl MemoryBounds for DogStatsDConfiguration {
impl<R> MemoryBounds for DogStatsDConfiguration<R>
where
R: OriginTagsResolver,
{
fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
builder
.minimum()
Expand Down Expand Up @@ -909,6 +921,16 @@ const fn get_adjusted_buffer_size(buffer_size: usize) -> usize {
buffer_size + 4
}

/// Builds an `OriginInfo` object from the given metric packet.
fn origin_info_from_metric_packet<'packet>(packet: &MetricPacket<'packet>) -> OriginInfo<'packet> {
let mut origin_info = OriginInfo::default();
origin_info.set_pod_uid(packet.pod_uid);
origin_info.set_container_id(packet.container_id);
origin_info.set_external_data(packet.external_data);
origin_info.set_cardinality(packet.cardinality);
origin_info
}

#[cfg(test)]
mod tests {
use std::net::SocketAddr;
Expand Down
Loading

0 comments on commit ec65bf8

Please sign in to comment.