Skip to content

Commit

Permalink
chore: clean up origin enrichment logic / type names / etc (#463)
Browse files Browse the repository at this point in the history
  • Loading branch information
tobz authored Feb 3, 2025
1 parent b0576ab commit bb77a12
Show file tree
Hide file tree
Showing 23 changed files with 669 additions and 561 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ ordered-float = { version = "4.6", default-features = false }
paste = { version = "1", default-features = false }
pin-project = { version = "1.1", default-features = false }
pin-project-lite = { version = "0.2", default-features = false }
proptest = { version = "1.6", default-features = false }
proptest = { version = "1.6", default-features = false, features = ["std"] }
prost = { version = "0.13", default-features = false }
quanta = { version = "0.12", default-features = false }
rand = { version = "0.8.5", default-features = false }
Expand Down
2 changes: 1 addition & 1 deletion bin/agent-data-plane/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ async fn create_topology(
// and a Datadog Metrics destination that forwards aggregated buckets to the Datadog Platform.
let dsd_config = DogStatsDConfiguration::from_configuration(configuration)
.error_context("Failed to configure DogStatsD source.")?
.with_origin_tags_resolver(env_provider.workload().clone());
.with_workload_provider(env_provider.workload().clone());
let dsd_agg_config = AggregateConfiguration::from_configuration(configuration)
.error_context("Failed to configure aggregate transform.")?;
let dsd_prefix_filter_configuration = DogstatsDPrefixFilterConfiguration::from_configuration(configuration)?;
Expand Down
2 changes: 1 addition & 1 deletion lib/memory-accounting/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ tokio = { workspace = true, features = ["sync", "time"] }
tracing = { workspace = true }

[dev-dependencies]
proptest = { workspace = true, features = ["std"] }
proptest = { workspace = true }
89 changes: 35 additions & 54 deletions lib/saluki-components/src/sources/dogstatsd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ use bytesize::ByteSize;
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
use metrics::{Counter, Gauge, Histogram};
use saluki_config::GenericConfiguration;
use saluki_context::{
origin::{OriginInfo, OriginTagsResolver},
ContextResolver, ContextResolverBuilder,
};
use saluki_context::{ContextResolver, ContextResolverBuilder};
use saluki_core::{
components::{sources::*, ComponentContext},
observability::ComponentMetricsExt as _,
Expand All @@ -22,6 +19,7 @@ use saluki_core::{
OutputDefinition,
},
};
use saluki_env::WorkloadProvider;
use saluki_error::{generic_error, GenericError};
use saluki_event::metric::{MetricMetadata, MetricOrigin};
use saluki_event::{metric::Metric, DataType, Event};
Expand Down Expand Up @@ -54,6 +52,9 @@ use self::framer::{get_framer, DsdFramer};
mod filters;
use self::filters::{EnablePayloadsFilter, Filter};

mod origin;
use self::origin::{origin_from_metric_packet, DogStatsDOriginTagResolver, OriginEnrichmentConfiguration};

#[derive(Debug, Snafu)]
#[snafu(context(suffix(false)))]
enum Error {
Expand Down Expand Up @@ -107,11 +108,7 @@ const fn default_enable_payloads_sketches() -> bool {
///
/// Accepts metrics over TCP, UDP, or Unix Domain Sockets in the StatsD/DogStatsD format.
#[derive(Deserialize)]
pub struct DogStatsDConfiguration<R = ()> {
/// Origin tags resolver to use for resolving contexts.
#[serde(skip, default)]
origin_tags_resolver: R,

pub struct DogStatsDConfiguration {
/// The size of the buffer used to receive messages into, in bytes.
///
/// Payloads cannot exceed this size, or they will be truncated, leading to discarded messages.
Expand Down Expand Up @@ -225,36 +222,32 @@ pub struct DogStatsDConfiguration<R = ()> {
/// Defaults to `true`.
#[serde(default = "default_enable_payloads_sketches")]
enable_payloads_sketches: bool,

/// Configuration related to origin detection and enrichment.
origin_enrichment: OriginEnrichmentConfiguration,

/// Workload provider to utilize for origin detection/enrichment.
#[serde(skip)]
workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,
}

impl DogStatsDConfiguration {
/// Creates a new `DogStatsDConfiguration` from the given configuration.
///
/// # Errors
///
/// Propagates any error returned by `config.as_typed()`.
pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
Ok(config.as_typed()?)
}
}

impl<R> DogStatsDConfiguration<R> {
/// Sets the origin tags resolver to use for resolving contexts.
pub fn with_origin_tags_resolver<R2>(self, origin_tags_resolver: R2) -> DogStatsDConfiguration<R2>
/// Sets the workload provider to use for configuring origin detection/enrichment.
///
/// A workload provider must be set, otherwise origin detection/enrichment will not be enabled.
///
/// Defaults to unset.
pub fn with_workload_provider<W>(mut self, workload_provider: W) -> Self
where
R2: OriginTagsResolver + Clone + 'static,
W: WorkloadProvider + Send + Sync + 'static,
{
DogStatsDConfiguration {
origin_tags_resolver,
buffer_size: self.buffer_size,
buffer_count: self.buffer_count,
port: self.port,
socket_path: self.socket_path,
socket_stream_path: self.socket_stream_path,
non_local_traffic: self.non_local_traffic,
allow_context_heap_allocations: self.allow_context_heap_allocations,
no_aggregation_pipeline_support: self.no_aggregation_pipeline_support,
context_string_interner_bytes: self.context_string_interner_bytes,
permissive_decoding: self.permissive_decoding,
enable_payloads_series: self.enable_payloads_series,
enable_payloads_sketches: self.enable_payloads_sketches,
}
self.workload_provider = Some(Arc::new(workload_provider));
self
}

async fn build_listeners(&self) -> Result<Vec<Listener>, Error> {
Expand Down Expand Up @@ -298,16 +291,17 @@ impl<R> DogStatsDConfiguration<R> {
}

#[async_trait]
impl<R> SourceBuilder for DogStatsDConfiguration<R>
where
R: OriginTagsResolver + Clone + 'static,
{
impl SourceBuilder for DogStatsDConfiguration {
async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
let listeners = self.build_listeners().await?;
if listeners.is_empty() {
return Err(Error::NoListenersConfigured.into());
}

let maybe_origin_tags_resolver = self
.workload_provider
.clone()
.map(|provider| DogStatsDOriginTagResolver::new(self.origin_enrichment.clone(), provider));
let context_string_interner_size = NonZeroUsize::new(self.context_string_interner_bytes.as_u64() as usize)
.ok_or_else(|| generic_error!("context_string_interner_size must be greater than 0"))?;
let context_resolver = ContextResolverBuilder::from_name("dogstatsd")
Expand All @@ -316,7 +310,7 @@ where
.with_idle_context_expiration(Duration::from_secs(30))
.with_expiration_interval(Duration::from_secs(1))
.with_heap_allocations(self.allow_context_heap_allocations)
.with_origin_tags_resolver(self.origin_tags_resolver.clone())
.with_origin_tags_resolver(maybe_origin_tags_resolver)
.build();

let codec_config = DogstatsdCodecConfiguration::default()
Expand Down Expand Up @@ -352,10 +346,7 @@ where
}
}

impl<R> MemoryBounds for DogStatsDConfiguration<R>
where
R: OriginTagsResolver,
{
impl MemoryBounds for DogStatsDConfiguration {
fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
builder
.minimum()
Expand Down Expand Up @@ -877,20 +868,20 @@ fn handle_frame(
fn handle_metric_packet(
packet: MetricPacket, context_resolver: &mut ContextResolver, peer_addr: &ConnectionAddress,
) -> Option<Metric> {
// Capture the origin information from the packet, including any process ID information if we have it.
let mut origin_info = origin_info_from_metric_packet(&packet);
// Capture the origin from the packet, including any process ID information if we have it.
let mut origin = origin_from_metric_packet(&packet);
if let ConnectionAddress::ProcessLike(Some(creds)) = &peer_addr {
origin_info.set_process_id(creds.pid as u32);
origin.set_process_id(creds.pid as u32);
}

// Try to resolve the context for this metric.
match context_resolver.resolve(packet.metric_name, packet.tags.clone(), Some(origin_info)) {
match context_resolver.resolve(packet.metric_name, packet.tags.clone(), Some(origin)) {
Some(context) => {
let origin = packet
let metric_origin = packet
.jmx_check_name
.map(MetricOrigin::jmx_check)
.unwrap_or_else(MetricOrigin::dogstatsd);
let metadata = MetricMetadata::default().with_origin(origin);
let metadata = MetricMetadata::default().with_origin(metric_origin);

Some(Metric::from_parts(context, packet.values, metadata))
}
Expand Down Expand Up @@ -966,16 +957,6 @@ const fn get_adjusted_buffer_size(buffer_size: usize) -> usize {
buffer_size + 4
}

/// Builds an `OriginInfo` object from the given metric packet.
///
/// Starts from `OriginInfo::default()` and copies over the packet's `pod_uid`, `container_id`,
/// `external_data`, and `cardinality` fields. No process ID is set by this function.
fn origin_info_from_metric_packet<'packet>(packet: &MetricPacket<'packet>) -> OriginInfo<'packet> {
let mut origin_info = OriginInfo::default();
origin_info.set_pod_uid(packet.pod_uid);
origin_info.set_container_id(packet.container_id);
origin_info.set_external_data(packet.external_data);
origin_info.set_cardinality(packet.cardinality);
origin_info
}

#[cfg(test)]
mod tests {
use std::net::SocketAddr;
Expand Down
Loading

0 comments on commit bb77a12

Please sign in to comment.