Skip to content

Commit

Permalink
unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
phuhung273 committed Dec 26, 2024
1 parent 226d3d7 commit 78dc69c
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 23 deletions.
51 changes: 29 additions & 22 deletions pkg/observability/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ var (
// Metrics represents the stats for observability
type Metrics struct {
enabled bool
nthConfig config.Config
ec2Helper ec2helper.EC2Helper
node *node.Node
meter api.Meter
actionsCounter api.Int64Counter
actionsCounterV2 api.Int64Counter
Expand All @@ -70,6 +72,8 @@ func InitMetrics(nthConfig config.Config, node *node.Node, ec2 ec2iface.EC2API)
}
metrics.enabled = nthConfig.EnablePrometheus
metrics.ec2Helper = ec2helper.New(ec2)
metrics.node = node
metrics.nthConfig = nthConfig

// Starts an async process to collect golang runtime stats
// go.opentelemetry.io/contrib/instrumentation/runtime
Expand All @@ -79,39 +83,42 @@ func InitMetrics(nthConfig config.Config, node *node.Node, ec2 ec2iface.EC2API)
}

if metrics.enabled {
go metrics.serveNodeMetrics(nthConfig, node)
metrics.initCronMetrics()
serveMetrics(nthConfig.PrometheusPort)
}

return metrics, nil
}

