Skip to content

Commit

Permalink
Abstract prometheus metrics into interfaces (#1801)
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathan-innis authored Nov 8, 2024
1 parent d22742d commit 2f80354
Show file tree
Hide file tree
Showing 38 changed files with 302 additions and 322 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.23.2
require (
github.com/Pallinder/go-randomdata v1.2.0
github.com/avast/retry-go v3.0.0+incompatible
github.com/awslabs/operatorpkg v0.0.0-20240920182301-771460b3160b
github.com/awslabs/operatorpkg v0.0.0-20241108183842-a2ebef231d52
github.com/docker/docker v27.3.1+incompatible
github.com/go-logr/logr v1.4.2
github.com/imdario/mergo v0.3.16
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ github.com/Pallinder/go-randomdata v1.2.0 h1:DZ41wBchNRb/0GfsePLiSwb0PHZmT67XY00
github.com/Pallinder/go-randomdata v1.2.0/go.mod h1:yHmJgulpD2Nfrm0cR9tI/+oAgRqCQQixsA8HyRZfV9Y=
github.com/avast/retry-go v3.0.0+incompatible h1:4SOWQ7Qs+oroOTQOYnAHqelpCO0biHSxpiH9JdtuBj0=
github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY=
github.com/awslabs/operatorpkg v0.0.0-20240920182301-771460b3160b h1:aG1+YRmKIf5nLTZJNhw1NmuxvjUprWYyluqJ2jmVqiU=
github.com/awslabs/operatorpkg v0.0.0-20240920182301-771460b3160b/go.mod h1:RI+iNDn57c3WX0tsZg4rvkmM58lWsEC5cc6E4vJJld8=
github.com/awslabs/operatorpkg v0.0.0-20241108183842-a2ebef231d52 h1:k8f1ukVs49+nC6JbN8r8bxs8g1TPE3Iki/dK/LGwf3A=
github.com/awslabs/operatorpkg v0.0.0-20241108183842-a2ebef231d52/go.mod h1:nq1PLBLCojzjfqSK8SG3ymxqwW6e/cHLJvddKOSFkfw=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
Expand Down
39 changes: 19 additions & 20 deletions pkg/cloudprovider/metrics/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package metrics
import (
"context"

opmetrics "github.com/awslabs/operatorpkg/metrics"
"github.com/prometheus/client_golang/prometheus"
crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"

Expand All @@ -44,7 +45,8 @@ const (
// decorator implements CloudProvider
var _ cloudprovider.CloudProvider = (*decorator)(nil)

var methodDuration = prometheus.NewHistogramVec(
var MethodDuration = opmetrics.NewPrometheusHistogram(
crmetrics.Registry,
prometheus.HistogramOpts{
Namespace: metrics.Namespace,
Subsystem: "cloudprovider",
Expand All @@ -59,7 +61,8 @@ var methodDuration = prometheus.NewHistogramVec(
)

var (
errorsTotal = prometheus.NewCounterVec(
ErrorsTotal = opmetrics.NewPrometheusCounter(
crmetrics.Registry,
prometheus.CounterOpts{
Namespace: metrics.Namespace,
Subsystem: "cloudprovider",
Expand All @@ -75,10 +78,6 @@ var (
)
)

func init() {
crmetrics.Registry.MustRegister(methodDuration, errorsTotal)
}

type decorator struct {
cloudprovider.CloudProvider
}
Expand All @@ -96,68 +95,68 @@ func Decorate(cloudProvider cloudprovider.CloudProvider) cloudprovider.CloudProv

func (d *decorator) Create(ctx context.Context, nodeClaim *v1.NodeClaim) (*v1.NodeClaim, error) {
method := "Create"
defer metrics.Measure(methodDuration.With(getLabelsMapForDuration(ctx, d, method)))()
defer metrics.Measure(MethodDuration, getLabelsMapForDuration(ctx, d, method))()
nodeClaim, err := d.CloudProvider.Create(ctx, nodeClaim)
if err != nil {
errorsTotal.With(getLabelsMapForError(ctx, d, method, err)).Inc()
ErrorsTotal.Inc(getLabelsMapForError(ctx, d, method, err))
}
return nodeClaim, err
}

func (d *decorator) Delete(ctx context.Context, nodeClaim *v1.NodeClaim) error {
method := "Delete"
defer metrics.Measure(methodDuration.With(getLabelsMapForDuration(ctx, d, method)))()
defer metrics.Measure(MethodDuration, getLabelsMapForDuration(ctx, d, method))()
err := d.CloudProvider.Delete(ctx, nodeClaim)
if err != nil {
errorsTotal.With(getLabelsMapForError(ctx, d, method, err)).Inc()
ErrorsTotal.Inc(getLabelsMapForError(ctx, d, method, err))
}
return err
}

func (d *decorator) Get(ctx context.Context, id string) (*v1.NodeClaim, error) {
method := "Get"
defer metrics.Measure(methodDuration.With(getLabelsMapForDuration(ctx, d, method)))()
defer metrics.Measure(MethodDuration, getLabelsMapForDuration(ctx, d, method))()
nodeClaim, err := d.CloudProvider.Get(ctx, id)
if err != nil {
errorsTotal.With(getLabelsMapForError(ctx, d, method, err)).Inc()
ErrorsTotal.Inc(getLabelsMapForError(ctx, d, method, err))
}
return nodeClaim, err
}

func (d *decorator) List(ctx context.Context) ([]*v1.NodeClaim, error) {
method := "List"
defer metrics.Measure(methodDuration.With(getLabelsMapForDuration(ctx, d, method)))()
defer metrics.Measure(MethodDuration, getLabelsMapForDuration(ctx, d, method))()
nodeClaims, err := d.CloudProvider.List(ctx)
if err != nil {
errorsTotal.With(getLabelsMapForError(ctx, d, method, err)).Inc()
ErrorsTotal.Inc(getLabelsMapForError(ctx, d, method, err))
}
return nodeClaims, err
}

func (d *decorator) GetInstanceTypes(ctx context.Context, nodePool *v1.NodePool) ([]*cloudprovider.InstanceType, error) {
method := "GetInstanceTypes"
defer metrics.Measure(methodDuration.With(getLabelsMapForDuration(ctx, d, method)))()
defer metrics.Measure(MethodDuration, getLabelsMapForDuration(ctx, d, method))()
instanceType, err := d.CloudProvider.GetInstanceTypes(ctx, nodePool)
if err != nil {
errorsTotal.With(getLabelsMapForError(ctx, d, method, err)).Inc()
ErrorsTotal.Inc(getLabelsMapForError(ctx, d, method, err))
}
return instanceType, err
}

func (d *decorator) IsDrifted(ctx context.Context, nodeClaim *v1.NodeClaim) (cloudprovider.DriftReason, error) {
method := "IsDrifted"
defer metrics.Measure(methodDuration.With(getLabelsMapForDuration(ctx, d, method)))()
defer metrics.Measure(MethodDuration, getLabelsMapForDuration(ctx, d, method))()
isDrifted, err := d.CloudProvider.IsDrifted(ctx, nodeClaim)
if err != nil {
errorsTotal.With(getLabelsMapForError(ctx, d, method, err)).Inc()
ErrorsTotal.Inc(getLabelsMapForError(ctx, d, method, err))
}
return isDrifted, err
}

// getLabelsMapForDuration is a convenience func that constructs a map[string]string
// for a prometheus Label map used to compose a duration metric spec
func getLabelsMapForDuration(ctx context.Context, d *decorator, method string) map[string]string {
return prometheus.Labels{
return map[string]string{
metricLabelController: injection.GetControllerName(ctx),
metricLabelMethod: method,
metricLabelProvider: d.Name(),
Expand All @@ -167,7 +166,7 @@ func getLabelsMapForDuration(ctx context.Context, d *decorator, method string) m
// getLabelsMapForError is a convenience func that constructs a map[string]string
// for a prometheus Label map used to compose a counter metric spec
func getLabelsMapForError(ctx context.Context, d *decorator, method string, err error) map[string]string {
return prometheus.Labels{
return map[string]string{
metricLabelController: injection.GetControllerName(ctx),
metricLabelMethod: method,
metricLabelProvider: d.Name(),
Expand Down
12 changes: 6 additions & 6 deletions pkg/controllers/disruption/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,17 +160,17 @@ func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) {
}

func (c *Controller) disrupt(ctx context.Context, disruption Method) (bool, error) {
defer metrics.Measure(EvaluationDurationSeconds.With(map[string]string{
defer metrics.Measure(EvaluationDurationSeconds, map[string]string{
metrics.ReasonLabel: strings.ToLower(string(disruption.Reason())),
consolidationTypeLabel: disruption.ConsolidationType(),
}))()
})()
candidates, err := GetCandidates(ctx, c.cluster, c.kubeClient, c.recorder, c.clock, c.cloudProvider, disruption.ShouldDisrupt, disruption.Class(), c.queue)
if err != nil {
return false, fmt.Errorf("determining candidates, %w", err)
}
EligibleNodes.With(map[string]string{
EligibleNodes.Set(float64(len(candidates)), map[string]string{
metrics.ReasonLabel: strings.ToLower(string(disruption.Reason())),
}).Set(float64(len(candidates)))
})

// If there are no candidates, move to the next disruption
if len(candidates) == 0 {
Expand Down Expand Up @@ -244,11 +244,11 @@ func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command,
}

// An action is only performed and pods/nodes are only disrupted after a successful add to the queue
DecisionsPerformedTotal.With(map[string]string{
DecisionsPerformedTotal.Inc(map[string]string{
decisionLabel: string(cmd.Decision()),
metrics.ReasonLabel: strings.ToLower(string(m.Reason())),
consolidationTypeLabel: m.ConsolidationType(),
}).Inc()
})
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/disruption/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,9 @@ func BuildDisruptionBudgetMapping(ctx context.Context, cluster *state.Cluster, c
allowedDisruptions := nodePool.MustGetAllowedDisruptions(clk, numNodes[nodePool.Name], reason)
disruptionBudgetMapping[nodePool.Name] = lo.Max([]int{allowedDisruptions - disrupting[nodePool.Name], 0})

NodePoolAllowedDisruptions.With(map[string]string{
NodePoolAllowedDisruptions.Set(float64(allowedDisruptions), map[string]string{
metrics.NodePoolLabel: nodePool.Name, metrics.ReasonLabel: string(reason),
}).Set(float64(allowedDisruptions))
})
if allowedDisruptions == 0 {
recorder.Publish(disruptionevents.NodePoolBlockedForDisruptionReason(lo.ToPtr(nodePool), reason))
}
Expand Down
26 changes: 11 additions & 15 deletions pkg/controllers/disruption/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,22 @@ limitations under the License.
package disruption

import (
opmetrics "github.com/awslabs/operatorpkg/metrics"
"github.com/prometheus/client_golang/prometheus"
crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"

"sigs.k8s.io/karpenter/pkg/metrics"
)

func init() {
crmetrics.Registry.MustRegister(
EvaluationDurationSeconds,
DecisionsPerformedTotal,
EligibleNodes,
ConsolidationTimeoutsTotal,
NodePoolAllowedDisruptions,
)
}

const (
voluntaryDisruptionSubsystem = "voluntary_disruption"
decisionLabel = "decision"
consolidationTypeLabel = "consolidation_type"
)

var (
EvaluationDurationSeconds = prometheus.NewHistogramVec(
EvaluationDurationSeconds = opmetrics.NewPrometheusHistogram(
crmetrics.Registry,
prometheus.HistogramOpts{
Namespace: metrics.Namespace,
Subsystem: voluntaryDisruptionSubsystem,
Expand All @@ -50,7 +42,8 @@ var (
},
[]string{metrics.ReasonLabel, consolidationTypeLabel},
)
DecisionsPerformedTotal = prometheus.NewCounterVec(
DecisionsPerformedTotal = opmetrics.NewPrometheusCounter(
crmetrics.Registry,
prometheus.CounterOpts{
Namespace: metrics.Namespace,
Subsystem: voluntaryDisruptionSubsystem,
Expand All @@ -59,7 +52,8 @@ var (
},
[]string{decisionLabel, metrics.ReasonLabel, consolidationTypeLabel},
)
EligibleNodes = prometheus.NewGaugeVec(
EligibleNodes = opmetrics.NewPrometheusGauge(
crmetrics.Registry,
prometheus.GaugeOpts{
Namespace: metrics.Namespace,
Subsystem: voluntaryDisruptionSubsystem,
Expand All @@ -68,7 +62,8 @@ var (
},
[]string{metrics.ReasonLabel},
)
ConsolidationTimeoutsTotal = prometheus.NewCounterVec(
ConsolidationTimeoutsTotal = opmetrics.NewPrometheusCounter(
crmetrics.Registry,
prometheus.CounterOpts{
Namespace: metrics.Namespace,
Subsystem: voluntaryDisruptionSubsystem,
Expand All @@ -77,7 +72,8 @@ var (
},
[]string{consolidationTypeLabel},
)
NodePoolAllowedDisruptions = prometheus.NewGaugeVec(
NodePoolAllowedDisruptions = opmetrics.NewPrometheusGauge(
crmetrics.Registry,
prometheus.GaugeOpts{
Namespace: metrics.Namespace,
Subsystem: metrics.NodePoolSubsystem,
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/disruption/multinodeconsolidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (m *MultiNodeConsolidation) firstNConsolidationOption(ctx context.Context,
// binary search to find the maximum number of NodeClaims we can terminate
for min <= max {
if m.clock.Now().After(timeout) {
ConsolidationTimeoutsTotal.WithLabelValues(m.ConsolidationType()).Inc()
ConsolidationTimeoutsTotal.Inc(map[string]string{consolidationTypeLabel: m.ConsolidationType()})
if lastSavedCommand.candidates == nil {
log.FromContext(ctx).V(1).Info(fmt.Sprintf("failed to find a multi-node consolidation after timeout, last considered batch had %d", (min+max)/2))
} else {
Expand Down
8 changes: 3 additions & 5 deletions pkg/controllers/disruption/orchestration/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,22 @@ limitations under the License.
package orchestration

import (
opmetrics "github.com/awslabs/operatorpkg/metrics"
"github.com/prometheus/client_golang/prometheus"
crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"

"sigs.k8s.io/karpenter/pkg/metrics"
)

func init() {
crmetrics.Registry.MustRegister(disruptionQueueFailuresTotal)
}

const (
voluntaryDisruptionSubsystem = "voluntary_disruption"
consolidationTypeLabel = "consolidation_type"
decisionLabel = "decision"
)

var (
disruptionQueueFailuresTotal = prometheus.NewCounterVec(
DisruptionQueueFailuresTotal = opmetrics.NewPrometheusCounter(
crmetrics.Registry,
prometheus.CounterOpts{
Namespace: metrics.Namespace,
Subsystem: voluntaryDisruptionSubsystem,
Expand Down
9 changes: 4 additions & 5 deletions pkg/controllers/disruption/orchestration/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"time"

"github.com/awslabs/operatorpkg/singleton"
"github.com/prometheus/client_golang/prometheus"
"github.com/samber/lo"
"go.uber.org/multierr"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -195,11 +194,11 @@ func (q *Queue) Reconcile(ctx context.Context) (reconcile.Result, error) {
failedLaunches := lo.Filter(cmd.Replacements, func(r Replacement, _ int) bool {
return !r.Initialized
})
disruptionQueueFailuresTotal.With(map[string]string{
DisruptionQueueFailuresTotal.Add(float64(len(failedLaunches)), map[string]string{
decisionLabel: cmd.Decision(),
metrics.ReasonLabel: string(cmd.reason),
consolidationTypeLabel: cmd.consolidationType,
}).Add(float64(len(failedLaunches)))
})
multiErr := multierr.Combine(err, cmd.lastError, state.RequireNoScheduleTaint(ctx, q.kubeClient, false, cmd.candidates...))
// Log the error
log.FromContext(ctx).WithValues("nodes", strings.Join(lo.Map(cmd.candidates, func(s *state.StateNode, _ int) string {
Expand Down Expand Up @@ -265,11 +264,11 @@ func (q *Queue) waitOrTerminate(ctx context.Context, cmd *Command) error {
if err := q.kubeClient.Delete(ctx, candidate.NodeClaim); err != nil {
multiErr = multierr.Append(multiErr, client.IgnoreNotFound(err))
} else {
metrics.NodeClaimsDisruptedTotal.With(prometheus.Labels{
metrics.NodeClaimsDisruptedTotal.Inc(map[string]string{
metrics.ReasonLabel: string(cmd.reason),
metrics.NodePoolLabel: cmd.candidates[i].NodeClaim.Labels[v1.NodePoolLabelKey],
metrics.CapacityTypeLabel: cmd.candidates[i].NodeClaim.Labels[v1.CapacityTypeLabelKey],
}).Inc()
})
}
}
// If there were any deletion failures, we should requeue.
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/disruption/singlenodeconsolidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption
continue
}
if s.clock.Now().After(timeout) {
ConsolidationTimeoutsTotal.WithLabelValues(s.ConsolidationType()).Inc()
ConsolidationTimeoutsTotal.Inc(map[string]string{consolidationTypeLabel: s.ConsolidationType()})
log.FromContext(ctx).V(1).Info(fmt.Sprintf("abandoning single-node consolidation due to timeout after evaluating %d candidates", i))
return Command{}, scheduling.Results{}, nil
}
Expand Down
Loading

0 comments on commit 2f80354

Please sign in to comment.