Skip to content

Commit

Permalink
Feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Raymond Zhao <[email protected]>
  • Loading branch information
rayz committed Feb 6, 2025
1 parent 2425037 commit ac9d553
Showing 1 changed file with 131 additions and 117 deletions.
248 changes: 131 additions & 117 deletions lib/saluki-components/src/transforms/dogstatsd_mapper/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use bytesize::ByteSize;
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
use regex::Regex;
use saluki_config::GenericConfiguration;
use saluki_context::{Context, ContextResolver, ContextResolverBuilder};
use saluki_context::{tags::TagSet, Context, ContextResolver, ContextResolverBuilder};
use saluki_core::{
components::transforms::{SynchronousTransform, SynchronousTransformBuilder},
topology::interconnect::FixedSizeEventBuffer,
Expand All @@ -17,84 +17,44 @@ use saluki_error::{generic_error, GenericError};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr, PickFirst};

pub const MATCH_TYPE_WILDCARD: &str = "wildcard";
pub const MATCH_TYPE_REGEX: &str = "regex";
const MATCH_TYPE_WILDCARD: &str = "wildcard";
const MATCH_TYPE_REGEX: &str = "regex";

static ALLOWED_WILDCARD_MATCH_PATTERN: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"^[a-zA-Z0-9\-_*.]+$").expect("Invalid regex in ALLOWED_WILDCARD_MATCH_PATTERN"));

const fn default_context_string_interner_size() -> ByteSize {
ByteSize::kib(64)
}
/// DogstatsD mapper transform.
#[serde_as]
#[derive(Deserialize)]
pub struct DogstatsDMapperConfiguration {
#[serde(skip)]
/// Total size of the string interner used for contexts.
///
/// This controls the amount of memory that can be used to intern metric names and tags. If the interner is full,
/// metrics with contexts that have not already been resolved may or may not be dropped, depending on the value of
/// `allow_context_heap_allocations`.
#[serde(
rename = "dogstatsd_mapper_string_interner_size",
default = "default_context_string_interner_size"
)]
context_string_interner_bytes: ByteSize,

/// Configuration related to metric mapping.
#[serde_as(as = "PickFirst<(DisplayFromStr, _)>")]
#[serde(default)]
dogstatsd_mapper_profiles: MapperProfileConfigs,
}

#[derive(Clone, Debug, Default, Deserialize, Serialize)]
struct MapperProfileConfigs(pub Vec<MappingProfileConfig>);

#[derive(Clone, Debug, Default, Deserialize, Serialize)]
struct MappingProfileConfig {
pub name: String,
pub prefix: String,
pub mappings: Vec<MetricMappingConfig>,
}

#[derive(Clone, Debug, Default, Deserialize, Serialize)]
struct MetricMappingConfig {
#[serde(rename = "match")]
metric_match: String,
#[serde(rename = "match_type")]
match_type: String,
name: String,
tags: HashMap<String, String>,
}

struct MetricMapper {
profiles: Vec<MappingProfile>,
}

impl MetricMapper {
fn map(&self, context_resolver: &mut ContextResolver, metric_name: &str) -> Option<Context> {
for profile in &self.profiles {
if !metric_name.starts_with(&profile.prefix) && profile.prefix != "*" {
continue;
}

for mapping in &profile.mappings {
if let Some(captures) = mapping.regex.captures(metric_name) {
let mut name = String::new();
captures.expand(&mapping.name, &mut name);

let mut tags = Vec::with_capacity(mapping.tags.len());
for (tag_key, tag_value_expr) in &mapping.tags {
let mut expanded_value = String::new();
captures.expand(tag_value_expr, &mut expanded_value);
tags.push(format!("{}:{}", tag_key, expanded_value));
}
return context_resolver.resolve(&name, tags, None);
}
}
}
None
}
}

struct MappingProfile {
prefix: String,
mappings: Vec<MetricMapping>,
}

struct MetricMapping {
name: String,
tags: HashMap<String, String>,
regex: Regex,
mappings: Vec<MetricMappingConfig>,
}
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
struct MapperProfileConfigs(pub Vec<MappingProfileConfig>);

impl FromStr for MapperProfileConfigs {
type Err = serde_json::Error;
Expand All @@ -105,41 +65,8 @@ impl FromStr for MapperProfileConfigs {
}
}

fn build_regex(match_re: &str, match_type: &str) -> Result<Regex, GenericError> {
let mut pattern = match_re.to_owned();
if match_type == MATCH_TYPE_WILDCARD {
// Check it against the allowed wildcard pattern
if !ALLOWED_WILDCARD_MATCH_PATTERN.is_match(&pattern) {
return Err(generic_error!(
"invalid wildcard match pattern `{}`, it does not match allowed match regex `{}`",
pattern,
ALLOWED_WILDCARD_MATCH_PATTERN.as_str()
));
}
if pattern.contains("**") {
return Err(generic_error!(
"invalid wildcard match pattern `{}`, it should not contain consecutive `*`",
pattern
));
}
pattern = pattern.replace(".", "\\.");
pattern = pattern.replace("*", "([^.]*)");
}

let final_pattern = format!("^{}$", pattern);

match Regex::new(&final_pattern) {
Ok(re) => Ok(re),
Err(e) => Err(generic_error!(
"invalid match `{}`, cannot compile regex: {}",
match_re,
e
)),
}
}

