From 2c65603ba0225abd6f87d664a15e0f7704dbd2d7 Mon Sep 17 00:00:00 2001 From: Khurram Mehmood Cheema Date: Wed, 23 Oct 2024 01:04:38 +0500 Subject: [PATCH] feat: update cw stream plugin to add gmd based plugin compatibility --- .../cloudwatch_metric_streams.go | 80 ++++++++++++++++--- .../cloudwatch_metric_streams_test.go | 37 +++++++++ 2 files changed, 108 insertions(+), 9 deletions(-) diff --git a/plugins/inputs/cloudwatch_metric_streams/cloudwatch_metric_streams.go b/plugins/inputs/cloudwatch_metric_streams/cloudwatch_metric_streams.go index a453932859139..f6e4b7dadccdc 100644 --- a/plugins/inputs/cloudwatch_metric_streams/cloudwatch_metric_streams.go +++ b/plugins/inputs/cloudwatch_metric_streams/cloudwatch_metric_streams.go @@ -8,6 +8,7 @@ import ( "encoding/base64" "encoding/json" "errors" + "github.com/influxdata/telegraf/internal" "math" "net" "net/http" @@ -33,13 +34,14 @@ var sampleConfig string const defaultMaxBodySize = 500 * 1024 * 1024 type CloudWatchMetricStreams struct { - ServiceAddress string `toml:"service_address"` - Paths []string `toml:"paths"` - MaxBodySize config.Size `toml:"max_body_size"` - ReadTimeout config.Duration `toml:"read_timeout"` - WriteTimeout config.Duration `toml:"write_timeout"` - AccessKey string `toml:"access_key"` - APICompatability bool `toml:"api_compatability"` + ServiceAddress string `toml:"service_address"` + Paths []string `toml:"paths"` + MaxBodySize config.Size `toml:"max_body_size"` + ReadTimeout config.Duration `toml:"read_timeout"` + WriteTimeout config.Duration `toml:"write_timeout"` + AccessKey string `toml:"access_key"` + APICompatability bool `toml:"api_compatability"` + GMDPluginCompatibility bool `toml:"gmd_plugin_compatability"` requestsReceived selfstat.Stat writesServed selfstat.Stat @@ -341,7 +343,8 @@ func (cms *CloudWatchMetricStreams) composeMetrics(data data) { } // Rename Statistics to match the CloudWatch API if in API Compatability mode - if cms.APICompatability { + if cms.APICompatability && !cms.GMDPluginCompatibility { + if v, ok := fields["max"]; ok { fields["maximum"] = v delete(fields, "max") @@ -358,10 +361,56 @@ func (cms *CloudWatchMetricStreams) composeMetrics(data data) { } } - tags["accountId"] = data.AccountID + if cms.GMDPluginCompatibility { + measurement = sanitizeMeasurement(data.Namespace) + metricName := snakeCase(data.MetricName) + + _sum := data.Value["sum"] + _count := data.Value["count"] + average := float64(0) + if _count > 0 { + average = _sum / _count + } + + fields[metricName+"_average"] = average + + max, ok := fields["max"] + if ok { + fields[metricName+"_maximum"] = max + delete(fields, "max") + } + + min, ok := fields["min"] + if ok { + fields[metricName+"_minimum"] = min + delete(fields, "min") + } + + count, ok := fields["count"] + if ok { + fields[metricName+"_sample_count"] = count + delete(fields, "count") + } + + sum, ok := fields["sum"] + if ok { + fields[metricName+"_sum"] = sum + delete(fields, "sum") + } + } + + if cms.GMDPluginCompatibility { + tags["account"] = data.AccountID + } else { + tags["accountId"] = data.AccountID + } + tags["region"] = data.Region for dimension, value := range data.Dimensions { + if cms.GMDPluginCompatibility { + dimension = snakeCase(dimension) + } tags[dimension] = value } @@ -414,6 +463,19 @@ func (cms *CloudWatchMetricStreams) authenticateIfSet(handler http.HandlerFunc, } } +func sanitizeMeasurement(namespace string) string { + namespace = strings.ReplaceAll(namespace, "/", "_") + namespace = snakeCase(namespace) + return "cloudwatch_" + namespace +} + +func snakeCase(s string) string { + s = internal.SnakeCase(s) + s = strings.ReplaceAll(s, " ", "_") + s = strings.ReplaceAll(s, "__", "_") + return s +} + func init() { inputs.Add("cloudwatch_metric_streams", func() telegraf.Input { return &CloudWatchMetricStreams{ diff --git a/plugins/inputs/cloudwatch_metric_streams/cloudwatch_metric_streams_test.go b/plugins/inputs/cloudwatch_metric_streams/cloudwatch_metric_streams_test.go index 5b98692a55564..a340c0e3edfb2 100644 --- a/plugins/inputs/cloudwatch_metric_streams/cloudwatch_metric_streams_test.go +++ b/plugins/inputs/cloudwatch_metric_streams/cloudwatch_metric_streams_test.go @@ -57,6 +57,12 @@ func newTestCompatibleCloudWatchMetricStreams() *CloudWatchMetricStreams { return metricStream } +func newTestGMDCompatibleCloudWatchMetricStreams() *CloudWatchMetricStreams { + metricStream := newTestCloudWatchMetricStreams() + metricStream.GMDPluginCompatibility = true + return metricStream +} + func getHTTPSClient() *http.Client { tlsConfig, err := pki.TLSClientConfig().TLSConfig() if err != nil { @@ -358,6 +364,37 @@ func TestComposeAPICompatibleMetrics(t *testing.T) { ) } +func TestComposeGMDPluginCompatibleMetrics(t *testing.T) { + metricStream := newTestGMDCompatibleCloudWatchMetricStreams() + + acc := &testutil.Accumulator{} + require.NoError(t, metricStream.Init()) + require.NoError(t, metricStream.Start(acc)) + defer metricStream.Stop() + + // compose a data object for writing + data := data{ + MetricStreamName: "cloudwatch-metric-stream", + AccountID: "546734499701", + Region: "us-west-2", + Namespace: "AWS/EC2", + MetricName: "CPUUtilization", + Dimensions: map[string]string{"AutoScalingGroupName": "test-autoscaling-group"}, + Timestamp: 1651679400000, + Value: map[string]float64{"max": 0.4366666666666666, "min": 0.3683333333333333, "sum": 1.9399999999999997, "count": 5.0}, + Unit: "Percent", + } + + // Compose the metrics from data + metricStream.composeMetrics(data) + + acc.Wait(1) + acc.AssertContainsTaggedFields(t, "cloudwatch_aws_ec2", + map[string]interface{}{"cpu_utilization_maximum": 0.4366666666666666, "cpu_utilization_minimum": 0.3683333333333333, "cpu_utilization_sum": 1.9399999999999997, "cpu_utilization_sample_count": 5.0, "cpu_utilization_average": 0.38799999999999996}, + map[string]string{"auto_scaling_group_name": "test-autoscaling-group", "account": "546734499701", "region": "us-west-2"}, + ) +} + // post GZIP encoded data to the metric stream listener func TestWriteHTTPGzippedData(t *testing.T) { metricStream := newTestCloudWatchMetricStreams()