[WIP/POC] Degraded NodePool Status Condition #1880

Closed · wants to merge 3 commits
3 changes: 3 additions & 0 deletions kwok/charts/crds/karpenter.sh_nodepools.yaml
@@ -498,6 +498,9 @@ spec:
- type
Member commented:

Should we propose this change as an RFC? There's a fair amount of detail around what this status condition means, how we're going to track failures, and what it means with respect to scheduling, so I think it would be good to let people see this and get cloud provider input as well.

Contributor Author replied:

I'm writing the RFC today and I should be able to post tomorrow.

type: object
type: array
failedlaunches:
description: FailedLaunches tracks the number of times a nodepool failed before being marked degraded
type: integer
resources:
additionalProperties:
anyOf:
3 changes: 3 additions & 0 deletions pkg/apis/crds/karpenter.sh_nodepools.yaml
@@ -496,6 +496,9 @@ spec:
- type
type: object
type: array
failedlaunches:
description: FailedLaunches tracks the number of times a nodepool failed before being marked degraded
type: integer
resources:
additionalProperties:
anyOf:
2 changes: 2 additions & 0 deletions pkg/apis/v1/nodepool_status.go
@@ -27,6 +27,8 @@ const (
ConditionTypeValidationSucceeded = "ValidationSucceeded"
// ConditionTypeNodeClassReady = "NodeClassReady" condition indicates that underlying nodeClass was resolved and is reporting as Ready
ConditionTypeNodeClassReady = "NodeClassReady"
// ConditionTypeStable = "Stable" condition indicates that recent launches for this NodePool have been succeeding and the NodePool is not considered degraded
ConditionTypeStable = "Stable"
)

// NodePoolStatus defines the observed state of NodePool
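For orientation only, since nothing in this PR wires the new condition into scheduling yet: a minimal sketch of how a consumer might gate on Stable, using the status helpers already exercised elsewhere in this diff. The package and function name are hypothetical.

package example // hypothetical; not part of this PR

import (
	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// isStable reports whether a NodePool should be treated as healthy for
// provisioning. A missing or Unknown condition counts as stable so that
// NodePools are not penalized before the degraded controller has scored them.
func isStable(nodePool *v1.NodePool) bool {
	cond := nodePool.StatusConditions().Get(v1.ConditionTypeStable)
	return cond == nil || cond.IsUnknown() || cond.IsTrue()
}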
4 changes: 3 additions & 1 deletion pkg/controllers/controllers.go
@@ -48,6 +48,7 @@ import (
nodeclaimlifecycle "sigs.k8s.io/karpenter/pkg/controllers/nodeclaim/lifecycle"
"sigs.k8s.io/karpenter/pkg/controllers/nodeclaim/podevents"
nodepoolcounter "sigs.k8s.io/karpenter/pkg/controllers/nodepool/counter"
nodepooldegraded "sigs.k8s.io/karpenter/pkg/controllers/nodepool/degraded"
nodepoolhash "sigs.k8s.io/karpenter/pkg/controllers/nodepool/hash"
nodepoolreadiness "sigs.k8s.io/karpenter/pkg/controllers/nodepool/readiness"
nodepoolvalidation "sigs.k8s.io/karpenter/pkg/controllers/nodepool/validation"
@@ -90,9 +91,10 @@ func NewControllers(
nodepoolreadiness.NewController(kubeClient, cloudProvider),
nodepoolcounter.NewController(kubeClient, cloudProvider, cluster),
nodepoolvalidation.NewController(kubeClient, cloudProvider),
nodepooldegraded.NewController(kubeClient, cloudProvider, cluster),
podevents.NewController(clock, kubeClient, cloudProvider),
nodeclaimconsistency.NewController(clock, kubeClient, cloudProvider, recorder),
nodeclaimlifecycle.NewController(clock, kubeClient, cloudProvider, recorder),
nodeclaimlifecycle.NewController(clock, kubeClient, cloudProvider, recorder, cluster),
nodeclaimgarbagecollection.NewController(clock, kubeClient, cloudProvider),
nodeclaimdisruption.NewController(clock, kubeClient, cloudProvider),
nodeclaimhydration.NewController(kubeClient, cloudProvider),
4 changes: 4 additions & 0 deletions pkg/controllers/metrics/pod/controller.go
@@ -47,6 +47,7 @@ const (
ownerSelfLink = "owner"
podHostName = "node"
podNodePool = "nodepool"
podNodePoolDegraded = "nodepool_degraded"
podHostZone = "zone"
podHostArchitecture = "arch"
podHostCapacityType = "capacity_type"
@@ -106,7 +107,9 @@ var (
Name: "unbound_time_seconds",
Help: "The time from pod creation until the pod is bound.",
},
// []string{podName, podNamespace, podNodePoolDegraded},
[]string{podName, podNamespace},
// podNodePoolDegraded == false
)
// Stage: alpha
PodProvisioningBoundDurationSeconds = opmetrics.NewPrometheusHistogram(
@@ -129,6 +132,7 @@ var (
Name: "provisioning_unbound_time_seconds",
Help: "The time from when Karpenter first thinks the pod can schedule until it binds. Note: this calculated from a point in memory, not by the pod creation timestamp.",
},
// []string{podName, podNamespace, podNodePoolDegraded},
[]string{podName, podNamespace},
)
// Stage: alpha
5 changes: 4 additions & 1 deletion pkg/controllers/nodeclaim/garbagecollection/suite_test.go
@@ -36,6 +36,7 @@ import (
"sigs.k8s.io/karpenter/pkg/cloudprovider/fake"
nodeclaimgarbagecollection "sigs.k8s.io/karpenter/pkg/controllers/nodeclaim/garbagecollection"
nodeclaimlifcycle "sigs.k8s.io/karpenter/pkg/controllers/nodeclaim/lifecycle"
"sigs.k8s.io/karpenter/pkg/controllers/state"
"sigs.k8s.io/karpenter/pkg/events"
"sigs.k8s.io/karpenter/pkg/operator/options"
"sigs.k8s.io/karpenter/pkg/test"
@@ -48,6 +49,7 @@ var ctx context.Context
var nodeClaimController *nodeclaimlifcycle.Controller
var garbageCollectionController *nodeclaimgarbagecollection.Controller
var env *test.Environment
var cluster *state.Cluster
var fakeClock *clock.FakeClock
var cloudProvider *fake.CloudProvider

@@ -62,8 +64,9 @@ var _ = BeforeSuite(func() {
env = test.NewEnvironment(test.WithCRDs(apis.CRDs...), test.WithCRDs(v1alpha1.CRDs...), test.WithFieldIndexers(test.NodeProviderIDFieldIndexer(ctx)))
ctx = options.ToContext(ctx, test.Options())
cloudProvider = fake.NewCloudProvider()
cluster = state.NewCluster(fakeClock, env.Client, cloudProvider)
garbageCollectionController = nodeclaimgarbagecollection.NewController(fakeClock, env.Client, cloudProvider)
nodeClaimController = nodeclaimlifcycle.NewController(fakeClock, env.Client, cloudProvider, events.NewRecorder(&record.FakeRecorder{}))
nodeClaimController = nodeclaimlifcycle.NewController(fakeClock, env.Client, cloudProvider, events.NewRecorder(&record.FakeRecorder{}), cluster)
})

var _ = AfterSuite(func() {
15 changes: 8 additions & 7 deletions pkg/controllers/nodeclaim/lifecycle/controller.go
@@ -41,6 +41,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"

terminatorevents "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator/events"
"sigs.k8s.io/karpenter/pkg/controllers/state"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
@@ -64,34 +65,34 @@ type Controller struct {
kubeClient client.Client
cloudProvider cloudprovider.CloudProvider
recorder events.Recorder
cluster *state.Cluster

launch *Launch
registration *Registration
initialization *Initialization
liveness *Liveness
}

func NewController(clk clock.Clock, kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, recorder events.Recorder) *Controller {
func NewController(clk clock.Clock, kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, recorder events.Recorder, cluster *state.Cluster) *Controller {
return &Controller{
kubeClient: kubeClient,
cloudProvider: cloudProvider,
recorder: recorder,
cluster: cluster,

launch: &Launch{kubeClient: kubeClient, cloudProvider: cloudProvider, cache: cache.New(time.Minute, time.Second*10), recorder: recorder},
registration: &Registration{kubeClient: kubeClient},
initialization: &Initialization{kubeClient: kubeClient},
liveness: &Liveness{clock: clk, kubeClient: kubeClient},
initialization: &Initialization{kubeClient: kubeClient, cluster: cluster},
liveness: &Liveness{clock: clk, kubeClient: kubeClient, cloudProvider: cloudProvider, cluster: cluster},
}
}

func (c *Controller) Register(_ context.Context, m manager.Manager) error {
return controllerruntime.NewControllerManagedBy(m).
Named(c.Name()).
For(&v1.NodeClaim{}, builder.WithPredicates(nodeclaimutils.IsManagedPredicateFuncs(c.cloudProvider))).
Watches(
&corev1.Node{},
nodeclaimutils.NodeEventHandler(c.kubeClient, c.cloudProvider),
).
Watches(&corev1.Node{}, nodeclaimutils.NodeEventHandler(c.kubeClient, c.cloudProvider)).
// Watches(&v1.NodePool{}, nodeclaimutils.NodePoolEventHandler(c.kubeClient, c.cloudProvider)).
WithOptions(controller.Options{
RateLimiter: workqueue.NewTypedMaxOfRateLimiter[reconcile.Request](
// back off until last attempt occurs ~90 seconds before nodeclaim expiration
12 changes: 12 additions & 0 deletions pkg/controllers/nodeclaim/lifecycle/initialization.go
@@ -23,12 +23,14 @@
"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/controllers/state"
"sigs.k8s.io/karpenter/pkg/scheduling"
nodeutils "sigs.k8s.io/karpenter/pkg/utils/node"
nodeclaimutils "sigs.k8s.io/karpenter/pkg/utils/nodeclaim"
@@ -37,6 +39,7 @@

type Initialization struct {
kubeClient client.Client
cluster *state.Cluster
}

// Reconcile checks for initialization based on if:
@@ -44,7 +47,7 @@
// b) all the startup taints have been removed from the node
// c) all extended resources have been registered
// This method handles both nil nodepools and nodes without extended resources gracefully.
func (i *Initialization) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (reconcile.Result, error) {

Check failure on line 50 in pkg/controllers/nodeclaim/lifecycle/initialization.go (GitHub Actions / presubmit, 1.25.x–1.31.x): cyclomatic complexity 12 of func `(*Initialization).Reconcile` is high (> 11) (gocyclo)
if cond := nodeClaim.StatusConditions().Get(v1.ConditionTypeInitialized); !cond.IsUnknown() {
// Ensure that we always set the status condition to the latest generation
nodeClaim.StatusConditions().Set(*cond)
@@ -83,6 +86,15 @@
return reconcile.Result{}, err
}
}
nodePoolName, ok := nodeClaim.Labels[v1.NodePoolLabelKey]
if !ok {
return reconcile.Result{}, nil
}
nodePool := &v1.NodePool{}
if err := i.kubeClient.Get(ctx, types.NamespacedName{Name: nodePoolName}, nodePool); err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
}
i.cluster.NodePoolLaunchesFor(string(nodePool.UID)).Push(1)
log.FromContext(ctx).WithValues("allocatable", node.Status.Allocatable).Info("initialized nodeclaim")
nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeInitialized)
return reconcile.Result{}, nil
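The label lookup and NodePool fetch added here also appear in liveness.go below; if that pattern sticks, a small shared helper could consolidate it. This is only a sketch: the helper name and placement are hypothetical and not part of this PR.

package lifecycle // hypothetical placement

import (
	"context"

	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// nodePoolFor resolves the NodePool that owns a NodeClaim. It returns nil
// (with no error) when the NodeClaim has no nodepool label or the NodePool
// no longer exists, so callers can simply skip recording a launch result.
func nodePoolFor(ctx context.Context, kubeClient client.Client, nodeClaim *v1.NodeClaim) (*v1.NodePool, error) {
	name, ok := nodeClaim.Labels[v1.NodePoolLabelKey]
	if !ok {
		return nil, nil
	}
	nodePool := &v1.NodePool{}
	if err := kubeClient.Get(ctx, types.NamespacedName{Name: name}, nodePool); err != nil {
		return nil, client.IgnoreNotFound(err)
	}
	return nodePool, nil
}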
28 changes: 21 additions & 7 deletions pkg/controllers/nodeclaim/lifecycle/liveness.go
@@ -20,32 +20,46 @@ import (
"context"
"time"

"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/clock"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/controllers/state"
"sigs.k8s.io/karpenter/pkg/metrics"
)

type Liveness struct {
clock clock.Clock
kubeClient client.Client
clock clock.Clock
kubeClient client.Client
cloudProvider cloudprovider.CloudProvider
cluster *state.Cluster
}

// registrationTTL is a heuristic time that we expect the node to register within
// If we don't see the node within this time, then we should delete the NodeClaim and try again
const registrationTTL = time.Minute * 15
const registrationTTL = 15 * time.Minute

// nolint:gocyclo
func (l *Liveness) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (reconcile.Result, error) {
registered := nodeClaim.StatusConditions().Get(v1.ConditionTypeRegistered)
if registered.IsTrue() {
return reconcile.Result{}, nil
}
if registered == nil {
return reconcile.Result{Requeue: true}, nil
}
nodePoolName, ok := nodeClaim.Labels[v1.NodePoolLabelKey]
if !ok {
return reconcile.Result{}, nil
}
nodePool := &v1.NodePool{}
if err := l.kubeClient.Get(ctx, types.NamespacedName{Name: nodePoolName}, nodePool); err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
}
if registered.IsTrue() {
return reconcile.Result{}, nil
}
// If the Registered statusCondition hasn't gone True during the TTL since we first updated it, we should terminate the NodeClaim
// NOTE: ttl has to be stored and checked in the same place since l.clock can advance after the check causing a race
if ttl := registrationTTL - l.clock.Since(registered.LastTransitionTime.Time); ttl > 0 {
@@ -55,12 +69,12 @@ func (l *Liveness) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (reco
if err := l.kubeClient.Delete(ctx, nodeClaim); err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
}
l.cluster.NodePoolLaunchesFor(string(nodePool.UID)).Push(-1)
log.FromContext(ctx).V(1).WithValues("ttl", registrationTTL).Info("terminating due to registration ttl")
metrics.NodeClaimsDisruptedTotal.Inc(map[string]string{
metrics.ReasonLabel: "liveness",
metrics.NodePoolLabel: nodeClaim.Labels[v1.NodePoolLabelKey],
metrics.CapacityTypeLabel: nodeClaim.Labels[v1.CapacityTypeLabelKey],
})

return reconcile.Result{}, nil
}
5 changes: 4 additions & 1 deletion pkg/controllers/nodeclaim/lifecycle/suite_test.go
@@ -36,6 +36,7 @@ import (
v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider/fake"
nodeclaimlifecycle "sigs.k8s.io/karpenter/pkg/controllers/nodeclaim/lifecycle"
"sigs.k8s.io/karpenter/pkg/controllers/state"
"sigs.k8s.io/karpenter/pkg/events"
"sigs.k8s.io/karpenter/pkg/operator/options"
"sigs.k8s.io/karpenter/pkg/test"
@@ -47,6 +48,7 @@ import (
var ctx context.Context
var nodeClaimController *nodeclaimlifecycle.Controller
var env *test.Environment
var cluster *state.Cluster
var fakeClock *clock.FakeClock
var cloudProvider *fake.CloudProvider

@@ -74,7 +76,8 @@ var _ = BeforeSuite(func() {
ctx = options.ToContext(ctx, test.Options())

cloudProvider = fake.NewCloudProvider()
nodeClaimController = nodeclaimlifecycle.NewController(fakeClock, env.Client, cloudProvider, events.NewRecorder(&record.FakeRecorder{}))
cluster = state.NewCluster(fakeClock, env.Client, cloudProvider)
nodeClaimController = nodeclaimlifecycle.NewController(fakeClock, env.Client, cloudProvider, events.NewRecorder(&record.FakeRecorder{}), cluster)
})

var _ = AfterSuite(func() {
86 changes: 86 additions & 0 deletions pkg/controllers/nodepool/degraded/controller.go
@@ -0,0 +1,86 @@
/*
Copyright The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package degraded

import (
"context"
"time"

"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/controllers/state"
"sigs.k8s.io/karpenter/pkg/operator/injection"
nodepoolutils "sigs.k8s.io/karpenter/pkg/utils/nodepool"
)

type Controller struct {
kubeClient client.Client
cloudProvider cloudprovider.CloudProvider
cluster *state.Cluster
}

func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, cluster *state.Cluster) *Controller {
return &Controller{
kubeClient: kubeClient,
cloudProvider: cloudProvider,
cluster: cluster,
}
}

func (c *Controller) Reconcile(ctx context.Context, nodePool *v1.NodePool) (reconcile.Result, error) {
ctx = injection.WithControllerName(ctx, "nodepool.degraded")
stored := nodePool.DeepCopy()
nodePool.StatusConditions().SetUnknown(v1.ConditionTypeStable)
score, scored := c.cluster.NodePoolLaunchesFor(string(nodePool.UID)).Evaluate()
if !scored {
// no-op for an evaluation that doesn't exist
return reconcile.Result{RequeueAfter: 1 * time.Minute}, nil
}
if score < 0 {
nodePool.StatusConditions().SetFalse(v1.ConditionTypeStable, "unhealthy", "")
} else if score > 0 {
nodePool.StatusConditions().SetTrue(v1.ConditionTypeStable)
}
if !equality.Semantic.DeepEqual(stored, nodePool) {
if err := c.kubeClient.Status().Patch(ctx, nodePool, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, client.IgnoreNotFound(err)
}
}
log.FromContext(ctx).WithValues("score: ", score).Info("end of rec loop")
return reconcile.Result{RequeueAfter: 1 * time.Minute}, nil
}

func (c *Controller) Register(_ context.Context, m manager.Manager) error {
return controllerruntime.NewControllerManagedBy(m).
Named("nodepool.degraded").
For(&v1.NodePool{}, builder.WithPredicates(nodepoolutils.IsManagedPredicateFuncs(c.cloudProvider))).
WithOptions(controller.Options{MaxConcurrentReconciles: 10}).
Complete(reconcile.AsReconciler(m.GetClient(), c))
}
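The cluster-state tracker behind NodePoolLaunchesFor, Push, and Evaluate is not included in this diff. As a rough sketch only, assuming it keeps a bounded window of launch outcomes where the lifecycle controllers push +1 on a successful initialization and -1 on a registration timeout, it might look something like the following; all names other than Push and Evaluate are hypothetical.

package state // hypothetical; sketch of a tracker that could live in pkg/controllers/state

import "sync"

// launchWindow keeps a bounded history of launch outcomes for one NodePool.
type launchWindow struct {
	mu      sync.Mutex
	results []int // +1 for a successful launch, -1 for a failed one
	size    int
}

func newLaunchWindow(size int) *launchWindow {
	return &launchWindow{size: size}
}

// Push records an outcome, evicting the oldest entry once the window is full.
func (w *launchWindow) Push(result int) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.results = append(w.results, result)
	if len(w.results) > w.size {
		w.results = w.results[1:]
	}
}

// Evaluate sums the recorded outcomes and reports whether any exist. A
// negative score means failures outweigh successes, which the controller
// above maps to Stable=False; a positive score maps to Stable=True.
func (w *launchWindow) Evaluate() (int, bool) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if len(w.results) == 0 {
		return 0, false
	}
	score := 0
	for _, r := range w.results {
		score += r
	}
	return score, true
}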