impl MapperProfileConfigs {
fn build(&self) -> Result<MetricMapper, GenericError> {
fn build(&self, context_string_interner_bytes: ByteSize) -> Result<MetricMapper, GenericError> {
let mut profiles = Vec::with_capacity(self.0.len());
for (i, config_profile) in self.0.iter().enumerate() {
if config_profile.name.is_empty() {
Expand All @@ -166,14 +93,14 @@ impl MapperProfileConfigs {
i
));
}
if mapping.name == "" {
if mapping.name.is_empty() {
return Err(generic_error!(
"profile: {}, mapping num {}: name is required",
config_profile.name,
i
));
}
if mapping.name == "" {
if mapping.match_type.is_empty() {
return Err(generic_error!(
"profile: {}, mapping num {}: match is required",
config_profile.name,
Expand All @@ -190,27 +117,122 @@ impl MapperProfileConfigs {
profiles.push(profile);
}

Ok(MetricMapper { profiles })
let context_string_interner_size = NonZeroUsize::new(context_string_interner_bytes.as_u64() as usize)
.ok_or_else(|| generic_error!("context_string_interner_size must be greater than 0"))
.unwrap();
let context_resolver = ContextResolverBuilder::from_name("dogstatsd_mapper")
.expect("resolver name is not empty")
.with_interner_capacity_bytes(context_string_interner_size)
.build();

Ok(MetricMapper {
context_resolver,
profiles,
})
}
}

fn build_regex(match_re: &str, match_type: &str) -> Result<Regex, GenericError> {
let mut pattern = match_re.to_owned();
if match_type == MATCH_TYPE_WILDCARD {
// Check it against the allowed wildcard pattern
if !ALLOWED_WILDCARD_MATCH_PATTERN.is_match(&pattern) {
return Err(generic_error!(
"invalid wildcard match pattern `{}`, it does not match allowed match regex `{}`",
pattern,
ALLOWED_WILDCARD_MATCH_PATTERN.as_str()
));
}
if pattern.contains("**") {
return Err(generic_error!(
"invalid wildcard match pattern `{}`, it should not contain consecutive `*`",
pattern
));
}
pattern = pattern.replace(".", "\\.");
pattern = pattern.replace("*", "([^.]*)");
}

let final_pattern = format!("^{}$", pattern);

match Regex::new(&final_pattern) {
Ok(re) => Ok(re),
Err(e) => Err(generic_error!(
"invalid match `{}`, cannot compile regex: {}",
match_re,
e
)),
}
}

#[derive(Clone, Debug, Default, Deserialize, Serialize)]
struct MetricMappingConfig {
#[serde(rename = "match")]
metric_match: String,
match_type: String,
name: String,
tags: HashMap<String, String>,
}

struct MappingProfile {
prefix: String,
mappings: Vec<MetricMapping>,
}

struct MetricMapping {
name: String,
tags: HashMap<String, String>,
regex: Regex,
}

struct MetricMapper {
profiles: Vec<MappingProfile>,
context_resolver: ContextResolver,
}

impl MetricMapper {
fn map(&mut self, metric_name: &str, existing_tags: TagSet) -> Option<Context> {
for profile in &self.profiles {
if !metric_name.starts_with(&profile.prefix) && profile.prefix != "*" {
continue;
}

for mapping in &profile.mappings {
if let Some(captures) = mapping.regex.captures(metric_name) {
let mut name = String::new();
captures.expand(&mapping.name, &mut name);

let mut tags = Vec::with_capacity(mapping.tags.len());
for (tag_key, tag_value_expr) in &mapping.tags {
let mut expanded_value = String::new();
captures.expand(tag_value_expr, &mut expanded_value);
tags.push(format!("{}:{}", tag_key, expanded_value));
}
for tag in existing_tags {
tags.push(tag.as_str().to_owned());
}
return self.context_resolver.resolve(&name, tags, None);
}
}
}
None
}
}

impl DogstatsDMapperConfiguration {
/// Creates a new `DogstatsDMapperConfiguration` from the given configuration.
pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
let mut config: Self = config.as_typed()?;
config.context_string_interner_bytes = ByteSize::kib(512);
Ok(config)
Ok(config.as_typed()?)
}
}

#[async_trait]
impl SynchronousTransformBuilder for DogstatsDMapperConfiguration {
async fn build(&self) -> Result<Box<dyn SynchronousTransform + Send>, GenericError> {
let metric_mapper = self.dogstatsd_mapper_profiles.build()?;
Ok(Box::new(DogstatsDMapper {
context_string_interner_bytes: self.context_string_interner_bytes,
metric_mapper,
}))
let metric_mapper = self
.dogstatsd_mapper_profiles
.build(self.context_string_interner_bytes)?;
Ok(Box::new(DogstatsDMapper { metric_mapper }))
}
}

Expand All @@ -227,25 +249,17 @@ impl MemoryBounds for DogstatsDMapperConfiguration {
}

pub struct DogstatsDMapper {
context_string_interner_bytes: ByteSize,
metric_mapper: MetricMapper,
}

impl DogstatsDMapper {}

impl SynchronousTransform for DogstatsDMapper {
fn transform_buffer(&self, event_buffer: &mut FixedSizeEventBuffer) {
let context_string_interner_size = NonZeroUsize::new(self.context_string_interner_bytes.as_u64() as usize)
.ok_or_else(|| generic_error!("context_string_interner_size must be greater than 0"))
.unwrap();
let mut context_resolver = ContextResolverBuilder::from_name("dogstatsd_mapper")
.expect("resolver name is not empty")
.with_interner_capacity_bytes(context_string_interner_size)
.build();

fn transform_buffer(&mut self, event_buffer: &mut FixedSizeEventBuffer) {
for event in event_buffer {
if let Some(metric) = event.try_as_metric_mut() {
if let Some(new_context) = self.metric_mapper.map(&mut context_resolver, metric.context().name()) {
if let Some(new_context) = self
.metric_mapper
.map(metric.context().name(), metric.context().tags().to_owned())
{
*metric.context_mut() = new_context;
}
}
Expand Down

0 comments on commit ac9d553

Please sign in to comment.