Skip to content

Commit

Permalink
restore client creation order
Browse files Browse the repository at this point in the history
  • Loading branch information
phuhung273 committed Jan 13, 2025
1 parent a3ba27e commit d139b86
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 52 deletions.
40 changes: 21 additions & 19 deletions cmd/node-termination-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ func main() {
log.Fatal().Err(err).Msg("Unable to instantiate a node for various kubernetes node functions,")
}

metrics, err := observability.InitMetrics(nthConfig)
if err != nil {
nthConfig.Print()
log.Fatal().Err(err).Msg("Unable to instantiate observability metrics,")
}

err = observability.InitProbes(nthConfig.EnableProbes, nthConfig.ProbesPort, nthConfig.ProbesEndpoint)
if err != nil {
nthConfig.Print()
Expand Down Expand Up @@ -148,25 +154,6 @@ func main() {
log.Fatal().Msgf("Unable to find the AWS region to process queue events.")
}

cfg := aws.NewConfig().WithRegion(nthConfig.AWSRegion).WithEndpoint(nthConfig.AWSEndpoint).WithSTSRegionalEndpoint(endpoints.RegionalSTSEndpoint)
sess := session.Must(session.NewSessionWithOptions(session.Options{
Config: *cfg,
SharedConfigState: session.SharedConfigEnable,
}))
creds, err := sess.Config.Credentials.Get()
if err != nil {
log.Fatal().Err(err).Msg("Unable to get AWS credentials")
}
log.Debug().Msgf("AWS Credentials retrieved from provider: %s", creds.ProviderName)

ec2Client := ec2.New(sess)

metrics, err := observability.InitMetrics(nthConfig, node, ec2Client)
if err != nil {
nthConfig.Print()
log.Fatal().Err(err).Msg("Unable to instantiate observability metrics")
}

recorder, err := observability.InitK8sEventRecorder(nthConfig.EmitKubernetesEvents, nthConfig.NodeName, nthConfig.EnableSQSTerminationDraining, nodeMetadata, nthConfig.KubernetesEventsExtraAnnotations, clientset)
if err != nil {
nthConfig.Print()
Expand Down Expand Up @@ -217,6 +204,21 @@ func main() {
}
}
if nthConfig.EnableSQSTerminationDraining {
cfg := aws.NewConfig().WithRegion(nthConfig.AWSRegion).WithEndpoint(nthConfig.AWSEndpoint).WithSTSRegionalEndpoint(endpoints.RegionalSTSEndpoint)
sess := session.Must(session.NewSessionWithOptions(session.Options{
Config: *cfg,
SharedConfigState: session.SharedConfigEnable,
}))
creds, err := sess.Config.Credentials.Get()
if err != nil {
log.Fatal().Err(err).Msg("Unable to get AWS credentials")
}
log.Debug().Msgf("AWS Credentials retrieved from provider: %s", creds.ProviderName)

ec2Client := ec2.New(sess)

go metrics.InitNodeMetrics(node, ec2Client)

completeLifecycleActionDelay := time.Duration(nthConfig.CompleteLifecycleActionDelaySeconds) * time.Second
sqsMonitor := sqsevent.SQSMonitor{
CheckIfManaged: nthConfig.CheckTagBeforeDraining,
Expand Down
58 changes: 29 additions & 29 deletions pkg/observability/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,20 @@ var (

// Metrics represents the stats for observability
type Metrics struct {
enabled bool
nthConfig config.Config
ec2Helper ec2helper.EC2Helper
node *node.Node
meter api.Meter
actionsCounter api.Int64Counter
actionsCounterV2 api.Int64Counter
errorEventsCounter api.Int64Counter
nodesGauge api.Int64Gauge
instancesGauge api.Int64Gauge
enabled bool
nthConfig config.Config
ec2Helper ec2helper.EC2Helper
node *node.Node
meter api.Meter
actionsCounter api.Int64Counter
actionsCounterV2 api.Int64Counter
errorEventsCounter api.Int64Counter
nthTaggedNodesGauge api.Int64Gauge
nthTaggedInstancesGauge api.Int64Gauge
}

// InitMetrics will initialize, register and expose, via http server, the metrics with Opentelemetry.
func InitMetrics(nthConfig config.Config, node *node.Node, ec2 ec2iface.EC2API) (Metrics, error) {
func InitMetrics(nthConfig config.Config) (Metrics, error) {
exporter, err := prometheus.New()
if err != nil {
return Metrics{}, fmt.Errorf("failed to create Prometheus exporter: %w", err)
Expand All @@ -71,8 +71,6 @@ func InitMetrics(nthConfig config.Config, node *node.Node, ec2 ec2iface.EC2API)
return Metrics{}, fmt.Errorf("failed to register metrics with Prometheus provider: %w", err)
}
metrics.enabled = nthConfig.EnablePrometheus
metrics.ec2Helper = ec2helper.New(ec2)
metrics.node = node
metrics.nthConfig = nthConfig

// Starts an async process to collect golang runtime stats
Expand All @@ -83,14 +81,16 @@ func InitMetrics(nthConfig config.Config, node *node.Node, ec2 ec2iface.EC2API)
}

if metrics.enabled {
go metrics.initCronMetrics()
serveMetrics(nthConfig.PrometheusPort)
}

return metrics, nil
}

func (m Metrics) initCronMetrics() {
func (m Metrics) InitNodeMetrics(node *node.Node, ec2 ec2iface.EC2API) {
m.ec2Helper = ec2helper.New(ec2)
m.node = node

// Run a periodic task
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
Expand Down Expand Up @@ -156,15 +156,15 @@ func (m Metrics) NodesRecord(num int64) {
return
}

m.nodesGauge.Record(context.Background(), num)
m.nthTaggedNodesGauge.Record(context.Background(), num)
}

func (m Metrics) InstancesRecord(num int64) {
if !m.enabled {
return
}

m.instancesGauge.Record(context.Background(), num)
m.nthTaggedInstancesGauge.Record(context.Background(), num)
}

func registerMetricsWith(provider *metric.MeterProvider) (Metrics, error) {
Expand Down Expand Up @@ -194,27 +194,27 @@ func registerMetricsWith(provider *metric.MeterProvider) (Metrics, error) {
}
errorEventsCounter.Add(context.Background(), 0)

name = "nth_managed_nodes"
nodesGauge, err := meter.Int64Gauge(name, api.WithDescription("Number of nodes processing"))
name = "nth_tagged_nodes"
nthTaggedNodesGauge, err := meter.Int64Gauge(name, api.WithDescription("Number of nodes processing"))
if err != nil {
return Metrics{}, fmt.Errorf("failed to create Prometheus gauge %q: %w", name, err)
}
nodesGauge.Record(context.Background(), 0)
nthTaggedNodesGauge.Record(context.Background(), 0)

name = "nth_managed_instances"
instancesGauge, err := meter.Int64Gauge(name, api.WithDescription("Number of instances processing"))
name = "nth_tagged_instances"
nthTaggedInstancesGauge, err := meter.Int64Gauge(name, api.WithDescription("Number of instances processing"))
if err != nil {
return Metrics{}, fmt.Errorf("failed to create Prometheus gauge %q: %w", name, err)
}
instancesGauge.Record(context.Background(), 0)
nthTaggedInstancesGauge.Record(context.Background(), 0)

return Metrics{
meter: meter,
errorEventsCounter: errorEventsCounter,
actionsCounter: actionsCounter,
actionsCounterV2: actionsCounterV2,
nodesGauge: nodesGauge,
instancesGauge: instancesGauge,
meter: meter,
errorEventsCounter: errorEventsCounter,
actionsCounter: actionsCounter,
actionsCounterV2: actionsCounterV2,
nthTaggedNodesGauge: nthTaggedNodesGauge,
nthTaggedInstancesGauge: nthTaggedInstancesGauge,
}, nil
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/observability/opentelemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ func TestRegisterMetricsWith(t *testing.T) {
validateEventErrorTotal(t, metricsMap, errorEventMetricsTotal)
validateActionTotalV2(t, metricsMap, successActionMetricsTotal, successStatus)
validateActionTotalV2(t, metricsMap, errorActionMetricsTotal, errorStatus)
validateGauge(t, metricsMap, managedNodesTotal, "nth_managed_nodes")
validateGauge(t, metricsMap, managedInstancesTotal, "nth_managed_instances")
validateGauge(t, metricsMap, managedNodesTotal, "nth_tagged_nodes")
validateGauge(t, metricsMap, managedInstancesTotal, "nth_tagged_instances")
}

func TestServeNodeMetrics(t *testing.T) {
Expand Down Expand Up @@ -204,8 +204,8 @@ func TestServeNodeMetrics(t *testing.T) {

metricsMap := getMetricsMap(responseRecorder.Body.String())

validateGauge(t, metricsMap, 2, "nth_managed_nodes")
validateGauge(t, metricsMap, 3, "nth_managed_instances")
validateGauge(t, metricsMap, 2, "nth_tagged_nodes")
validateGauge(t, metricsMap, 3, "nth_tagged_instances")
}

func TestServeMetrics(t *testing.T) {
Expand Down

0 comments on commit d139b86

Please sign in to comment.