diff --git a/bin/agent-data-plane/src/main.rs b/bin/agent-data-plane/src/main.rs index cffe988b..07a753fc 100644 --- a/bin/agent-data-plane/src/main.rs +++ b/bin/agent-data-plane/src/main.rs @@ -19,7 +19,8 @@ use saluki_components::{ }, sources::{DogStatsDConfiguration, InternalMetricsConfiguration}, transforms::{ - AggregateConfiguration, ChainedConfiguration, DogstatsDPrefixFilterConfiguration, HostEnrichmentConfiguration, + AggregateConfiguration, ChainedConfiguration, DogstatsDMapperConfiguration, DogstatsDPrefixFilterConfiguration, + HostEnrichmentConfiguration, }, }; use saluki_config::{ConfigurationLoader, GenericConfiguration, RefreshableConfiguration, RefresherConfiguration}; @@ -183,7 +184,10 @@ async fn create_topology( .error_context("Failed to configure aggregate transform.")?; let dsd_prefix_filter_configuration = DogstatsDPrefixFilterConfiguration::from_configuration(configuration)?; let host_enrichment_config = HostEnrichmentConfiguration::from_environment_provider(env_provider.clone()); - let enrich_config = ChainedConfiguration::default().with_transform_builder(host_enrichment_config); + let dsd_mapper_config = DogstatsDMapperConfiguration::from_configuration(configuration)?; + let enrich_config = ChainedConfiguration::default() + .with_transform_builder(host_enrichment_config) + .with_transform_builder(dsd_mapper_config); let mut dd_metrics_config = DatadogMetricsConfiguration::from_configuration(configuration) .error_context("Failed to configure Datadog Metrics destination.")?; diff --git a/lib/saluki-components/src/transforms/dogstatsd_mapper/mod.rs b/lib/saluki-components/src/transforms/dogstatsd_mapper/mod.rs new file mode 100644 index 00000000..02cf638f --- /dev/null +++ b/lib/saluki-components/src/transforms/dogstatsd_mapper/mod.rs @@ -0,0 +1,245 @@ +use std::collections::HashMap; +use std::num::NonZeroUsize; +use std::str::FromStr; +use std::sync::LazyLock; + +use async_trait::async_trait; +use bytesize::ByteSize; +use memory_accounting::{MemoryBounds, MemoryBoundsBuilder}; +use regex::Regex; +use saluki_config::GenericConfiguration; +use saluki_context::{ContextResolver, ContextResolverBuilder}; +use saluki_core::{ + components::transforms::{SynchronousTransform, SynchronousTransformBuilder}, + topology::interconnect::FixedSizeEventBuffer, +}; +use saluki_error::{generic_error, GenericError}; +use serde::{Deserialize, Serialize}; +use serde_with::{serde_as, DisplayFromStr, PickFirst}; + +pub const MATCH_TYPE_WILDCARD: &str = "wildcard"; +pub const MATCH_TYPE_REGEX: &str = "regex"; + +static ALLOWED_WILDCARD_MATCH_PATTERN: LazyLock = + LazyLock::new(|| Regex::new(r"^[a-zA-Z0-9\-_*.]+$").expect("Invalid regex in ALLOWED_WILDCARD_MATCH_PATTERN")); + +/// DogstatsD mapper transform. +#[serde_as] +#[derive(Deserialize)] +#[allow(dead_code)] +pub struct DogstatsDMapperConfiguration { + #[serde(skip)] + context_string_interner_bytes: ByteSize, + + #[serde_as(as = "PickFirst<(DisplayFromStr, _)>")] + #[serde(default)] + dogstatsd_mapper_profiles: MapperProfileConfigs, +} + +#[derive(Clone, Debug, Default, Deserialize, Serialize)] +#[allow(dead_code)] +struct MapperProfileConfigs(pub Vec); + +#[derive(Clone, Debug, Default, Deserialize, Serialize)] +#[allow(dead_code)] +struct MappingProfileConfig { + pub name: String, + pub prefix: String, + pub mappings: Vec, +} + +#[derive(Clone, Debug, Default, Deserialize, Serialize)] +#[allow(dead_code)] +struct MetricMappingConfig { + #[serde(rename = "match")] + metric_match: String, + #[serde(rename = "match_type")] + match_type: String, + name: String, + tags: HashMap, +} + +#[allow(dead_code)] +struct MetricMapper { + profiles: Vec, + context_resolver: ContextResolver, +} + +#[allow(dead_code)] +struct MappingProfile { + name: String, + prefix: String, + mappings: Vec, +} + +#[allow(dead_code)] +struct MetricMapping { + name: String, + tags: HashMap, + regex: Regex, +} + +impl FromStr for MapperProfileConfigs { + type Err = serde_json::Error; + + fn from_str(s: &str) -> Result { + let profiles: Vec = serde_json::from_str(s)?; + Ok(MapperProfileConfigs(profiles)) + } +} + +fn build_regex(match_re: &str, match_type: &str) -> Result { + let mut pattern = match_re.to_owned(); + if match_type == MATCH_TYPE_WILDCARD { + // Check it against the allowed wildcard pattern + if !ALLOWED_WILDCARD_MATCH_PATTERN.is_match(&pattern) { + return Err(generic_error!( + "invalid wildcard match pattern `{}`, it does not match allowed match regex `{}`", + pattern, + ALLOWED_WILDCARD_MATCH_PATTERN.as_str() + )); + } + // Disallow "**" + if pattern.contains("**") { + return Err(generic_error!( + "invalid wildcard match pattern `{}`, it should not contain consecutive `*`", + pattern + )); + } + // Escape dots and replace '*' with '([^.]*)' + pattern = pattern.replace(".", "\\."); + pattern = pattern.replace("*", "([^.]*)"); + } + + // Build final pattern as ^pattern$ + let final_pattern = format!("^{}$", pattern); + + // Compile the regex, return a GenericError if it fails + match Regex::new(&final_pattern) { + Ok(re) => Ok(re), + Err(e) => Err(generic_error!( + "invalid match `{}`, cannot compile regex: {}", + match_re, + e + )), + } +} + +impl MapperProfileConfigs { + fn build(&self, context_string_interner_bytes: ByteSize) -> Result { + let mut profiles = Vec::with_capacity(self.0.len()); + for (i, config_profile) in self.0.iter().enumerate() { + if config_profile.name.is_empty() { + return Err(generic_error!("missing profile name")); + } + if config_profile.prefix.is_empty() { + return Err(generic_error!("missing prefix for profile: {}", config_profile.name)); + } + + let mut profile = MappingProfile { + name: config_profile.name.clone(), + prefix: config_profile.prefix.clone(), + mappings: Vec::with_capacity(config_profile.mappings.len()), + }; + + for mapping in &config_profile.mappings { + let mut match_type = mapping.match_type.to_string(); + if mapping.match_type == "" { + match_type = "wildcard".to_string(); + } + if match_type != MATCH_TYPE_WILDCARD && match_type != MATCH_TYPE_REGEX { + return Err(generic_error!( + "profile: {}, mapping num {}: invalid match type, must be `wildcard` or `regex`", + config_profile.name, + i + )); + } + if mapping.name == "" { + return Err(generic_error!( + "profile: {}, mapping num {}: name is required", + config_profile.name, + i + )); + } + if mapping.name == "" { + return Err(generic_error!( + "profile: {}, mapping num {}: match is required", + config_profile.name, + i + )); + } + let regex = build_regex(&mapping.metric_match, &match_type)?; + profile.mappings.push(MetricMapping { + name: mapping.name.clone(), + tags: mapping.tags.clone(), + regex, + }); + } + profiles.push(profile); + } + + let context_string_interner_size = NonZeroUsize::new(context_string_interner_bytes.as_u64() as usize) + .ok_or_else(|| generic_error!("context_string_interner_size must be greater than 0"))?; + let context_resolver = ContextResolverBuilder::from_name("agent_telemetry_remapper") + .expect("resolver name is not empty") + .with_interner_capacity_bytes(context_string_interner_size) + .build(); + + Ok(MetricMapper { + profiles, + context_resolver, + }) + } +} + +impl DogstatsDMapperConfiguration { + /// Creates a new `DogstatsDMapperConfiguration` from the given configuration. + pub fn from_configuration(config: &GenericConfiguration) -> Result { + let mut config: Self = config.as_typed()?; + config.context_string_interner_bytes = ByteSize::kib(512); + Ok(config) + } +} + +#[async_trait] +impl SynchronousTransformBuilder for DogstatsDMapperConfiguration { + async fn build(&self) -> Result, GenericError> { + println!( + "rz6300 map config: {}", + serde_json::to_string_pretty(&self.dogstatsd_mapper_profiles).unwrap() + ); + let metric_mapper = self + .dogstatsd_mapper_profiles + .build(self.context_string_interner_bytes)?; + Ok(Box::new(DogstatsDMapper { metric_mapper })) + } +} + +impl MemoryBounds for DogstatsDMapperConfiguration { + fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) { + builder + .minimum() + // Capture the size of the heap allocation when the component is built. + .with_single_value::() + // We also allocate the backing storage for the string interner up front, which is used by our context + // resolver. + .with_fixed_amount(self.context_string_interner_bytes.as_u64() as usize); + } +} + +#[allow(dead_code)] +pub struct DogstatsDMapper { + metric_mapper: MetricMapper, +} + +impl DogstatsDMapper {} + +impl SynchronousTransform for DogstatsDMapper { + fn transform_buffer(&self, event_buffer: &mut FixedSizeEventBuffer) { + for event in event_buffer { + if let Some(_metric) = event.try_as_metric_mut() { + println!("rz6300 got metric {} from dogstatsd mapper", _metric.context().name()); + } + } + } +} diff --git a/lib/saluki-components/src/transforms/mod.rs b/lib/saluki-components/src/transforms/mod.rs index 50bd26e5..1dcad66f 100644 --- a/lib/saluki-components/src/transforms/mod.rs +++ b/lib/saluki-components/src/transforms/mod.rs @@ -11,3 +11,6 @@ pub use self::host_enrichment::HostEnrichmentConfiguration; mod dogstatsd_prefix_filter; pub use self::dogstatsd_prefix_filter::DogstatsDPrefixFilterConfiguration; + +mod dogstatsd_mapper; +pub use self::dogstatsd_mapper::DogstatsDMapperConfiguration;