Skip to content

Commit

Permalink
feat: expose nodeclaim disruption through new disruption condition, i…
Browse files Browse the repository at this point in the history
…mproves pod eviction event message (#1370)

Signed-off-by: Cameron McAvoy <[email protected]>
  • Loading branch information
cnmcavoy authored Jan 2, 2025
1 parent 81481b7 commit cfda355
Show file tree
Hide file tree
Showing 11 changed files with 147 additions and 53 deletions.
1 change: 1 addition & 0 deletions pkg/apis/v1/nodeclaim_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const (
ConditionTypeDrifted = "Drifted"
ConditionTypeInstanceTerminating = "InstanceTerminating"
ConditionTypeConsistentStateFound = "ConsistentStateFound"
ConditionTypeDisruptionReason = "DisruptionReason"
)

// NodeClaimStatus defines the observed state of NodeClaim
Expand Down
59 changes: 42 additions & 17 deletions pkg/controllers/disruption/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/awslabs/operatorpkg/singleton"
"github.com/samber/lo"
"go.uber.org/multierr"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/utils/clock"
Expand All @@ -36,9 +37,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

nodepoolutils "sigs.k8s.io/karpenter/pkg/utils/nodepool"
"sigs.k8s.io/karpenter/pkg/utils/pretty"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/controllers/disruption/orchestration"
Expand All @@ -49,6 +47,8 @@ import (
"sigs.k8s.io/karpenter/pkg/metrics"
"sigs.k8s.io/karpenter/pkg/operator/injection"
operatorlogging "sigs.k8s.io/karpenter/pkg/operator/logging"
nodepoolutils "sigs.k8s.io/karpenter/pkg/utils/nodepool"
"sigs.k8s.io/karpenter/pkg/utils/pretty"
)

type Controller struct {
Expand Down Expand Up @@ -124,14 +124,21 @@ func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) {
// Karpenter taints nodes with a karpenter.sh/disruption taint as part of the disruption process while it progresses in memory.
// If Karpenter restarts or fails with an error during a disruption action, some nodes can be left tainted.
// Idempotently remove this taint from candidates that are not in the orchestration queue before continuing.
if err := state.RequireNoScheduleTaint(ctx, c.kubeClient, false, lo.Filter(c.cluster.Nodes(), func(s *state.StateNode, _ int) bool {
return !c.queue.HasAny(s.ProviderID())
})...); err != nil {
outdatedNodes := lo.Filter(c.cluster.Nodes(), func(s *state.StateNode, _ int) bool {
return !c.queue.HasAny(s.ProviderID()) && !s.Deleted()
})
if err := state.RequireNoScheduleTaint(ctx, c.kubeClient, false, outdatedNodes...); err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, fmt.Errorf("removing taint %s from nodes, %w", pretty.Taint(v1.DisruptedNoScheduleTaint), err)
}
if err := state.ClearNodeClaimsCondition(ctx, c.kubeClient, v1.ConditionTypeDisruptionReason, outdatedNodes...); err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, fmt.Errorf("removing %s condition from nodeclaims, %w", v1.ConditionTypeDisruptionReason, err)
}

// Attempt different disruption methods. We'll only let one method perform an action
for _, m := range c.methods {
Expand Down Expand Up @@ -197,12 +204,9 @@ func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command,
commandID := uuid.NewUUID()
log.FromContext(ctx).WithValues("command-id", commandID, "reason", strings.ToLower(string(m.Reason()))).Info(fmt.Sprintf("disrupting nodeclaim(s) via %s", cmd))

stateNodes := lo.Map(cmd.candidates, func(c *Candidate, _ int) *state.StateNode {
return c.StateNode
})
// Cordon the old nodes before we launch the replacements to prevent new pods from scheduling to the old nodes
if err := state.RequireNoScheduleTaint(ctx, c.kubeClient, true, stateNodes...); err != nil {
return fmt.Errorf("tainting nodes with %s (command-id: %s), %w", pretty.Taint(v1.DisruptedNoScheduleTaint), commandID, err)
if err := c.MarkDisrupted(ctx, m, cmd.candidates...); err != nil {
return fmt.Errorf("marking disrupted (command-id: %s), %w", commandID, err)
}

var nodeClaimNames []string
Expand All @@ -226,12 +230,9 @@ func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command,
// the node is cleaned up.
schedulingResults.Record(log.IntoContext(ctx, operatorlogging.NopLogger), c.recorder, c.cluster)

providerIDs := lo.Map(cmd.candidates, func(c *Candidate, _ int) string { return c.ProviderID() })
// We have the new NodeClaims created at the API server so mark the old NodeClaims for deletion
c.cluster.MarkForDeletion(providerIDs...)

if err = c.queue.Add(orchestration.NewCommand(nodeClaimNames,
lo.Map(cmd.candidates, func(c *Candidate, _ int) *state.StateNode { return c.StateNode }), commandID, m.Reason(), m.ConsolidationType())); err != nil {
statenodes := lo.Map(cmd.candidates, func(c *Candidate, _ int) *state.StateNode { return c.StateNode })
if err := c.queue.Add(orchestration.NewCommand(nodeClaimNames, statenodes, commandID, m.Reason(), m.ConsolidationType())); err != nil {
providerIDs := lo.Map(cmd.candidates, func(c *Candidate, _ int) string { return c.ProviderID() })
c.cluster.UnmarkForDeletion(providerIDs...)
return fmt.Errorf("adding command to queue (command-id: %s), %w", commandID, err)
}
Expand All @@ -258,6 +259,30 @@ func (c *Controller) createReplacementNodeClaims(ctx context.Context, m Method,
return nodeClaimNames, nil
}

func (c *Controller) MarkDisrupted(ctx context.Context, m Method, candidates ...*Candidate) error {
stateNodes := lo.Map(candidates, func(c *Candidate, _ int) *state.StateNode {
return c.StateNode
})
if err := state.RequireNoScheduleTaint(ctx, c.kubeClient, true, stateNodes...); err != nil {
return fmt.Errorf("tainting nodes with %s: %w", pretty.Taint(v1.DisruptedNoScheduleTaint), err)
}

providerIDs := lo.Map(candidates, func(c *Candidate, _ int) string { return c.ProviderID() })
c.cluster.MarkForDeletion(providerIDs...)

return multierr.Combine(lo.Map(candidates, func(candidate *Candidate, _ int) error {
// refresh nodeclaim before updating status
nodeClaim := &v1.NodeClaim{}

if err := c.kubeClient.Get(ctx, client.ObjectKeyFromObject(candidate.NodeClaim), nodeClaim); err != nil {
return client.IgnoreNotFound(err)
}
stored := nodeClaim.DeepCopy()
nodeClaim.StatusConditions().SetTrueWithReason(v1.ConditionTypeDisruptionReason, v1.ConditionTypeDisruptionReason, string(m.Reason()))
return client.IgnoreNotFound(c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFrom(stored)))
})...)
}

func (c *Controller) recordRun(s string) {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down
1 change: 1 addition & 0 deletions pkg/controllers/disruption/orchestration/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ func (q *Queue) Reconcile(ctx context.Context) (reconcile.Result, error) {
consolidationTypeLabel: cmd.consolidationType,
})
multiErr := multierr.Combine(err, cmd.lastError, state.RequireNoScheduleTaint(ctx, q.kubeClient, false, cmd.candidates...))
multiErr = multierr.Combine(multiErr, state.ClearNodeClaimsCondition(ctx, q.kubeClient, v1.ConditionTypeDisruptionReason, cmd.candidates...))
// Log the error
log.FromContext(ctx).WithValues("nodes", strings.Join(lo.Map(cmd.candidates, func(s *state.StateNode, _ int) string {
return s.Name()
Expand Down
19 changes: 19 additions & 0 deletions pkg/controllers/disruption/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,7 @@ var _ = Describe("Disruption Taints", func() {
})
nodePool.Spec.Disruption.ConsolidateAfter = v1.MustParseNillableDuration("Never")
node.Spec.Taints = append(node.Spec.Taints, v1.DisruptedNoScheduleTaint)
nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeDisruptionReason)
ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node, pod)
ExpectManualBinding(ctx, env.Client, pod, node)

Expand All @@ -542,6 +543,12 @@ var _ = Describe("Disruption Taints", func() {
ExpectSingletonReconciled(ctx, disruptionController)
node = ExpectNodeExists(ctx, env.Client, node.Name)
Expect(node.Spec.Taints).ToNot(ContainElement(v1.DisruptedNoScheduleTaint))

nodeClaims := lo.Filter(ExpectNodeClaims(ctx, env.Client), func(nc *v1.NodeClaim, _ int) bool {
return nc.Status.ProviderID == node.Spec.ProviderID
})
Expect(nodeClaims).To(HaveLen(1))
Expect(nodeClaims[0].StatusConditions().Get(v1.ConditionTypeDisruptionReason)).To(BeNil())
})
It("should add and remove taints from NodeClaims that fail to disrupt", func() {
nodePool.Spec.Disruption.ConsolidationPolicy = v1.ConsolidationPolicyWhenEmptyOrUnderutilized
Expand Down Expand Up @@ -579,6 +586,12 @@ var _ = Describe("Disruption Taints", func() {

node = ExpectNodeExists(ctx, env.Client, node.Name)
Expect(node.Spec.Taints).To(ContainElement(v1.DisruptedNoScheduleTaint))
nodeClaims := lo.Filter(ExpectNodeClaims(ctx, env.Client), func(nc *v1.NodeClaim, _ int) bool {
return nc.Status.ProviderID == node.Spec.ProviderID
})
Expect(nodeClaims).To(HaveLen(1))
Expect(nodeClaims[0].StatusConditions().Get(v1.ConditionTypeDisruptionReason)).ToNot(BeNil())
Expect(nodeClaims[0].StatusConditions().Get(v1.ConditionTypeDisruptionReason).IsTrue()).To(BeTrue())

createdNodeClaim := lo.Reject(ExpectNodeClaims(ctx, env.Client), func(nc *v1.NodeClaim, _ int) bool {
return nc.Name == nodeClaim.Name
Expand All @@ -595,6 +608,12 @@ var _ = Describe("Disruption Taints", func() {

node = ExpectNodeExists(ctx, env.Client, node.Name)
Expect(node.Spec.Taints).ToNot(ContainElement(v1.DisruptedNoScheduleTaint))

nodeClaims = lo.Filter(ExpectNodeClaims(ctx, env.Client), func(nc *v1.NodeClaim, _ int) bool {
return nc.Status.ProviderID == node.Spec.ProviderID
})
Expect(nodeClaims).To(HaveLen(1))
Expect(nodeClaims[0].StatusConditions().Get(v1.ConditionTypeDisruptionReason)).To(BeNil())
})
})

Expand Down
14 changes: 7 additions & 7 deletions pkg/controllers/node/termination/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ var _ = Describe("Termination", func() {
Expect(env.Client.Delete(ctx, node)).To(Succeed())
node = ExpectNodeExists(ctx, env.Client, node.Name)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
Expect(queue.Has(podSkip)).To(BeFalse())
Expect(queue.Has(node, podSkip)).To(BeFalse())
ExpectSingletonReconciled(ctx, queue)

// Expect node to exist and be draining
Expand Down Expand Up @@ -233,7 +233,7 @@ var _ = Describe("Termination", func() {
Expect(env.Client.Delete(ctx, node)).To(Succeed())
node = ExpectNodeExists(ctx, env.Client, node.Name)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
Expect(queue.Has(podSkip)).To(BeFalse())
Expect(queue.Has(node, podSkip)).To(BeFalse())
ExpectSingletonReconciled(ctx, queue)

// Expect node to exist and be draining
Expand All @@ -243,7 +243,7 @@ var _ = Describe("Termination", func() {
EventuallyExpectTerminating(ctx, env.Client, podEvict)
ExpectDeleted(ctx, env.Client, podEvict)

Expect(queue.Has(podSkip)).To(BeFalse())
Expect(queue.Has(node, podSkip)).To(BeFalse())

// Reconcile to delete node
node = ExpectNodeExists(ctx, env.Client, node.Name)
Expand Down Expand Up @@ -357,13 +357,13 @@ var _ = Describe("Termination", func() {
ExpectNodeWithNodeClaimDraining(env.Client, node.Name)

// Expect podNoEvict to be added to the queue
Expect(queue.Has(podNoEvict)).To(BeTrue())
Expect(queue.Has(node, podNoEvict)).To(BeTrue())

// Attempt to evict the pod, but fail to do so
ExpectSingletonReconciled(ctx, queue)

// Expect podNoEvict to fail eviction due to PDB, and be retried
Expect(queue.Has(podNoEvict)).To(BeTrue())
Expect(queue.Has(node, podNoEvict)).To(BeTrue())

// Delete pod to simulate successful eviction
ExpectDeleted(ctx, env.Client, podNoEvict)
Expand Down Expand Up @@ -507,7 +507,7 @@ var _ = Describe("Termination", func() {
ExpectSingletonReconciled(ctx, queue)

// Expect mirror pod to not be queued for eviction
Expect(queue.Has(podNoEvict)).To(BeFalse())
Expect(queue.Has(node, podNoEvict)).To(BeFalse())

// Expect podEvict to be enqueued for eviction then be successful
EventuallyExpectTerminating(ctx, env.Client, podEvict)
Expand Down Expand Up @@ -681,7 +681,7 @@ var _ = Describe("Termination", func() {
ExpectObjectReconciled(ctx, env.Client, terminationController, node)

// Expect that the old pod's key still exists in the queue
Expect(queue.Has(pod)).To(BeTrue())
Expect(queue.Has(node, pod)).To(BeTrue())

// Re-create the pod and node, it should now have the same name, but a different UUID
node = test.Node(test.NodeOptions{
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/node/termination/terminator/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ import (
"sigs.k8s.io/karpenter/pkg/events"
)

func EvictPod(pod *corev1.Pod) events.Event {
func EvictPod(pod *corev1.Pod, message string) events.Event {
return events.Event{
InvolvedObject: pod,
Type: corev1.EventTypeNormal,
Reason: "Evicted",
Message: "Evicted pod",
Message: "Evicted pod: " + message,
DedupeValues: []string{pod.Name},
}
}
Expand Down
35 changes: 28 additions & 7 deletions pkg/controllers/node/termination/terminator/eviction.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
terminatorevents "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator/events"
"sigs.k8s.io/karpenter/pkg/events"
"sigs.k8s.io/karpenter/pkg/operator/injection"
"sigs.k8s.io/karpenter/pkg/utils/node"
)

const (
Expand All @@ -68,13 +70,15 @@ func IsNodeDrainError(err error) bool {

type QueueKey struct {
types.NamespacedName
UID types.UID
UID types.UID
providerID string
}

func NewQueueKey(pod *corev1.Pod) QueueKey {
func NewQueueKey(pod *corev1.Pod, providerID string) QueueKey {
return QueueKey{
NamespacedName: client.ObjectKeyFromObject(pod),
UID: pod.UID,
providerID: providerID,
}
}

Expand Down Expand Up @@ -118,24 +122,24 @@ func (q *Queue) Register(_ context.Context, m manager.Manager) error {
}

// Add adds pods to the Queue
func (q *Queue) Add(pods ...*corev1.Pod) {
func (q *Queue) Add(node *corev1.Node, pods ...*corev1.Pod) {
q.mu.Lock()
defer q.mu.Unlock()

for _, pod := range pods {
qk := NewQueueKey(pod)
qk := NewQueueKey(pod, node.Spec.ProviderID)
if !q.set.Has(qk) {
q.set.Insert(qk)
q.TypedRateLimitingInterface.Add(qk)
}
}
}

func (q *Queue) Has(pod *corev1.Pod) bool {
func (q *Queue) Has(node *corev1.Node, pod *corev1.Pod) bool {
q.mu.Lock()
defer q.mu.Unlock()

return q.set.Has(NewQueueKey(pod))
return q.set.Has(NewQueueKey(pod, node.Spec.ProviderID))
}

func (q *Queue) Reconcile(ctx context.Context) (reconcile.Result, error) {
Expand Down Expand Up @@ -171,6 +175,11 @@ func (q *Queue) Reconcile(ctx context.Context) (reconcile.Result, error) {
// Evict returns true if successful eviction call, and false if there was an eviction-related error
func (q *Queue) Evict(ctx context.Context, key QueueKey) bool {
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("Pod", klog.KRef(key.Namespace, key.Name)))
evictionMessage, err := evictionReason(ctx, key, q.kubeClient)
if err != nil {
// XXX(cmcavoy): this should be unreachable, but we log it if it happens
log.FromContext(ctx).V(1).Error(err, "failed looking up pod eviction reason")
}
if err := q.kubeClient.SubResource("eviction").Create(ctx,
&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: key.Namespace, Name: key.Name}},
&policyv1.Eviction{
Expand Down Expand Up @@ -205,6 +214,18 @@ func (q *Queue) Evict(ctx context.Context, key QueueKey) bool {
return false
}
NodesEvictionRequestsTotal.Inc(map[string]string{CodeLabel: "200"})
q.recorder.Publish(terminatorevents.EvictPod(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: key.Name, Namespace: key.Namespace}}))
q.recorder.Publish(terminatorevents.EvictPod(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: key.Name, Namespace: key.Namespace}}, evictionMessage))
return true
}

func evictionReason(ctx context.Context, key QueueKey, kubeClient client.Client) (string, error) {
nodeClaim, err := node.NodeClaimForNode(ctx, kubeClient, &corev1.Node{Spec: corev1.NodeSpec{ProviderID: key.providerID}})
if err != nil {
return "", err
}
terminationCondition := nodeClaim.StatusConditions().Get(v1.ConditionTypeDisruptionReason)
if terminationCondition.IsTrue() {
return terminationCondition.Message, nil
}
return "Forceful Termination", nil
}
Loading

0 comments on commit cfda355

Please sign in to comment.