diff --git a/lib/saluki-components/src/transforms/dogstatsd_mapper/mod.rs b/lib/saluki-components/src/transforms/dogstatsd_mapper/mod.rs index cb551978..af414c0a 100644 --- a/lib/saluki-components/src/transforms/dogstatsd_mapper/mod.rs +++ b/lib/saluki-components/src/transforms/dogstatsd_mapper/mod.rs @@ -8,7 +8,7 @@ use bytesize::ByteSize; use memory_accounting::{MemoryBounds, MemoryBoundsBuilder}; use regex::Regex; use saluki_config::GenericConfiguration; -use saluki_context::{Context, ContextResolver, ContextResolverBuilder}; +use saluki_context::{tags::TagSet, Context, ContextResolver, ContextResolverBuilder}; use saluki_core::{ components::transforms::{SynchronousTransform, SynchronousTransformBuilder}, topology::interconnect::FixedSizeEventBuffer, @@ -17,84 +17,44 @@ use saluki_error::{generic_error, GenericError}; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr, PickFirst}; -pub const MATCH_TYPE_WILDCARD: &str = "wildcard"; -pub const MATCH_TYPE_REGEX: &str = "regex"; +const MATCH_TYPE_WILDCARD: &str = "wildcard"; +const MATCH_TYPE_REGEX: &str = "regex"; static ALLOWED_WILDCARD_MATCH_PATTERN: LazyLock = LazyLock::new(|| Regex::new(r"^[a-zA-Z0-9\-_*.]+$").expect("Invalid regex in ALLOWED_WILDCARD_MATCH_PATTERN")); +const fn default_context_string_interner_size() -> ByteSize { + ByteSize::kib(64) +} /// DogstatsD mapper transform. #[serde_as] #[derive(Deserialize)] pub struct DogstatsDMapperConfiguration { - #[serde(skip)] + /// Total size of the string interner used for contexts. + /// + /// This controls the amount of memory that can be used to intern metric names and tags. If the interner is full, + /// metrics with contexts that have not already been resolved may or may not be dropped, depending on the value of + /// `allow_context_heap_allocations`. + #[serde( + rename = "dogstatsd_mapper_string_interner_size", + default = "default_context_string_interner_size" + )] context_string_interner_bytes: ByteSize, + /// Configuration related to metric mapping. #[serde_as(as = "PickFirst<(DisplayFromStr, _)>")] #[serde(default)] dogstatsd_mapper_profiles: MapperProfileConfigs, } -#[derive(Clone, Debug, Default, Deserialize, Serialize)] -struct MapperProfileConfigs(pub Vec); - #[derive(Clone, Debug, Default, Deserialize, Serialize)] struct MappingProfileConfig { - pub name: String, - pub prefix: String, - pub mappings: Vec, -} - -#[derive(Clone, Debug, Default, Deserialize, Serialize)] -struct MetricMappingConfig { - #[serde(rename = "match")] - metric_match: String, - #[serde(rename = "match_type")] - match_type: String, name: String, - tags: HashMap, -} - -struct MetricMapper { - profiles: Vec, -} - -impl MetricMapper { - fn map(&self, context_resolver: &mut ContextResolver, metric_name: &str) -> Option { - for profile in &self.profiles { - if !metric_name.starts_with(&profile.prefix) && profile.prefix != "*" { - continue; - } - - for mapping in &profile.mappings { - if let Some(captures) = mapping.regex.captures(metric_name) { - let mut name = String::new(); - captures.expand(&mapping.name, &mut name); - - let mut tags = Vec::with_capacity(mapping.tags.len()); - for (tag_key, tag_value_expr) in &mapping.tags { - let mut expanded_value = String::new(); - captures.expand(tag_value_expr, &mut expanded_value); - tags.push(format!("{}:{}", tag_key, expanded_value)); - } - return context_resolver.resolve(&name, tags, None); - } - } - } - None - } -} - -struct MappingProfile { prefix: String, - mappings: Vec, -} - -struct MetricMapping { - name: String, - tags: HashMap, - regex: Regex, + mappings: Vec, } +#[derive(Clone, Debug, Default, Deserialize, Serialize)] +struct MapperProfileConfigs(pub Vec); impl FromStr for MapperProfileConfigs { type Err = serde_json::Error; @@ -105,41 +65,8 @@ impl FromStr for MapperProfileConfigs { } } -fn build_regex(match_re: &str, match_type: &str) -> Result { - let mut pattern = match_re.to_owned(); - if match_type == MATCH_TYPE_WILDCARD { - // Check it against the allowed wildcard pattern - if !ALLOWED_WILDCARD_MATCH_PATTERN.is_match(&pattern) { - return Err(generic_error!( - "invalid wildcard match pattern `{}`, it does not match allowed match regex `{}`", - pattern, - ALLOWED_WILDCARD_MATCH_PATTERN.as_str() - )); - } - if pattern.contains("**") { - return Err(generic_error!( - "invalid wildcard match pattern `{}`, it should not contain consecutive `*`", - pattern - )); - } - pattern = pattern.replace(".", "\\."); - pattern = pattern.replace("*", "([^.]*)"); - } - - let final_pattern = format!("^{}$", pattern); - - match Regex::new(&final_pattern) { - Ok(re) => Ok(re), - Err(e) => Err(generic_error!( - "invalid match `{}`, cannot compile regex: {}", - match_re, - e - )), - } -} - impl MapperProfileConfigs { - fn build(&self) -> Result { + fn build(&self, context_string_interner_bytes: ByteSize) -> Result { let mut profiles = Vec::with_capacity(self.0.len()); for (i, config_profile) in self.0.iter().enumerate() { if config_profile.name.is_empty() { @@ -166,14 +93,14 @@ impl MapperProfileConfigs { i )); } - if mapping.name == "" { + if mapping.name.is_empty() { return Err(generic_error!( "profile: {}, mapping num {}: name is required", config_profile.name, i )); } - if mapping.name == "" { + if mapping.match_type.is_empty() { return Err(generic_error!( "profile: {}, mapping num {}: match is required", config_profile.name, @@ -190,27 +117,122 @@ impl MapperProfileConfigs { profiles.push(profile); } - Ok(MetricMapper { profiles }) + let context_string_interner_size = NonZeroUsize::new(context_string_interner_bytes.as_u64() as usize) + .ok_or_else(|| generic_error!("context_string_interner_size must be greater than 0")) + .unwrap(); + let context_resolver = ContextResolverBuilder::from_name("dogstatsd_mapper") + .expect("resolver name is not empty") + .with_interner_capacity_bytes(context_string_interner_size) + .build(); + + Ok(MetricMapper { + context_resolver, + profiles, + }) + } +} + +fn build_regex(match_re: &str, match_type: &str) -> Result { + let mut pattern = match_re.to_owned(); + if match_type == MATCH_TYPE_WILDCARD { + // Check it against the allowed wildcard pattern + if !ALLOWED_WILDCARD_MATCH_PATTERN.is_match(&pattern) { + return Err(generic_error!( + "invalid wildcard match pattern `{}`, it does not match allowed match regex `{}`", + pattern, + ALLOWED_WILDCARD_MATCH_PATTERN.as_str() + )); + } + if pattern.contains("**") { + return Err(generic_error!( + "invalid wildcard match pattern `{}`, it should not contain consecutive `*`", + pattern + )); + } + pattern = pattern.replace(".", "\\."); + pattern = pattern.replace("*", "([^.]*)"); + } + + let final_pattern = format!("^{}$", pattern); + + match Regex::new(&final_pattern) { + Ok(re) => Ok(re), + Err(e) => Err(generic_error!( + "invalid match `{}`, cannot compile regex: {}", + match_re, + e + )), + } +} + +#[derive(Clone, Debug, Default, Deserialize, Serialize)] +struct MetricMappingConfig { + #[serde(rename = "match")] + metric_match: String, + match_type: String, + name: String, + tags: HashMap, +} + +struct MappingProfile { + prefix: String, + mappings: Vec, +} + +struct MetricMapping { + name: String, + tags: HashMap, + regex: Regex, +} + +struct MetricMapper { + profiles: Vec, + context_resolver: ContextResolver, +} + +impl MetricMapper { + fn map(&mut self, metric_name: &str, existing_tags: TagSet) -> Option { + for profile in &self.profiles { + if !metric_name.starts_with(&profile.prefix) && profile.prefix != "*" { + continue; + } + + for mapping in &profile.mappings { + if let Some(captures) = mapping.regex.captures(metric_name) { + let mut name = String::new(); + captures.expand(&mapping.name, &mut name); + + let mut tags = Vec::with_capacity(mapping.tags.len()); + for (tag_key, tag_value_expr) in &mapping.tags { + let mut expanded_value = String::new(); + captures.expand(tag_value_expr, &mut expanded_value); + tags.push(format!("{}:{}", tag_key, expanded_value)); + } + for tag in existing_tags { + tags.push(tag.as_str().to_owned()); + } + return self.context_resolver.resolve(&name, tags, None); + } + } + } + None } } impl DogstatsDMapperConfiguration { /// Creates a new `DogstatsDMapperConfiguration` from the given configuration. pub fn from_configuration(config: &GenericConfiguration) -> Result { - let mut config: Self = config.as_typed()?; - config.context_string_interner_bytes = ByteSize::kib(512); - Ok(config) + Ok(config.as_typed()?) } } #[async_trait] impl SynchronousTransformBuilder for DogstatsDMapperConfiguration { async fn build(&self) -> Result, GenericError> { - let metric_mapper = self.dogstatsd_mapper_profiles.build()?; - Ok(Box::new(DogstatsDMapper { - context_string_interner_bytes: self.context_string_interner_bytes, - metric_mapper, - })) + let metric_mapper = self + .dogstatsd_mapper_profiles + .build(self.context_string_interner_bytes)?; + Ok(Box::new(DogstatsDMapper { metric_mapper })) } } @@ -227,25 +249,17 @@ impl MemoryBounds for DogstatsDMapperConfiguration { } pub struct DogstatsDMapper { - context_string_interner_bytes: ByteSize, metric_mapper: MetricMapper, } -impl DogstatsDMapper {} - impl SynchronousTransform for DogstatsDMapper { - fn transform_buffer(&self, event_buffer: &mut FixedSizeEventBuffer) { - let context_string_interner_size = NonZeroUsize::new(self.context_string_interner_bytes.as_u64() as usize) - .ok_or_else(|| generic_error!("context_string_interner_size must be greater than 0")) - .unwrap(); - let mut context_resolver = ContextResolverBuilder::from_name("dogstatsd_mapper") - .expect("resolver name is not empty") - .with_interner_capacity_bytes(context_string_interner_size) - .build(); - + fn transform_buffer(&mut self, event_buffer: &mut FixedSizeEventBuffer) { for event in event_buffer { if let Some(metric) = event.try_as_metric_mut() { - if let Some(new_context) = self.metric_mapper.map(&mut context_resolver, metric.context().name()) { + if let Some(new_context) = self + .metric_mapper + .map(metric.context().name(), metric.context().tags().to_owned()) + { *metric.context_mut() = new_context; } }