func (m Metrics) serveNodeMetrics(nthConfig config.Config, node *node.Node) {
if !m.enabled {
return
func (m Metrics) initCronMetrics() {
// Run a periodic task
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()

for range ticker.C {
m.serveNodeMetrics()
}
}

for {
instanceIdsMap, err := m.ec2Helper.GetInstanceIdsMapByTagKey(nthConfig.ManagedTag)
if err != nil {
log.Err(err).Msg("Failed to get AWS instance ids")
} else {
m.InstancesRecord(int64(len(instanceIdsMap)))
}
func (m Metrics) serveNodeMetrics() {
instanceIdsMap, err := m.ec2Helper.GetInstanceIdsMapByTagKey(m.nthConfig.ManagedTag)
if err != nil {
log.Err(err).Msg("Failed to get AWS instance ids")
} else {
m.InstancesRecord(int64(len(instanceIdsMap)))
}

nodeInstanceIds, err := node.FetchKubernetesNodeInstanceIds()
if err != nil {
log.Err(err).Msg("Failed to get node instance ids")
} else {
nodeCount := 0
for _, id := range nodeInstanceIds {
if _, ok := instanceIdsMap[id]; ok {
nodeCount++
}
nodeInstanceIds, err := m.node.FetchKubernetesNodeInstanceIds()
if err != nil {
log.Err(err).Msg("Failed to get node instance ids")
} else {
nodeCount := 0
for _, id := range nodeInstanceIds {
if _, ok := instanceIdsMap[id]; ok {
nodeCount++
}
m.NodesRecord(int64(nodeCount))
}
time.Sleep(10 * time.Second)
m.NodesRecord(int64(nodeCount))
}
}

Expand Down
96 changes: 95 additions & 1 deletion pkg/observability/opentelemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,25 @@ import (
"testing"
"time"

"github.com/rs/zerolog/log"

"github.com/prometheus/client_golang/prometheus/promhttp"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/prometheus"
api "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/metric"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/kubectl/pkg/drain"

"github.com/aws/aws-node-termination-handler/pkg/config"
"github.com/aws/aws-node-termination-handler/pkg/ec2helper"
"github.com/aws/aws-node-termination-handler/pkg/node"
h "github.com/aws/aws-node-termination-handler/pkg/test"
"github.com/aws/aws-node-termination-handler/pkg/uptime"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
)

var (
Expand All @@ -48,6 +60,9 @@ var (
errorStatus = "error"
mockDefaultPort = 9092
mockClosedPort = 9093
instanceId1 = "i-1"
instanceId2 = "i-2"
instanceId3 = "i-3"
)

func TestInitMetrics(t *testing.T) {
Expand Down Expand Up @@ -109,6 +124,8 @@ func TestRegisterMetricsWith(t *testing.T) {
const errorEventMetricsTotal = 23
const successActionMetricsTotal = 31
const errorActionMetricsTotal = 97
const managedInstancesTotal = 3
const managedNodesTotal = 5

metrics := getMetrics(t)

Expand All @@ -126,6 +143,9 @@ func TestRegisterMetricsWith(t *testing.T) {
metrics.actionsCounterV2.Add(context.Background(), 1, api.WithAttributes(errorActionlabels...))
}

metrics.NodesRecord(managedNodesTotal)
metrics.InstancesRecord(managedInstancesTotal)

responseRecorder := mockMetricsRequest()

validateStatus(t, responseRecorder)
Expand All @@ -135,6 +155,57 @@ func TestRegisterMetricsWith(t *testing.T) {
validateEventErrorTotal(t, metricsMap, errorEventMetricsTotal)
validateActionTotalV2(t, metricsMap, successActionMetricsTotal, successStatus)
validateActionTotalV2(t, metricsMap, errorActionMetricsTotal, errorStatus)
validateGauge(t, metricsMap, managedNodesTotal, "nth_managed_nodes")
validateGauge(t, metricsMap, managedInstancesTotal, "nth_managed_instances")
}

func TestServeNodeMetrics(t *testing.T) {
metrics := getMetrics(t)
metrics.ec2Helper = ec2helper.New(h.MockedEC2{
DescribeInstancesResp: ec2.DescribeInstancesOutput{
Reservations: []*ec2.Reservation{
{
Instances: []*ec2.Instance{
{
InstanceId: aws.String(instanceId1),
},
{
InstanceId: aws.String(instanceId2),
},
{
InstanceId: aws.String(instanceId3),
},
},
},
},
},
})

helper := getDrainHelper(fake.NewSimpleClientset(
&v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: "node-1"},
Spec: v1.NodeSpec{ProviderID: fmt.Sprintf("aws:///us-west-2a/%s", instanceId1)},
},
&v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: "node-2"},
Spec: v1.NodeSpec{ProviderID: fmt.Sprintf("aws:///us-west-2a/%s", instanceId2)},
},
))

node, err := node.NewWithValues(config.Config{}, helper, uptime.Uptime)
h.Ok(t, err)

metrics.node = node
metrics.serveNodeMetrics()

responseRecorder := mockMetricsRequest()

validateStatus(t, responseRecorder)

metricsMap := getMetricsMap(responseRecorder.Body.String())

validateGauge(t, metricsMap, 2, "nth_managed_nodes")
validateGauge(t, metricsMap, 3, "nth_managed_instances")
}

func TestServeMetrics(t *testing.T) {
Expand Down Expand Up @@ -225,6 +296,20 @@ func getMetricsMap(body string) map[string]string {
return metricsMap
}

func getDrainHelper(client *fake.Clientset) *drain.Helper {
return &drain.Helper{
Ctx: context.TODO(),
Client: client,
Force: true,
GracePeriodSeconds: -1,
IgnoreAllDaemonSets: true,
DeleteEmptyDirData: true,
Timeout: time.Duration(120) * time.Second,
Out: log.Logger,
ErrOut: log.Logger,
}
}

func validateEventErrorTotal(t *testing.T, metricsMap map[string]string, expectedTotal int) {
eventErrorTotalKey := fmt.Sprintf("events_error_total{event_error_where=\"%v\",otel_scope_name=\"%v\",otel_scope_version=\"\"}", mockErrorEvent, mockNth)
actualValue, exists := metricsMap[eventErrorTotalKey]
Expand All @@ -242,3 +327,12 @@ func validateActionTotalV2(t *testing.T, metricsMap map[string]string, expectedT
}
h.Equals(t, strconv.Itoa(expectedTotal), actualValue)
}

func validateGauge(t *testing.T, metricsMap map[string]string, expectedTotal int, name string) {
actionTotalKey := fmt.Sprintf("%v{otel_scope_name=\"%v\",otel_scope_version=\"\"}", name, mockNth)
actualValue, exists := metricsMap[actionTotalKey]
if !exists {
actualValue = "0"
}
h.Equals(t, strconv.Itoa(expectedTotal), actualValue)
}

0 comments on commit 78dc69c

Please sign in to comment.