Skip to content

Commit

Permalink
feat: update cw stream plugin to add gmd based plugin compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
khurramcheema committed Oct 22, 2024
1 parent 75194db commit 2c65603
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/base64"
"encoding/json"
"errors"
"github.com/influxdata/telegraf/internal"
"math"
"net"
"net/http"
Expand All @@ -33,13 +34,14 @@ var sampleConfig string
const defaultMaxBodySize = 500 * 1024 * 1024

type CloudWatchMetricStreams struct {
ServiceAddress string `toml:"service_address"`
Paths []string `toml:"paths"`
MaxBodySize config.Size `toml:"max_body_size"`
ReadTimeout config.Duration `toml:"read_timeout"`
WriteTimeout config.Duration `toml:"write_timeout"`
AccessKey string `toml:"access_key"`
APICompatability bool `toml:"api_compatability"`
ServiceAddress string `toml:"service_address"`
Paths []string `toml:"paths"`
MaxBodySize config.Size `toml:"max_body_size"`
ReadTimeout config.Duration `toml:"read_timeout"`
WriteTimeout config.Duration `toml:"write_timeout"`
AccessKey string `toml:"access_key"`
APICompatability bool `toml:"api_compatability"`
GMDPluginCompatibility bool `toml:"gmd_plugin_compatability"`

requestsReceived selfstat.Stat
writesServed selfstat.Stat
Expand Down Expand Up @@ -341,7 +343,8 @@ func (cms *CloudWatchMetricStreams) composeMetrics(data data) {
}

// Rename Statistics to match the CloudWatch API if in API Compatability mode
if cms.APICompatability {
if cms.APICompatability && !cms.GMDPluginCompatibility {

if v, ok := fields["max"]; ok {
fields["maximum"] = v
delete(fields, "max")
Expand All @@ -358,10 +361,56 @@ func (cms *CloudWatchMetricStreams) composeMetrics(data data) {
}
}

tags["accountId"] = data.AccountID
if cms.GMDPluginCompatibility {
measurement = sanitizeMeasurement(data.Namespace)
metricName := snakeCase(data.MetricName)

_sum := data.Value["sum"]
_count := data.Value["count"]
average := float64(0)
if _count > 0 {
average = _sum / _count
}

fields[metricName+"_average"] = average

max, ok := fields["max"]
if ok {
fields[metricName+"_maximum"] = max
delete(fields, "max")
}

min, ok := fields["min"]
if ok {
fields[metricName+"_minimum"] = min
delete(fields, "min")
}

count, ok := fields["count"]
if ok {
fields[metricName+"_sample_count"] = count
delete(fields, "count")
}

sum, ok := fields["sum"]
if ok {
fields[metricName+"_sum"] = sum
delete(fields, "sum")
}
}

if cms.GMDPluginCompatibility {
tags["account"] = data.AccountID
} else {
tags["accountId"] = data.AccountID
}

tags["region"] = data.Region

for dimension, value := range data.Dimensions {
if cms.GMDPluginCompatibility {
dimension = snakeCase(dimension)
}
tags[dimension] = value
}

Expand Down Expand Up @@ -414,6 +463,19 @@ func (cms *CloudWatchMetricStreams) authenticateIfSet(handler http.HandlerFunc,
}
}

func sanitizeMeasurement(namespace string) string {
namespace = strings.ReplaceAll(namespace, "/", "_")
namespace = snakeCase(namespace)
return "cloudwatch_" + namespace
}

func snakeCase(s string) string {
s = internal.SnakeCase(s)
s = strings.ReplaceAll(s, " ", "_")
s = strings.ReplaceAll(s, "__", "_")
return s
}

func init() {
inputs.Add("cloudwatch_metric_streams", func() telegraf.Input {
return &CloudWatchMetricStreams{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ func newTestCompatibleCloudWatchMetricStreams() *CloudWatchMetricStreams {
return metricStream
}

func newTestGMDCompatibleCloudWatchMetricStreams() *CloudWatchMetricStreams {
metricStream := newTestCloudWatchMetricStreams()
metricStream.GMDPluginCompatibility = true
return metricStream
}

func getHTTPSClient() *http.Client {
tlsConfig, err := pki.TLSClientConfig().TLSConfig()
if err != nil {
Expand Down Expand Up @@ -358,6 +364,37 @@ func TestComposeAPICompatibleMetrics(t *testing.T) {
)
}

func TestComposeGMDPluginCompatibleMetrics(t *testing.T) {
metricStream := newTestGMDCompatibleCloudWatchMetricStreams()

acc := &testutil.Accumulator{}
require.NoError(t, metricStream.Init())
require.NoError(t, metricStream.Start(acc))
defer metricStream.Stop()

// compose a data object for writing
data := data{
MetricStreamName: "cloudwatch-metric-stream",
AccountID: "546734499701",
Region: "us-west-2",
Namespace: "AWS/EC2",
MetricName: "CPUUtilization",
Dimensions: map[string]string{"AutoScalingGroupName": "test-autoscaling-group"},
Timestamp: 1651679400000,
Value: map[string]float64{"max": 0.4366666666666666, "min": 0.3683333333333333, "sum": 1.9399999999999997, "count": 5.0},
Unit: "Percent",
}

// Compose the metrics from data
metricStream.composeMetrics(data)

acc.Wait(1)
acc.AssertContainsTaggedFields(t, "cloudwatch_aws_ec2",
map[string]interface{}{"cpu_utilization_maximum": 0.4366666666666666, "cpu_utilization_minimum": 0.3683333333333333, "cpu_utilization_sum": 1.9399999999999997, "cpu_utilization_sample_count": 5.0, "cpu_utilization_average": 0.38799999999999996},
map[string]string{"auto_scaling_group_name": "test-autoscaling-group", "account": "546734499701", "region": "us-west-2"},
)
}

// post GZIP encoded data to the metric stream listener
func TestWriteHTTPGzippedData(t *testing.T) {
metricStream := newTestCloudWatchMetricStreams()
Expand Down

0 comments on commit 2c65603

Please sign in to comment.