fix: Wait for pods to be fully terminated in groups (#1478)
jonathan-innis authored Aug 28, 2024
1 parent 375bced commit 41df062
Showing 4 changed files with 62 additions and 88 deletions.
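
The behavior the commit title describes, in rough outline: drain one priority group of pods at a time, and keep returning a retryable drain error until every pod in the current group is fully gone, so later groups never start evicting early. Below is a minimal, self-contained sketch of that loop using stand-in types only — it is not Karpenter's real Terminator or eviction queue.

package main

import (
	"errors"
	"fmt"
)

type fakePod struct {
	name    string
	evicted bool // eviction API has been called
	gone    bool // pod object fully deleted from the API server
}

var errStillDraining = errors.New("pods are waiting to be evicted")

// drainOnce models one reconcile pass: find the first priority group that still
// has pods, enqueue any that haven't been evicted yet, and report that the node
// is still draining. Only when every group is empty can termination proceed.
func drainOnce(groups [][]*fakePod) error {
	for _, group := range groups {
		var waiting []*fakePod
		for _, p := range group {
			if !p.gone {
				waiting = append(waiting, p)
			}
		}
		if len(waiting) == 0 {
			continue // this group has fully terminated; move to the next one
		}
		for _, p := range waiting {
			if !p.evicted {
				p.evicted = true // stand-in for evictionQueue.Add(p)
			}
		}
		return fmt.Errorf("%d pods: %w", len(waiting), errStillDraining)
	}
	return nil
}

func main() {
	app := &fakePod{name: "app"}
	criticalDaemon := &fakePod{name: "critical-daemon"}
	groups := [][]*fakePod{{app}, {criticalDaemon}}

	fmt.Println(drainOnce(groups)) // app is evicted; node still draining
	app.gone = true
	fmt.Println(drainOnce(groups)) // only now does the critical daemon group start draining
	criticalDaemon.gone = true
	fmt.Println(drainOnce(groups)) // <nil>: node can be terminated
}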
69 changes: 26 additions & 43 deletions pkg/controllers/node/termination/suite_test.go
@@ -361,11 +361,11 @@ var _ = Describe("Termination", func() {
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectNotFound(ctx, env.Client, node)
})
It("should evict pods in order", func() {
It("should evict pods in order and wait until pods are fully deleted", func() {
daemonEvict := test.DaemonSet()
daemonNodeCritical := test.DaemonSet(test.DaemonSetOptions{PodOptions: test.PodOptions{PriorityClassName: "system-node-critical"}})
daemonClusterCritical := test.DaemonSet(test.DaemonSetOptions{PodOptions: test.PodOptions{PriorityClassName: "system-cluster-critical"}})
ExpectApplied(ctx, env.Client, node, nodeClaim, daemonEvict, daemonNodeCritical, daemonClusterCritical)
ExpectApplied(ctx, env.Client, daemonEvict, daemonNodeCritical, daemonClusterCritical)

podEvict := test.Pod(test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}})
podDaemonEvict := test.Pod(test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{OwnerReferences: []metav1.OwnerReference{{
@@ -399,48 +399,31 @@ var _ = Describe("Termination", func() {

// Trigger Termination Controller
Expect(env.Client.Delete(ctx, node)).To(Succeed())
node = ExpectNodeExists(ctx, env.Client, node.Name)
ExpectPodExists(ctx, env.Client, podEvict.Name, podEvict.Namespace)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectSingletonReconciled(ctx, queue)
// Expect node to exist and be draining
ExpectNodeWithNodeClaimDraining(env.Client, node.Name)

// Expect podEvict to be evicting, and delete it
EventuallyExpectTerminating(ctx, env.Client, podEvict)
ExpectDeleted(ctx, env.Client, podEvict)

// Expect the noncritical Daemon pod to be evicted
node = ExpectNodeExists(ctx, env.Client, node.Name)
ExpectPodExists(ctx, env.Client, podDaemonEvict.Name, podDaemonEvict.Namespace)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectSingletonReconciled(ctx, queue)
EventuallyExpectTerminating(ctx, env.Client, podDaemonEvict)
ExpectDeleted(ctx, env.Client, podDaemonEvict)

// Expect the critical pods to be evicted and deleted
node = ExpectNodeExists(ctx, env.Client, node.Name)
ExpectPodExists(ctx, env.Client, podNodeCritical.Name, podNodeCritical.Namespace)
ExpectPodExists(ctx, env.Client, podClusterCritical.Name, podClusterCritical.Namespace)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectSingletonReconciled(ctx, queue)
ExpectSingletonReconciled(ctx, queue)

EventuallyExpectTerminating(ctx, env.Client, podNodeCritical, podClusterCritical)
ExpectDeleted(ctx, env.Client, podNodeCritical)
ExpectDeleted(ctx, env.Client, podClusterCritical)

// Expect the critical daemon pods to be evicted and deleted
node = ExpectNodeExists(ctx, env.Client, node.Name)
ExpectPodExists(ctx, env.Client, podDaemonNodeCritical.Name, podDaemonNodeCritical.Namespace)
ExpectPodExists(ctx, env.Client, podDaemonClusterCritical.Name, podDaemonClusterCritical.Namespace)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectSingletonReconciled(ctx, queue)
ExpectSingletonReconciled(ctx, queue)

EventuallyExpectTerminating(ctx, env.Client, podDaemonNodeCritical, podDaemonClusterCritical)
ExpectDeleted(ctx, env.Client, podDaemonNodeCritical)
ExpectDeleted(ctx, env.Client, podDaemonClusterCritical)
podGroups := [][]*corev1.Pod{{podEvict}, {podDaemonEvict}, {podNodeCritical, podClusterCritical}, {podDaemonNodeCritical, podDaemonClusterCritical}}
for i, podGroup := range podGroups {
node = ExpectNodeExists(ctx, env.Client, node.Name)
for _, p := range podGroup {
ExpectPodExists(ctx, env.Client, p.Name, p.Namespace)
}
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectNodeWithNodeClaimDraining(env.Client, node.Name)
for range podGroup {
ExpectSingletonReconciled(ctx, queue)
}
// Start draining the pod group, but don't complete it yet
EventuallyExpectTerminating(ctx, env.Client, lo.Map(podGroup, func(p *corev1.Pod, _ int) client.Object { return p })...)

// Look at the next pod group and ensure that none of the pods have started terminating on it
if i != len(podGroups)-1 {
for range podGroups[i+1] {
ExpectSingletonReconciled(ctx, queue)
}
ConsistentlyExpectNotTerminating(ctx, env.Client, lo.Map(podGroups[i+1], func(p *corev1.Pod, _ int) client.Object { return p })...)
}
// Expect that the pods are deleted -- which should unblock the next pod group
ExpectDeleted(ctx, env.Client, lo.Map(podGroup, func(p *corev1.Pod, _ int) client.Object { return p })...)
}

// Reconcile to delete node
node = ExpectNodeExists(ctx, env.Client, node.Name)
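
One detail worth calling out in the test loop above: helpers like ExpectDeleted, EventuallyExpectTerminating, and ConsistentlyExpectNotTerminating take variadic client.Object, and Go will not implicitly convert a []*corev1.Pod slice to that, hence the lo.Map calls. A standalone illustration of the conversion (the pod names here are arbitrary, not from the repo):

package main

import (
	"fmt"

	"github.com/samber/lo"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

func main() {
	podGroup := []*corev1.Pod{
		{ObjectMeta: metav1.ObjectMeta{Name: "pod-a", Namespace: "default"}},
		{ObjectMeta: metav1.ObjectMeta{Name: "pod-b", Namespace: "default"}},
	}
	// []*corev1.Pod does not satisfy ...client.Object directly, so each element
	// is mapped to the interface type before being passed along.
	objs := lo.Map(podGroup, func(p *corev1.Pod, _ int) client.Object { return p })
	fmt.Println(len(objs), objs[0].GetName())
}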
43 changes: 11 additions & 32 deletions pkg/controllers/node/termination/terminator/terminator.go
@@ -96,30 +96,27 @@ func (t *Terminator) Drain(ctx context.Context, node *corev1.Node, nodeGracePeri
if err != nil {
return fmt.Errorf("listing pods on node, %w", err)
}

podsToDelete := lo.Filter(pods, func(p *corev1.Pod, _ int) bool {
return podutil.IsWaitingEviction(p, t.clock) && !podutil.IsTerminating(p)
})
if err := t.DeleteExpiringPods(ctx, podsToDelete, nodeGracePeriodExpirationTime); err != nil {
return fmt.Errorf("deleting expiring pods, %w", err)
}

// evictablePods are pods that aren't yet terminating and are eligible to have the eviction API called against them
evictablePods := lo.Filter(pods, func(p *corev1.Pod, _ int) bool { return podutil.IsEvictable(p) })
t.Evict(evictablePods)

// podsWaitingEvictionCount is the number of pods that either haven't had eviction called against them yet
// or are still actively terminating and haven't exceeded their termination grace period yet
podsWaitingEvictionCount := lo.CountBy(pods, func(p *corev1.Pod) bool { return podutil.IsWaitingEviction(p, t.clock) })
if podsWaitingEvictionCount > 0 {
return NewNodeDrainError(fmt.Errorf("%d pods are waiting to be evicted", len(pods)))
// Monitor pods in pod groups that either haven't been evicted or are actively evicting
podGroups := t.groupPodsByPriority(lo.Filter(pods, func(p *corev1.Pod, _ int) bool { return podutil.IsWaitingEviction(p, t.clock) }))
for _, group := range podGroups {
if len(group) > 0 {
// Only add pods to the eviction queue that haven't been evicted yet
t.evictionQueue.Add(lo.Filter(group, func(p *corev1.Pod, _ int) bool { return podutil.IsEvictable(p) })...)
return NewNodeDrainError(fmt.Errorf("%d pods are waiting to be evicted", lo.SumBy(podGroups, func(pods []*corev1.Pod) int { return len(pods) })))
}
}
return nil
}

func (t *Terminator) Evict(pods []*corev1.Pod) {
func (t *Terminator) groupPodsByPriority(pods []*corev1.Pod) [][]*corev1.Pod {
// 1. Prioritize noncritical pods, non-daemon pods https://kubernetes.io/docs/concepts/architecture/nodes/#graceful-node-shutdown
var criticalNonDaemon, criticalDaemon, nonCriticalNonDaemon, nonCriticalDaemon []*corev1.Pod
var nonCriticalNonDaemon, nonCriticalDaemon, criticalNonDaemon, criticalDaemon []*corev1.Pod
for _, pod := range pods {
if pod.Spec.PriorityClassName == "system-cluster-critical" || pod.Spec.PriorityClassName == "system-node-critical" {
if podutil.IsOwnedByDaemonSet(pod) {
@@ -135,25 +135,7 @@ func (t *Terminator) Evict(pods []*corev1.Pod) {
}
}
}

// EvictInOrder evicts only the first list of pods which is not empty
// future Evict calls will catch later lists of pods that were not initially evicted
t.EvictInOrder(
nonCriticalNonDaemon,
nonCriticalDaemon,
criticalNonDaemon,
criticalDaemon,
)
}

func (t *Terminator) EvictInOrder(pods ...[]*corev1.Pod) {
for _, podList := range pods {
if len(podList) > 0 {
// evict the first list of pods that is not empty, ignore the rest
t.evictionQueue.Add(podList...)
return
}
}
return [][]*corev1.Pod{nonCriticalNonDaemon, nonCriticalDaemon, criticalNonDaemon, criticalDaemon}
}

func (t *Terminator) DeleteExpiringPods(ctx context.Context, pods []*corev1.Pod, nodeGracePeriodTerminationTime *time.Time) error {
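
For reference, a self-contained approximation of the grouping the new groupPodsByPriority performs. The DaemonSet-ownership check is a simplified stand-in for the repository's podutil.IsOwnedByDaemonSet; the ordering of the returned groups mirrors the one in the diff.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// isOwnedByDaemonSet is a simplified stand-in for podutil.IsOwnedByDaemonSet.
func isOwnedByDaemonSet(pod *corev1.Pod) bool {
	for _, ref := range pod.OwnerReferences {
		if ref.Kind == "DaemonSet" {
			return true
		}
	}
	return false
}

// groupPodsByPriority mirrors the ordering introduced by the commit:
// noncritical non-daemon pods drain first, critical daemon pods drain last.
func groupPodsByPriority(pods []*corev1.Pod) [][]*corev1.Pod {
	var nonCriticalNonDaemon, nonCriticalDaemon, criticalNonDaemon, criticalDaemon []*corev1.Pod
	for _, pod := range pods {
		critical := pod.Spec.PriorityClassName == "system-cluster-critical" ||
			pod.Spec.PriorityClassName == "system-node-critical"
		switch {
		case critical && isOwnedByDaemonSet(pod):
			criticalDaemon = append(criticalDaemon, pod)
		case critical:
			criticalNonDaemon = append(criticalNonDaemon, pod)
		case isOwnedByDaemonSet(pod):
			nonCriticalDaemon = append(nonCriticalDaemon, pod)
		default:
			nonCriticalNonDaemon = append(nonCriticalNonDaemon, pod)
		}
	}
	return [][]*corev1.Pod{nonCriticalNonDaemon, nonCriticalDaemon, criticalNonDaemon, criticalDaemon}
}

func main() {
	app := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "app"}}
	criticalDS := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "critical-ds",
			OwnerReferences: []metav1.OwnerReference{{Kind: "DaemonSet", Name: "ds"}},
		},
		Spec: corev1.PodSpec{PriorityClassName: "system-node-critical"},
	}
	for i, group := range groupPodsByPriority([]*corev1.Pod{app, criticalDS}) {
		for _, p := range group {
			fmt.Printf("group %d: %s\n", i, p.Name)
		}
	}
}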
22 changes: 10 additions & 12 deletions pkg/test/daemonsets.go
@@ -42,22 +42,20 @@ func DaemonSet(overrides ...DaemonSetOptions) *appsv1.DaemonSet {
panic(fmt.Sprintf("Failed to merge daemonset options: %s", err))
}
}
if options.Name == "" {
options.Name = RandomName()
}
if options.Namespace == "" {
options.Namespace = "default"
}
if options.Selector == nil {
options.Selector = map[string]string{"app": options.Name}
objectMeta := NamespacedObjectMeta(options.ObjectMeta)
if options.PodOptions.Labels == nil {
options.PodOptions.Labels = map[string]string{
"app": objectMeta.Name,
}
}
pod := Pod(options.PodOptions)
return &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{Name: options.Name, Namespace: options.Namespace},
ObjectMeta: objectMeta,
Spec: appsv1.DaemonSetSpec{
Selector: &metav1.LabelSelector{MatchLabels: options.Selector},
Selector: &metav1.LabelSelector{MatchLabels: options.PodOptions.Labels},
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: options.Selector},
Spec: Pod(options.PodOptions).Spec,
ObjectMeta: ObjectMeta(options.PodOptions.ObjectMeta),
Spec: pod.Spec,
},
},
}
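
The DaemonSet helper change derives the selector and the pod template labels from the same map, which is the invariant the API server enforces (the selector must select the template's labels). A small illustration of that invariant with plain client-go types; the name, namespace, and image are arbitrary:

package main

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	labels := map[string]string{"app": "example-ds"}
	ds := &appsv1.DaemonSet{
		ObjectMeta: metav1.ObjectMeta{Name: "example-ds", Namespace: "default"},
		Spec: appsv1.DaemonSetSpec{
			// Selector and template labels come from the same map, so they can
			// never drift apart and get the DaemonSet rejected on creation.
			Selector: &metav1.LabelSelector{MatchLabels: labels},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{Labels: labels},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{{Name: "pause", Image: "registry.k8s.io/pause:3.9"}},
				},
			},
		},
	}
	fmt.Println(ds.Spec.Selector.MatchLabels["app"] == ds.Spec.Template.Labels["app"])
}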
16 changes: 15 additions & 1 deletion pkg/test/expectations/expectations.go
@@ -691,6 +691,7 @@ func ExpectEvicted(ctx context.Context, c client.Client, pods ...*corev1.Pod) {
}

// EventuallyExpectTerminating ensures that the deletion timestamp is eventually set
// We need this since there is some propagation time for the eviction API to set the deletionTimestamp
func EventuallyExpectTerminating(ctx context.Context, c client.Client, objs ...client.Object) {
GinkgoHelper()

@@ -699,5 +700,18 @@ func EventuallyExpectTerminating(ctx context.Context, c client.Client, objs ...client.Object) {
g.Expect(c.Get(ctx, client.ObjectKeyFromObject(obj), obj)).To(Succeed())
g.Expect(obj.GetDeletionTimestamp().IsZero()).ToNot(BeTrue())
}
}, ReconcilerPropagationTime, RequestInterval).Should(Succeed())
}, time.Second).Should(Succeed())
}

// ConsistentlyExpectNotTerminating ensures that the deletion timestamp is not set
// We need this since there is some propagation time for the eviction API to set the deletionTimestamp
func ConsistentlyExpectNotTerminating(ctx context.Context, c client.Client, objs ...client.Object) {
GinkgoHelper()

Consistently(func(g Gomega) {
for _, obj := range objs {
g.Expect(c.Get(ctx, client.ObjectKeyFromObject(obj), obj)).To(Succeed())
g.Expect(obj.GetDeletionTimestamp().IsZero()).To(BeTrue())
}
}, time.Second).Should(Succeed())
}
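
The new ConsistentlyExpectNotTerminating relies on Gomega's Consistently, which re-runs the assertion for the whole window and fails on the first violation — the mirror image of Eventually, which passes on the first success. A minimal example of that pattern outside the test suite; the test name and the deleted flag are made up for illustration:

package example_test

import (
	"testing"
	"time"

	. "github.com/onsi/gomega"
)

func TestConsistentlySketch(t *testing.T) {
	g := NewWithT(t)

	deleted := false
	// Consistently polls the function for the full one-second window and fails
	// if any polled assertion ever breaks during that window.
	g.Consistently(func(gg Gomega) {
		gg.Expect(deleted).To(BeFalse())
	}, time.Second).Should(Succeed())
}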
