diff --git a/pkg/observability/opentelemetry.go b/pkg/observability/opentelemetry.go
index e1141dcc..581792d5 100644
--- a/pkg/observability/opentelemetry.go
+++ b/pkg/observability/opentelemetry.go
@@ -48,7 +48,9 @@ var (
 // Metrics represents the stats for observability
 type Metrics struct {
 	enabled          bool
+	nthConfig        config.Config
 	ec2Helper        ec2helper.EC2Helper
+	node             *node.Node
 	meter            api.Meter
 	actionsCounter   api.Int64Counter
 	actionsCounterV2 api.Int64Counter
 }
@@ -70,6 +72,8 @@ func InitMetrics(nthConfig config.Config, node *node.Node, ec2 ec2iface.EC2API)
 	}
 	metrics.enabled = nthConfig.EnablePrometheus
 	metrics.ec2Helper = ec2helper.New(ec2)
+	metrics.node = node
+	metrics.nthConfig = nthConfig
 
 	// Starts an async process to collect golang runtime stats
 	// go.opentelemetry.io/contrib/instrumentation/runtime
@@ -79,39 +83,42 @@ func InitMetrics(nthConfig config.Config, node *node.Node, ec2 ec2iface.EC2API)
 	}
 	if metrics.enabled {
-		go metrics.serveNodeMetrics(nthConfig, node)
+		metrics.initCronMetrics()
 		serveMetrics(nthConfig.PrometheusPort)
 	}
 
 	return metrics, nil
 }
 
-func (m Metrics) serveNodeMetrics(nthConfig config.Config, node *node.Node) {
-	if !m.enabled {
-		return
+func (m Metrics) initCronMetrics() {
+	// Run a periodic task
+	ticker := time.NewTicker(30 * time.Second)
+	defer ticker.Stop()
+
+	for range ticker.C {
+		m.serveNodeMetrics()
 	}
+}
 
-	for {
-		instanceIdsMap, err := m.ec2Helper.GetInstanceIdsMapByTagKey(nthConfig.ManagedTag)
-		if err != nil {
-			log.Err(err).Msg("Failed to get AWS instance ids")
-		} else {
-			m.InstancesRecord(int64(len(instanceIdsMap)))
-		}
+
+func (m Metrics) serveNodeMetrics() {
+	instanceIdsMap, err := m.ec2Helper.GetInstanceIdsMapByTagKey(m.nthConfig.ManagedTag)
+	if err != nil {
+		log.Err(err).Msg("Failed to get AWS instance ids")
+	} else {
+		m.InstancesRecord(int64(len(instanceIdsMap)))
+	}
 
-		nodeInstanceIds, err := node.FetchKubernetesNodeInstanceIds()
-		if err != nil {
-			log.Err(err).Msg("Failed to get node instance ids")
-		} else {
-			nodeCount := 0
-			for _, id := range nodeInstanceIds {
-				if _, ok := instanceIdsMap[id]; ok {
-					nodeCount++
-				}
+	nodeInstanceIds, err := m.node.FetchKubernetesNodeInstanceIds()
+	if err != nil {
+		log.Err(err).Msg("Failed to get node instance ids")
+	} else {
+		nodeCount := 0
+		for _, id := range nodeInstanceIds {
+			if _, ok := instanceIdsMap[id]; ok {
+				nodeCount++
 			}
-			m.NodesRecord(int64(nodeCount))
 		}
-		time.Sleep(10 * time.Second)
+		m.NodesRecord(int64(nodeCount))
 	}
 }
diff --git a/pkg/observability/opentelemetry_test.go b/pkg/observability/opentelemetry_test.go
index 237f456e..687d5b93 100644
--- a/pkg/observability/opentelemetry_test.go
+++ b/pkg/observability/opentelemetry_test.go
@@ -25,13 +25,25 @@ import (
 	"testing"
 	"time"
 
+	"github.com/rs/zerolog/log"
+
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/exporters/prometheus"
 	api "go.opentelemetry.io/otel/metric"
 	"go.opentelemetry.io/otel/sdk/metric"
-
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes/fake"
+	"k8s.io/kubectl/pkg/drain"
+
+	"github.com/aws/aws-node-termination-handler/pkg/config"
+	"github.com/aws/aws-node-termination-handler/pkg/ec2helper"
+	"github.com/aws/aws-node-termination-handler/pkg/node"
 	h "github.com/aws/aws-node-termination-handler/pkg/test"
+	"github.com/aws/aws-node-termination-handler/pkg/uptime"
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/service/ec2"
 )
 
 var (
@@ -48,6 +60,9 @@ var (
 	errorStatus     = "error"
 	mockDefaultPort = 9092
 	mockClosedPort  = 9093
+	instanceId1     = "i-1"
+	instanceId2     = "i-2"
+	instanceId3     = "i-3"
 )
 
 func TestInitMetrics(t *testing.T) {
@@ -109,6 +124,8 @@ func TestRegisterMetricsWith(t *testing.T) {
 	const errorEventMetricsTotal = 23
 	const successActionMetricsTotal = 31
 	const errorActionMetricsTotal = 97
+	const managedInstancesTotal = 3
+	const managedNodesTotal = 5
 
 	metrics := getMetrics(t)
 
@@ -126,6 +143,9 @@ func TestRegisterMetricsWith(t *testing.T) {
 		metrics.actionsCounterV2.Add(context.Background(), 1, api.WithAttributes(errorActionlabels...))
 	}
 
+	metrics.NodesRecord(managedNodesTotal)
+	metrics.InstancesRecord(managedInstancesTotal)
+
 	responseRecorder := mockMetricsRequest()
 
 	validateStatus(t, responseRecorder)
@@ -135,6 +155,57 @@ func TestRegisterMetricsWith(t *testing.T) {
 	validateEventErrorTotal(t, metricsMap, errorEventMetricsTotal)
 	validateActionTotalV2(t, metricsMap, successActionMetricsTotal, successStatus)
 	validateActionTotalV2(t, metricsMap, errorActionMetricsTotal, errorStatus)
+	validateGauge(t, metricsMap, managedNodesTotal, "nth_managed_nodes")
+	validateGauge(t, metricsMap, managedInstancesTotal, "nth_managed_instances")
+}
+
+func TestServeNodeMetrics(t *testing.T) {
+	metrics := getMetrics(t)
+	metrics.ec2Helper = ec2helper.New(h.MockedEC2{
+		DescribeInstancesResp: ec2.DescribeInstancesOutput{
+			Reservations: []*ec2.Reservation{
+				{
+					Instances: []*ec2.Instance{
+						{
+							InstanceId: aws.String(instanceId1),
+						},
+						{
+							InstanceId: aws.String(instanceId2),
+						},
+						{
+							InstanceId: aws.String(instanceId3),
+						},
+					},
+				},
+			},
+		},
+	})
+
+	helper := getDrainHelper(fake.NewSimpleClientset(
+		&v1.Node{
+			ObjectMeta: metav1.ObjectMeta{Name: "node-1"},
+			Spec:       v1.NodeSpec{ProviderID: fmt.Sprintf("aws:///us-west-2a/%s", instanceId1)},
+		},
+		&v1.Node{
+			ObjectMeta: metav1.ObjectMeta{Name: "node-2"},
+			Spec:       v1.NodeSpec{ProviderID: fmt.Sprintf("aws:///us-west-2a/%s", instanceId2)},
+		},
+	))
+
+	node, err := node.NewWithValues(config.Config{}, helper, uptime.Uptime)
+	h.Ok(t, err)
+
+	metrics.node = node
+	metrics.serveNodeMetrics()
+
+	responseRecorder := mockMetricsRequest()
+
+	validateStatus(t, responseRecorder)
+
+	metricsMap := getMetricsMap(responseRecorder.Body.String())
+
+	validateGauge(t, metricsMap, 2, "nth_managed_nodes")
+	validateGauge(t, metricsMap, 3, "nth_managed_instances")
 }
 
 func TestServeMetrics(t *testing.T) {
@@ -225,6 +296,20 @@ func getMetricsMap(body string) map[string]string {
 	return metricsMap
 }
 
+func getDrainHelper(client *fake.Clientset) *drain.Helper {
+	return &drain.Helper{
+		Ctx:                 context.TODO(),
+		Client:              client,
+		Force:               true,
+		GracePeriodSeconds:  -1,
+		IgnoreAllDaemonSets: true,
+		DeleteEmptyDirData:  true,
+		Timeout:             time.Duration(120) * time.Second,
+		Out:                 log.Logger,
+		ErrOut:              log.Logger,
+	}
+}
+
 func validateEventErrorTotal(t *testing.T, metricsMap map[string]string, expectedTotal int) {
 	eventErrorTotalKey := fmt.Sprintf("events_error_total{event_error_where=\"%v\",otel_scope_name=\"%v\",otel_scope_version=\"\"}", mockErrorEvent, mockNth)
 	actualValue, exists := metricsMap[eventErrorTotalKey]
@@ -242,3 +327,12 @@ func validateActionTotalV2(t *testing.T, metricsMap map[string]string, expectedT
 	}
 	h.Equals(t, strconv.Itoa(expectedTotal), actualValue)
 }
+
+func validateGauge(t *testing.T, metricsMap map[string]string, expectedTotal int, name string) {
+	actionTotalKey := fmt.Sprintf("%v{otel_scope_name=\"%v\",otel_scope_version=\"\"}", name, mockNth)
+	actualValue, exists := metricsMap[actionTotalKey]
+	if !exists {
+		actualValue = "0"
+	}
+	h.Equals(t, strconv.Itoa(expectedTotal), actualValue)
+}
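
The core change in opentelemetry.go is the move from a sleep-based polling loop to a time.Ticker-driven one in initCronMetrics. Below is a minimal, self-contained sketch of that ticker pattern; the runPeriodically helper, the interval, the stop channel, and the surrounding main function are illustrative assumptions and not code from the diff.

package main

import (
	"fmt"
	"time"
)

// runPeriodically is a hypothetical helper mirroring the ticker pattern used by
// initCronMetrics: it invokes collect on every tick until stop is closed.
func runPeriodically(interval time.Duration, stop <-chan struct{}, collect func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			collect()
		case <-stop:
			return
		}
	}
}

func main() {
	stop := make(chan struct{})

	// Run the loop in its own goroutine so the caller is not blocked; the removed
	// sleep-based loop was started the same way (go metrics.serveNodeMetrics(...)).
	go runPeriodically(1*time.Second, stop, func() {
		fmt.Println("collecting managed instance and node counts...")
	})

	time.Sleep(3 * time.Second)
	close(stop)
}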
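
TestServeNodeMetrics expects nth_managed_instances to be 3 and nth_managed_nodes to be 2 because only two of the three mocked EC2 instances have a Kubernetes node whose ProviderID references them. A rough sketch of that intersection logic follows; instanceIDFromProviderID is a hypothetical stand-in for the repository's FetchKubernetesNodeInstanceIds, not its implementation.

package main

import (
	"fmt"
	"strings"
)

// instanceIDFromProviderID extracts the trailing instance ID from a Kubernetes
// ProviderID such as "aws:///us-west-2a/i-2".
func instanceIDFromProviderID(providerID string) string {
	parts := strings.Split(providerID, "/")
	return parts[len(parts)-1]
}

func main() {
	// Instance IDs carrying the managed tag, as returned by the mocked DescribeInstances call.
	managed := map[string]bool{"i-1": true, "i-2": true, "i-3": true}

	// ProviderIDs of the nodes registered in the fake clientset.
	providerIDs := []string{"aws:///us-west-2a/i-1", "aws:///us-west-2a/i-2"}

	nodeCount := 0
	for _, p := range providerIDs {
		if managed[instanceIDFromProviderID(p)] {
			nodeCount++
		}
	}

	// Prints 3 managed instances and 2 managed nodes, matching the gauges
	// asserted by validateGauge in TestServeNodeMetrics.
	fmt.Printf("nth_managed_instances=%d nth_managed_nodes=%d\n", len(managed), nodeCount)
}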