From 0ecc7f8aba5560e942a328b1e2aaebded2dbd451 Mon Sep 17 00:00:00 2001 From: njtran Date: Wed, 22 May 2024 19:00:51 -0700 Subject: [PATCH] test: add kwok e2e environment --- test/README.md | 10 + test/pkg/debug/events.go | 135 +++ test/pkg/debug/monitor.go | 83 ++ test/pkg/debug/node.go | 84 ++ test/pkg/debug/nodeclaim.go | 80 ++ test/pkg/debug/pod.go | 91 ++ test/pkg/debug/setup.go | 57 ++ test/pkg/environment/common/environment.go | 196 +++++ test/pkg/environment/common/expectations.go | 927 ++++++++++++++++++++ test/pkg/environment/common/monitor.go | 258 ++++++ test/pkg/environment/common/setup.go | 166 ++++ test/suites/disruption/expiration_test.go | 676 ++++++++++++++ test/suites/disruption/suite_test.go | 54 ++ 13 files changed, 2817 insertions(+) create mode 100644 test/README.md create mode 100644 test/pkg/debug/events.go create mode 100644 test/pkg/debug/monitor.go create mode 100644 test/pkg/debug/node.go create mode 100644 test/pkg/debug/nodeclaim.go create mode 100644 test/pkg/debug/pod.go create mode 100644 test/pkg/debug/setup.go create mode 100644 test/pkg/environment/common/environment.go create mode 100644 test/pkg/environment/common/expectations.go create mode 100644 test/pkg/environment/common/monitor.go create mode 100644 test/pkg/environment/common/setup.go create mode 100644 test/suites/disruption/expiration_test.go create mode 100644 test/suites/disruption/suite_test.go diff --git a/test/README.md b/test/README.md new file mode 100644 index 0000000000..e1ef053771 --- /dev/null +++ b/test/README.md @@ -0,0 +1,10 @@ +# E2E Testing + +Karpenter leverages Github Actions to run our E2E test suites. + +## Directories +- `./.github/workflows`: Workflow files run within this repository. Relevant files for E2E testing are prefixed with `e2e-` +- `./.github/actions/e2e`: Composite actions utilized by the E2E workflows +- `./test/suites`: Directories defining test suites +- `./test/pkg`: Common utilities and expectations +- `./test/hack`: Testing scripts diff --git a/test/pkg/debug/events.go b/test/pkg/debug/events.go new file mode 100644 index 0000000000..bd502f711c --- /dev/null +++ b/test/pkg/debug/events.go @@ -0,0 +1,135 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package debug + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/samber/lo" + "go.uber.org/multierr" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type EventClient struct { + start time.Time + kubeClient client.Client +} + +func NewEventClient(kubeClient client.Client) *EventClient { + return &EventClient{ + start: time.Now(), + kubeClient: kubeClient, + } +} + +func (c *EventClient) DumpEvents(ctx context.Context) error { + return multierr.Combine( + c.dumpPodEvents(ctx), + c.dumpNodeEvents(ctx), + ) + +} + +func (c *EventClient) dumpPodEvents(ctx context.Context) error { + el := &v1.EventList{} + if err := c.kubeClient.List(ctx, el, &client.ListOptions{ + FieldSelector: fields.SelectorFromSet(map[string]string{"involvedObject.kind": "Pod"}), + }); err != nil { + return err + } + events := lo.Filter(filterTestEvents(el.Items, c.start), func(e v1.Event, _ int) bool { + return e.InvolvedObject.Namespace != "kube-system" + }) + for k, v := range coallateEvents(events) { + fmt.Print(getEventInformation(k, v)) + } + return nil +} + +func (c *EventClient) dumpNodeEvents(ctx context.Context) error { + el := &v1.EventList{} + if err := c.kubeClient.List(ctx, el, &client.ListOptions{ + FieldSelector: fields.SelectorFromSet(map[string]string{"involvedObject.kind": "Node"}), + }); err != nil { + return err + } + for k, v := range coallateEvents(filterTestEvents(el.Items, c.start)) { + fmt.Print(getEventInformation(k, v)) + } + return nil +} + +func filterTestEvents(events []v1.Event, startTime time.Time) []v1.Event { + return lo.Filter(events, func(e v1.Event, _ int) bool { + if !e.EventTime.IsZero() { + if e.EventTime.BeforeTime(&metav1.Time{Time: startTime}) { + return false + } + } else if e.FirstTimestamp.Before(&metav1.Time{Time: startTime}) { + return false + } + return true + }) +} + +func coallateEvents(events []v1.Event) map[v1.ObjectReference]*v1.EventList { + eventMap := map[v1.ObjectReference]*v1.EventList{} + for i := range events { + elem := events[i] + objectKey := v1.ObjectReference{Kind: elem.InvolvedObject.Kind, Namespace: elem.InvolvedObject.Namespace, Name: elem.InvolvedObject.Name} + if _, ok := eventMap[objectKey]; !ok { + eventMap[objectKey] = &v1.EventList{} + } + eventMap[objectKey].Items = append(eventMap[objectKey].Items, elem) + } + return eventMap +} + +// Partially copied from +// https://github.com/kubernetes/kubernetes/blob/04ee339c7a4d36b4037ce3635993e2a9e395ebf3/staging/src/k8s.io/kubectl/pkg/describe/describe.go#L4232 +func getEventInformation(o v1.ObjectReference, el *v1.EventList) string { + sb := strings.Builder{} + sb.WriteString(fmt.Sprintf("------- %s/%s%s EVENTS -------\n", + strings.ToLower(o.Kind), lo.Ternary(o.Namespace != "", o.Namespace+"/", ""), o.Name)) + if len(el.Items) == 0 { + return sb.String() + } + for _, e := range el.Items { + source := e.Source.Component + if source == "" { + source = e.ReportingController + } + eventTime := e.EventTime + if eventTime.IsZero() { + eventTime = metav1.NewMicroTime(e.FirstTimestamp.Time) + } + sb.WriteString(fmt.Sprintf("time=%s type=%s reason=%s from=%s message=%s\n", + eventTime.Format(time.RFC3339), + e.Type, + e.Reason, + source, + strings.TrimSpace(e.Message)), + ) + } + return sb.String() +} diff --git a/test/pkg/debug/monitor.go b/test/pkg/debug/monitor.go new file mode 100644 index 0000000000..0ebab55408 --- /dev/null +++ b/test/pkg/debug/monitor.go @@ -0,0 +1,83 @@ 
+/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package debug + +import ( + "context" + "sync" + + "github.com/samber/lo" + "k8s.io/client-go/rest" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/metrics/server" + + "sigs.k8s.io/karpenter/pkg/operator/controller" + + "sigs.k8s.io/karpenter/pkg/operator/scheme" +) + +type Monitor struct { + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + mgr manager.Manager +} + +func New(ctx context.Context, config *rest.Config, kubeClient client.Client) *Monitor { + log.SetLogger(log.FromContext(ctx)) + mgr := lo.Must(controllerruntime.NewManager(config, controllerruntime.Options{ + Scheme: scheme.Scheme, + Metrics: server.Options{ + BindAddress: "0", + }, + })) + for _, c := range newControllers(kubeClient) { + lo.Must0(c.Register(ctx, mgr), "failed to register controller") + } + ctx, cancel := context.WithCancel(ctx) // this context is only meant for monitor start/stop + return &Monitor{ + ctx: ctx, + cancel: cancel, + mgr: mgr, + } +} + +// MustStart starts the debug monitor +func (m *Monitor) MustStart() { + m.wg.Add(1) + go func() { + defer m.wg.Done() + lo.Must0(m.mgr.Start(m.ctx)) + }() +} + +// Stop stops the monitor +func (m *Monitor) Stop() { + m.cancel() + m.wg.Wait() +} + +func newControllers(kubeClient client.Client) []controller.Controller { + return []controller.Controller{ + NewNodeClaimController(kubeClient), + NewNodeController(kubeClient), + NewPodController(kubeClient), + } +} diff --git a/test/pkg/debug/node.go b/test/pkg/debug/node.go new file mode 100644 index 0000000000..d2df773187 --- /dev/null +++ b/test/pkg/debug/node.go @@ -0,0 +1,84 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package debug + +import ( + "context" + "fmt" + "time" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "sigs.k8s.io/karpenter/pkg/apis/v1beta1" + + nodeutils "sigs.k8s.io/karpenter/pkg/utils/node" +) + +type NodeController struct { + kubeClient client.Client +} + +func NewNodeController(kubeClient client.Client) *NodeController { + return &NodeController{ + kubeClient: kubeClient, + } +} + +func (c *NodeController) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + n := &v1.Node{} + if err := c.kubeClient.Get(ctx, req.NamespacedName, n); err != nil { + if errors.IsNotFound(err) { + fmt.Printf("[DELETED %s] NODE %s\n", time.Now().Format(time.RFC3339), req.NamespacedName.String()) + } + return reconcile.Result{}, client.IgnoreNotFound(err) + } + fmt.Printf("[CREATED/UPDATED %s] NODE %s %s\n", time.Now().Format(time.RFC3339), req.NamespacedName.Name, c.GetInfo(ctx, n)) + return reconcile.Result{}, nil +} + +func (c *NodeController) GetInfo(ctx context.Context, n *v1.Node) string { + pods, _ := nodeutils.GetPods(ctx, c.kubeClient, n) + return fmt.Sprintf("ready=%s schedulable=%t initialized=%s pods=%d taints=%v", nodeutils.GetCondition(n, v1.NodeReady).Status, !n.Spec.Unschedulable, n.Labels[v1beta1.NodeInitializedLabelKey], len(pods), n.Spec.Taints) +} + +func (c *NodeController) Register(ctx context.Context, m manager.Manager) error { + return controllerruntime.NewControllerManagedBy(m). + Named("node"). + For(&v1.Node{}). + WithEventFilter(predicate.And( + predicate.Funcs{ + UpdateFunc: func(e event.UpdateEvent) bool { + oldNode := e.ObjectOld.(*v1.Node) + newNode := e.ObjectNew.(*v1.Node) + return c.GetInfo(ctx, oldNode) != c.GetInfo(ctx, newNode) + }, + }, + predicate.NewPredicateFuncs(func(o client.Object) bool { + return o.GetLabels()[v1beta1.NodePoolLabelKey] != "" + }), + )). + WithOptions(controller.Options{MaxConcurrentReconciles: 10}). + Complete(c) +} diff --git a/test/pkg/debug/nodeclaim.go b/test/pkg/debug/nodeclaim.go new file mode 100644 index 0000000000..a73d6c4bf7 --- /dev/null +++ b/test/pkg/debug/nodeclaim.go @@ -0,0 +1,80 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package debug + +import ( + "context" + "fmt" + "time" + + "k8s.io/apimachinery/pkg/api/errors" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "sigs.k8s.io/karpenter/pkg/apis/v1beta1" +) + +type NodeClaimController struct { + kubeClient client.Client +} + +func NewNodeClaimController(kubeClient client.Client) *NodeClaimController { + return &NodeClaimController{ + kubeClient: kubeClient, + } +} + +func (c *NodeClaimController) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + nc := &v1beta1.NodeClaim{} + if err := c.kubeClient.Get(ctx, req.NamespacedName, nc); err != nil { + if errors.IsNotFound(err) { + fmt.Printf("[DELETED %s] NODECLAIM %s\n", time.Now().Format(time.RFC3339), req.NamespacedName.String()) + } + return reconcile.Result{}, client.IgnoreNotFound(err) + } + fmt.Printf("[CREATED/UPDATED %s] NODECLAIM %s %s\n", time.Now().Format(time.RFC3339), req.NamespacedName.Name, c.GetInfo(nc)) + return reconcile.Result{}, nil +} + +func (c *NodeClaimController) GetInfo(nc *v1beta1.NodeClaim) string { + return fmt.Sprintf("ready=%t launched=%t registered=%t initialized=%t", + nc.StatusConditions().Root().IsTrue(), + nc.StatusConditions().Get(v1beta1.ConditionTypeLaunched).IsTrue(), + nc.StatusConditions().Get(v1beta1.ConditionTypeRegistered).IsTrue(), + nc.StatusConditions().Get(v1beta1.ConditionTypeInitialized).IsTrue(), + ) +} + +func (c *NodeClaimController) Register(_ context.Context, m manager.Manager) error { + return controllerruntime.NewControllerManagedBy(m). + Named("nodeclaim"). + For(&v1beta1.NodeClaim{}). + WithEventFilter(predicate.Funcs{ + UpdateFunc: func(e event.UpdateEvent) bool { + oldNodeClaim := e.ObjectOld.(*v1beta1.NodeClaim) + newNodeClaim := e.ObjectNew.(*v1beta1.NodeClaim) + return c.GetInfo(oldNodeClaim) != c.GetInfo(newNodeClaim) + }, + }). + WithOptions(controller.Options{MaxConcurrentReconciles: 10}). + Complete(c) +} diff --git a/test/pkg/debug/pod.go b/test/pkg/debug/pod.go new file mode 100644 index 0000000000..51f47b3517 --- /dev/null +++ b/test/pkg/debug/pod.go @@ -0,0 +1,91 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package debug + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/samber/lo" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "sigs.k8s.io/karpenter/pkg/utils/pod" +) + +type PodController struct { + kubeClient client.Client +} + +func NewPodController(kubeClient client.Client) *PodController { + return &PodController{ + kubeClient: kubeClient, + } +} + +func (c *PodController) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + p := &v1.Pod{} + if err := c.kubeClient.Get(ctx, req.NamespacedName, p); err != nil { + if errors.IsNotFound(err) { + fmt.Printf("[DELETED %s] POD %s\n", time.Now().Format(time.RFC3339), req.NamespacedName.String()) + } + return reconcile.Result{}, client.IgnoreNotFound(err) + } + fmt.Printf("[CREATED/UPDATED %s] POD %s %s\n", time.Now().Format(time.RFC3339), req.NamespacedName.String(), c.GetInfo(p)) + return reconcile.Result{}, nil +} + +func (c *PodController) GetInfo(p *v1.Pod) string { + var containerInfo strings.Builder + for _, c := range p.Status.ContainerStatuses { + if containerInfo.Len() > 0 { + _ = lo.Must(fmt.Fprintf(&containerInfo, ", ")) + } + _ = lo.Must(fmt.Fprintf(&containerInfo, "%s restarts=%d", c.Name, c.RestartCount)) + } + return fmt.Sprintf("provisionable=%v phase=%s nodename=%s owner=%#v [%s]", + pod.IsProvisionable(p), p.Status.Phase, p.Spec.NodeName, p.OwnerReferences, containerInfo.String()) +} + +func (c *PodController) Register(_ context.Context, m manager.Manager) error { + return controllerruntime.NewControllerManagedBy(m). + Named("pod"). + For(&v1.Pod{}). + WithEventFilter(predicate.And( + predicate.Funcs{ + UpdateFunc: func(e event.UpdateEvent) bool { + oldPod := e.ObjectOld.(*v1.Pod) + newPod := e.ObjectNew.(*v1.Pod) + return c.GetInfo(oldPod) != c.GetInfo(newPod) + }, + }, + predicate.NewPredicateFuncs(func(o client.Object) bool { + return o.GetNamespace() != "kube-system" + }), + )). + WithOptions(controller.Options{MaxConcurrentReconciles: 10}). + Complete(c) +} diff --git a/test/pkg/debug/setup.go b/test/pkg/debug/setup.go new file mode 100644 index 0000000000..3933407054 --- /dev/null +++ b/test/pkg/debug/setup.go @@ -0,0 +1,57 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package debug + +import ( + "context" + + "github.com/onsi/ginkgo/v2" + "github.com/samber/lo" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client" + + . 
"github.com/onsi/gomega" +) + +const ( + NoWatch = "NoWatch" + NoEvents = "NoEvents" +) + +var m *Monitor +var e *EventClient + +func BeforeEach(ctx context.Context, config *rest.Config, kubeClient client.Client) { + // If the test is labeled as NoWatch, then the node/pod monitor will just list at the beginning + // of the test rather than perform a watch during it + if !lo.Contains(ginkgo.CurrentSpecReport().Labels(), NoWatch) { + m = New(ctx, config, kubeClient) + m.MustStart() + } + if !lo.Contains(ginkgo.CurrentSpecReport().Labels(), NoEvents) { + e = NewEventClient(kubeClient) + } +} + +func AfterEach(ctx context.Context) { + if !lo.Contains(ginkgo.CurrentSpecReport().Labels(), NoWatch) { + m.Stop() + } + if !lo.Contains(ginkgo.CurrentSpecReport().Labels(), NoEvents) { + Expect(e.DumpEvents(ctx)).To(Succeed()) + } +} diff --git a/test/pkg/environment/common/environment.go b/test/pkg/environment/common/environment.go new file mode 100644 index 0000000000..180b53a20d --- /dev/null +++ b/test/pkg/environment/common/environment.go @@ -0,0 +1,196 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package common + +import ( + "context" + "fmt" + "log" + "os" + "strconv" + "testing" + "time" + + "github.com/onsi/gomega" + "github.com/samber/lo" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + + "sigs.k8s.io/karpenter/kwok/apis/v1alpha1" + "sigs.k8s.io/karpenter/pkg/test" + . 
"sigs.k8s.io/karpenter/pkg/utils/testing" //nolint:stylecheck + + "knative.dev/pkg/system" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + + "sigs.k8s.io/karpenter/pkg/apis" + "sigs.k8s.io/karpenter/pkg/apis/v1beta1" + "sigs.k8s.io/karpenter/pkg/operator" +) + +type ContextKey string + +const ( + GitRefContextKey = ContextKey("gitRef") +) + +type Environment struct { + context.Context + cancel context.CancelFunc + + Client client.Client + Config *rest.Config + KubeClient kubernetes.Interface + Monitor *Monitor + + StartingNodeCount int +} + +func NewEnvironment(t *testing.T) *Environment { + ctx := TestContextWithLogger(t) + ctx, cancel := context.WithCancel(ctx) + config := NewConfig() + client := NewClient(ctx, config) + + lo.Must0(os.Setenv(system.NamespaceEnvKey, "kube-system")) + if val, ok := os.LookupEnv("GIT_REF"); ok { + ctx = context.WithValue(ctx, GitRefContextKey, val) + } + + gomega.SetDefaultEventuallyTimeout(5 * time.Minute) + gomega.SetDefaultEventuallyPollingInterval(1 * time.Second) + return &Environment{ + Context: ctx, + cancel: cancel, + Config: config, + Client: client, + KubeClient: kubernetes.NewForConfigOrDie(config), + Monitor: NewMonitor(ctx, client), + } +} + +func (env *Environment) Stop() { + env.cancel() +} + +func NewConfig() *rest.Config { + config := controllerruntime.GetConfigOrDie() + config.UserAgent = fmt.Sprintf("testing-%s", operator.Version) + config.QPS = 1e6 + config.Burst = 1e6 + return config +} + +func NewClient(ctx context.Context, config *rest.Config) client.Client { + scheme := runtime.NewScheme() + lo.Must0(clientgoscheme.AddToScheme(scheme)) + lo.Must0(apis.AddToScheme(scheme)) + lo.Must0(apis.AddToScheme(scheme)) + + cache := lo.Must(cache.New(config, cache.Options{Scheme: scheme})) + lo.Must0(cache.IndexField(ctx, &v1.Pod{}, "spec.nodeName", func(o client.Object) []string { + pod := o.(*v1.Pod) + return []string{pod.Spec.NodeName} + })) + lo.Must0(cache.IndexField(ctx, &v1.Event{}, "involvedObject.kind", func(o client.Object) []string { + evt := o.(*v1.Event) + return []string{evt.InvolvedObject.Kind} + })) + lo.Must0(cache.IndexField(ctx, &v1.Node{}, "spec.unschedulable", func(o client.Object) []string { + node := o.(*v1.Node) + return []string{strconv.FormatBool(node.Spec.Unschedulable)} + })) + lo.Must0(cache.IndexField(ctx, &v1.Node{}, "spec.taints[*].karpenter.sh/disruption", func(o client.Object) []string { + node := o.(*v1.Node) + t, _ := lo.Find(node.Spec.Taints, func(t v1.Taint) bool { + return t.Key == v1beta1.DisruptionTaintKey + }) + return []string{t.Value} + })) + + c := lo.Must(client.New(config, client.Options{Scheme: scheme, Cache: &client.CacheOptions{Reader: cache}})) + + go func() { + lo.Must0(cache.Start(ctx)) + }() + if !cache.WaitForCacheSync(ctx) { + log.Fatalf("cache failed to sync") + } + return c +} + +func (env *Environment) DefaultNodeClass() *v1alpha1.KWOKNodeClass { + return &v1alpha1.KWOKNodeClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: test.RandomName(), + }, + } +} + +func (env *Environment) DefaultNodePool(nodeClass *v1alpha1.KWOKNodeClass) *v1beta1.NodePool { + nodePool := test.NodePool() + nodePool.Spec.Template.Spec.NodeClassRef = &v1beta1.NodeClassReference{ + Name: "default", + Kind: "KWOKNodeClass", + APIVersion: v1alpha1.SchemeGroupVersion.Version, + } + nodePool.Spec.Template.Spec.Requirements = []v1beta1.NodeSelectorRequirementWithMinValues{ + { + NodeSelectorRequirement: 
v1.NodeSelectorRequirement{ + Key: v1.LabelOSStable, + Operator: v1.NodeSelectorOpIn, + Values: []string{string(v1.Linux)}, + }, + }, + { + NodeSelectorRequirement: v1.NodeSelectorRequirement{ + Key: v1beta1.CapacityTypeLabelKey, + Operator: v1.NodeSelectorOpIn, + Values: []string{v1beta1.CapacityTypeOnDemand}, + }, + }, + // TODO @njtran: replace with kwok labels + // { + // NodeSelectorRequirement: v1.NodeSelectorRequirement{ + // Key: v1beta1.LabelInstanceCategory, + // Operator: v1.NodeSelectorOpIn, + // Values: []string{"c", "m", "r"}, + // }, + // }, + // { + // NodeSelectorRequirement: v1.NodeSelectorRequirement{ + // Key: v1beta1.LabelInstanceGeneration, + // Operator: v1.NodeSelectorOpGt, + // Values: []string{"2"}, + // }, + // }, + } + nodePool.Spec.Disruption.ConsolidateAfter = &v1beta1.NillableDuration{} + nodePool.Spec.Disruption.ExpireAfter.Duration = nil + nodePool.Spec.Limits = v1beta1.Limits(v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("1000"), + v1.ResourceMemory: resource.MustParse("1000Gi"), + }) + return nodePool +} diff --git a/test/pkg/environment/common/expectations.go b/test/pkg/environment/common/expectations.go new file mode 100644 index 0000000000..eb56d4fd9d --- /dev/null +++ b/test/pkg/environment/common/expectations.go @@ -0,0 +1,927 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package common + +import ( + "bytes" + "encoding/base64" + "fmt" + "io" + "math" + "strings" + "time" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + "github.com/samber/lo" + appsv1 "k8s.io/api/apps/v1" + coordinationv1 "k8s.io/api/coordination/v1" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/transport" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" + "sigs.k8s.io/controller-runtime/pkg/log" + + corev1beta1 "sigs.k8s.io/karpenter/pkg/apis/v1beta1" + pscheduling "sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling" + "sigs.k8s.io/karpenter/pkg/scheduling" + "sigs.k8s.io/karpenter/pkg/test" + coreresources "sigs.k8s.io/karpenter/pkg/utils/resources" +) + +func (env *Environment) ExpectCreated(objects ...client.Object) { + GinkgoHelper() + for _, object := range objects { + Eventually(func(g Gomega) { + object.SetLabels(lo.Assign(object.GetLabels(), map[string]string{ + test.DiscoveryLabel: "unspecified", + })) + g.Expect(env.Client.Create(env, object)).To(Succeed()) + }).WithTimeout(time.Second * 10).Should(Succeed()) + } +} + +func (env *Environment) ExpectDeleted(objects ...client.Object) { + GinkgoHelper() + for _, object := range objects { + Eventually(func(g Gomega) { + g.Expect(client.IgnoreNotFound(env.Client.Delete(env, object, client.PropagationPolicy(metav1.DeletePropagationForeground), &client.DeleteOptions{GracePeriodSeconds: lo.ToPtr(int64(0))}))).To(Succeed()) + }).WithTimeout(time.Second * 10).Should(Succeed()) + } +} + +// ExpectUpdated will update objects in the cluster to match the inputs. +// WARNING: This ignores the resource version check, which can result in +// overwriting changes made by other controllers in the cluster. +// This is useful in ensuring that we can clean up resources by patching +// out finalizers. +// Grab the object before making the updates to reduce the chance of this race. +func (env *Environment) ExpectUpdated(objects ...client.Object) { + GinkgoHelper() + for _, o := range objects { + Eventually(func(g Gomega) { + current := o.DeepCopyObject().(client.Object) + g.Expect(env.Client.Get(env.Context, client.ObjectKeyFromObject(current), current)).To(Succeed()) + if current.GetResourceVersion() != o.GetResourceVersion() { + log.FromContext(env).Info(fmt.Sprintf("detected an update to an object (%s) with an outdated resource version, did you get the latest version of the object before patching?", lo.Must(apiutil.GVKForObject(o, env.Client.Scheme())))) + } + o.SetResourceVersion(current.GetResourceVersion()) + g.Expect(env.Client.Update(env.Context, o)).To(Succeed()) + }).WithTimeout(time.Second * 10).Should(Succeed()) + } +} + +// ExpectCreatedOrUpdated can update objects in the cluster to match the inputs. +// WARNING: ExpectUpdated ignores the resource version check, which can result in +// overwriting changes made by other controllers in the cluster. +// This is useful in ensuring that we can clean up resources by patching +// out finalizers. +// Grab the object before making the updates to reduce the chance of this race. 
+func (env *Environment) ExpectCreatedOrUpdated(objects ...client.Object) { + GinkgoHelper() + for _, o := range objects { + current := o.DeepCopyObject().(client.Object) + err := env.Client.Get(env, client.ObjectKeyFromObject(current), current) + if err != nil { + if errors.IsNotFound(err) { + env.ExpectCreated(o) + } else { + Fail(fmt.Sprintf("Getting object %s, %v", client.ObjectKeyFromObject(o), err)) + } + } else { + env.ExpectUpdated(o) + } + } +} + +func (env *Environment) ExpectSettings() (res []v1.EnvVar) { + GinkgoHelper() + + d := &appsv1.Deployment{} + Expect(env.Client.Get(env.Context, types.NamespacedName{Namespace: "kube-system", Name: "karpenter"}, d)).To(Succeed()) + Expect(d.Spec.Template.Spec.Containers).To(HaveLen(1)) + return lo.Map(d.Spec.Template.Spec.Containers[0].Env, func(v v1.EnvVar, _ int) v1.EnvVar { + return *v.DeepCopy() + }) +} + +func (env *Environment) ExpectSettingsReplaced(vars ...v1.EnvVar) { + GinkgoHelper() + + d := &appsv1.Deployment{} + Expect(env.Client.Get(env.Context, types.NamespacedName{Namespace: "kube-system", Name: "karpenter"}, d)).To(Succeed()) + Expect(d.Spec.Template.Spec.Containers).To(HaveLen(1)) + + stored := d.DeepCopy() + d.Spec.Template.Spec.Containers[0].Env = vars + + if !equality.Semantic.DeepEqual(d, stored) { + By("replacing environment variables for karpenter deployment") + Expect(env.Client.Patch(env.Context, d, client.StrategicMergeFrom(stored))).To(Succeed()) + env.EventuallyExpectKarpenterRestarted() + } +} + +func (env *Environment) ExpectSettingsOverridden(vars ...v1.EnvVar) { + GinkgoHelper() + + d := &appsv1.Deployment{} + Expect(env.Client.Get(env.Context, types.NamespacedName{Namespace: "kube-system", Name: "karpenter"}, d)).To(Succeed()) + Expect(d.Spec.Template.Spec.Containers).To(HaveLen(1)) + + stored := d.DeepCopy() + for _, v := range vars { + if _, i, ok := lo.FindIndexOf(d.Spec.Template.Spec.Containers[0].Env, func(e v1.EnvVar) bool { + return e.Name == v.Name + }); ok { + d.Spec.Template.Spec.Containers[0].Env[i] = v + } else { + d.Spec.Template.Spec.Containers[0].Env = append(d.Spec.Template.Spec.Containers[0].Env, v) + } + } + if !equality.Semantic.DeepEqual(d, stored) { + By("overriding environment variables for karpenter deployment") + Expect(env.Client.Patch(env.Context, d, client.StrategicMergeFrom(stored))).To(Succeed()) + env.EventuallyExpectKarpenterRestarted() + } +} + +func (env *Environment) ExpectSettingsRemoved(vars ...v1.EnvVar) { + GinkgoHelper() + + varNames := sets.New[string](lo.Map(vars, func(v v1.EnvVar, _ int) string { return v.Name })...) 
+ + d := &appsv1.Deployment{} + Expect(env.Client.Get(env.Context, types.NamespacedName{Namespace: "kube-system", Name: "karpenter"}, d)).To(Succeed()) + Expect(d.Spec.Template.Spec.Containers).To(HaveLen(1)) + + stored := d.DeepCopy() + d.Spec.Template.Spec.Containers[0].Env = lo.Reject(d.Spec.Template.Spec.Containers[0].Env, func(v v1.EnvVar, _ int) bool { + return varNames.Has(v.Name) + }) + if !equality.Semantic.DeepEqual(d, stored) { + By("removing environment variables for karpenter deployment") + Expect(env.Client.Patch(env.Context, d, client.StrategicMergeFrom(stored))).To(Succeed()) + env.EventuallyExpectKarpenterRestarted() + } +} + +func (env *Environment) ExpectConfigMapExists(key types.NamespacedName) *v1.ConfigMap { + GinkgoHelper() + cm := &v1.ConfigMap{} + Expect(env.Client.Get(env, key, cm)).To(Succeed()) + return cm +} + +func (env *Environment) ExpectConfigMapDataReplaced(key types.NamespacedName, data ...map[string]string) (changed bool) { + GinkgoHelper() + cm := &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: key.Name, + Namespace: key.Namespace, + }, + } + err := env.Client.Get(env, key, cm) + Expect(client.IgnoreNotFound(err)).ToNot(HaveOccurred()) + + stored := cm.DeepCopy() + cm.Data = lo.Assign(data...) // Completely replace the data + + // If the data hasn't changed, we can just return and not update anything + if equality.Semantic.DeepEqual(stored, cm) { + return false + } + // Update the configMap to update the settings + env.ExpectCreatedOrUpdated(cm) + return true +} + +func (env *Environment) ExpectConfigMapDataOverridden(key types.NamespacedName, data ...map[string]string) (changed bool) { + GinkgoHelper() + cm := &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: key.Name, + Namespace: key.Namespace, + }, + } + err := env.Client.Get(env, key, cm) + Expect(client.IgnoreNotFound(err)).ToNot(HaveOccurred()) + + stored := cm.DeepCopy() + cm.Data = lo.Assign(append([]map[string]string{cm.Data}, data...)...) 
+ + // If the data hasn't changed, we can just return and not update anything + if equality.Semantic.DeepEqual(stored, cm) { + return false + } + // Update the configMap to update the settings + env.ExpectCreatedOrUpdated(cm) + return true +} + +func (env *Environment) ExpectPodENIEnabled() { + GinkgoHelper() + env.ExpectDaemonSetEnvironmentVariableUpdated(types.NamespacedName{Namespace: "kube-system", Name: "aws-node"}, + "ENABLE_POD_ENI", "true", "aws-node") +} + +func (env *Environment) ExpectPodENIDisabled() { + GinkgoHelper() + env.ExpectDaemonSetEnvironmentVariableUpdated(types.NamespacedName{Namespace: "kube-system", Name: "aws-node"}, + "ENABLE_POD_ENI", "false", "aws-node") +} + +func (env *Environment) ExpectPrefixDelegationEnabled() { + GinkgoHelper() + env.ExpectDaemonSetEnvironmentVariableUpdated(types.NamespacedName{Namespace: "kube-system", Name: "aws-node"}, + "ENABLE_PREFIX_DELEGATION", "true", "aws-node") +} + +func (env *Environment) ExpectPrefixDelegationDisabled() { + GinkgoHelper() + env.ExpectDaemonSetEnvironmentVariableUpdated(types.NamespacedName{Namespace: "kube-system", Name: "aws-node"}, + "ENABLE_PREFIX_DELEGATION", "false", "aws-node") +} + +func (env *Environment) ExpectExists(obj client.Object) client.Object { + GinkgoHelper() + Eventually(func(g Gomega) { + g.Expect(env.Client.Get(env, client.ObjectKeyFromObject(obj), obj)).To(Succeed()) + }).WithTimeout(time.Second * 5).Should(Succeed()) + return obj +} + +func (env *Environment) EventuallyExpectBound(pods ...*v1.Pod) { + GinkgoHelper() + Eventually(func(g Gomega) { + for _, pod := range pods { + g.Expect(env.Client.Get(env, client.ObjectKeyFromObject(pod), pod)).To(Succeed()) + g.Expect(pod.Spec.NodeName).ToNot(BeEmpty()) + } + }).Should(Succeed()) +} + +func (env *Environment) EventuallyExpectHealthy(pods ...*v1.Pod) { + GinkgoHelper() + env.EventuallyExpectHealthyWithTimeout(-1, pods...) 
+}
+
+func (env *Environment) EventuallyExpectHealthyWithTimeout(timeout time.Duration, pods ...*v1.Pod) {
+	GinkgoHelper()
+	Eventually(func(g Gomega) {
+		for _, pod := range pods {
+			g.Expect(env.Client.Get(env, client.ObjectKeyFromObject(pod), pod)).To(Succeed())
+			g.Expect(pod.Status.Conditions).To(ContainElement(And(
+				HaveField("Type", Equal(v1.PodReady)),
+				HaveField("Status", Equal(v1.ConditionTrue)),
+			)))
+		}
+	}).WithTimeout(timeout).Should(Succeed())
+}
+
+func (env *Environment) EventuallyExpectKarpenterRestarted() {
+	GinkgoHelper()
+	By("rolling out the new karpenter deployment")
+	env.EventuallyExpectRollout("karpenter", "kube-system")
+	env.ExpectKarpenterLeaseOwnerChanged()
+}
+
+func (env *Environment) ExpectKarpenterLeaseOwnerChanged() {
+	GinkgoHelper()
+
+	By("waiting for a new karpenter pod to hold the lease")
+	pods := env.ExpectKarpenterPods()
+	Eventually(func(g Gomega) {
+		name := env.ExpectActiveKarpenterPodName()
+		g.Expect(lo.ContainsBy(pods, func(p *v1.Pod) bool {
+			return p.Name == name
+		})).To(BeTrue())
+	}).Should(Succeed())
+}
+
+func (env *Environment) EventuallyExpectRollout(name, namespace string) {
+	GinkgoHelper()
+	By("restarting the deployment")
+	deploy := &appsv1.Deployment{}
+	Expect(env.Client.Get(env.Context, types.NamespacedName{Name: name, Namespace: namespace}, deploy)).To(Succeed())
+
+	stored := deploy.DeepCopy()
+	restartedAtAnnotation := map[string]string{
+		"kubectl.kubernetes.io/restartedAt": time.Now().Format(time.RFC3339),
+	}
+	deploy.Spec.Template.Annotations = lo.Assign(deploy.Spec.Template.Annotations, restartedAtAnnotation)
+	Expect(env.Client.Patch(env.Context, deploy, client.StrategicMergeFrom(stored))).To(Succeed())
+
+	By("waiting for the newly generated deployment to rollout")
+	Eventually(func(g Gomega) {
+		podList := &v1.PodList{}
+		g.Expect(env.Client.List(env.Context, podList, client.InNamespace(namespace))).To(Succeed())
+		pods := lo.Filter(podList.Items, func(p v1.Pod, _ int) bool {
+			return p.Annotations["kubectl.kubernetes.io/restartedAt"] == restartedAtAnnotation["kubectl.kubernetes.io/restartedAt"]
+		})
+		g.Expect(len(pods)).To(BeNumerically("==", lo.FromPtr(deploy.Spec.Replicas)))
+		for _, pod := range pods {
+			g.Expect(pod.Status.Conditions).To(ContainElement(And(
+				HaveField("Type", Equal(v1.PodReady)),
+				HaveField("Status", Equal(v1.ConditionTrue)),
+			)))
+			g.Expect(pod.Status.Phase).To(Equal(v1.PodRunning))
+		}
+	}).Should(Succeed())
+}
+
+func (env *Environment) ExpectKarpenterPods() []*v1.Pod {
+	GinkgoHelper()
+	podList := &v1.PodList{}
+	Expect(env.Client.List(env.Context, podList, client.MatchingLabels{
+		"app.kubernetes.io/instance": "karpenter",
+	})).To(Succeed())
+	return lo.Map(podList.Items, func(p v1.Pod, _ int) *v1.Pod { return &p })
+}
+
+func (env *Environment) ExpectActiveKarpenterPodName() string {
+	GinkgoHelper()
+	lease := &coordinationv1.Lease{}
+	Expect(env.Client.Get(env.Context, types.NamespacedName{Name: "karpenter-leader-election", Namespace: "kube-system"}, lease)).To(Succeed())
+
+	// Holder identity for the lease is always in the format "<pod-name>_<random-suffix>"
+	holderArr := strings.Split(lo.FromPtr(lease.Spec.HolderIdentity), "_")
+	Expect(len(holderArr)).To(BeNumerically(">", 0))
+
+	return holderArr[0]
+}
+
+func (env *Environment) ExpectActiveKarpenterPod() *v1.Pod {
+	GinkgoHelper()
+	podName := env.ExpectActiveKarpenterPodName()
+
+	pod := &v1.Pod{}
+	Expect(env.Client.Get(env.Context, types.NamespacedName{Name: podName, Namespace: "kube-system"}, pod)).To(Succeed())
+	return pod
+}
+
+func (env 
*Environment) EventuallyExpectPendingPodCount(selector labels.Selector, numPods int) { + GinkgoHelper() + Eventually(func(g Gomega) { + g.Expect(env.Monitor.PendingPodsCount(selector)).To(Equal(numPods)) + }).Should(Succeed()) +} + +func (env *Environment) EventuallyExpectBoundPodCount(selector labels.Selector, numPods int) []*v1.Pod { + GinkgoHelper() + var res []*v1.Pod + Eventually(func(g Gomega) { + res = []*v1.Pod{} + podList := &v1.PodList{} + g.Expect(env.Client.List(env.Context, podList, client.MatchingLabelsSelector{Selector: selector})).To(Succeed()) + for i := range podList.Items { + if podList.Items[i].Spec.NodeName != "" { + res = append(res, &podList.Items[i]) + } + } + g.Expect(res).To(HaveLen(numPods)) + }).Should(Succeed()) + return res +} + +func (env *Environment) EventuallyExpectHealthyPodCount(selector labels.Selector, numPods int) []*v1.Pod { + By(fmt.Sprintf("waiting for %d pods matching selector %s to be ready", numPods, selector.String())) + GinkgoHelper() + return env.EventuallyExpectHealthyPodCountWithTimeout(-1, selector, numPods) +} + +func (env *Environment) EventuallyExpectHealthyPodCountWithTimeout(timeout time.Duration, selector labels.Selector, numPods int) []*v1.Pod { + GinkgoHelper() + var pods []*v1.Pod + Eventually(func(g Gomega) { + pods = env.Monitor.RunningPods(selector) + g.Expect(pods).To(HaveLen(numPods)) + }).WithTimeout(timeout).Should(Succeed()) + return pods +} + +func (env *Environment) ExpectPodsMatchingSelector(selector labels.Selector) []*v1.Pod { + GinkgoHelper() + + podList := &v1.PodList{} + Expect(env.Client.List(env.Context, podList, client.MatchingLabelsSelector{Selector: selector})).To(Succeed()) + return lo.ToSlicePtr(podList.Items) +} + +func (env *Environment) EventuallyExpectUniqueNodeNames(selector labels.Selector, uniqueNames int) { + GinkgoHelper() + + Eventually(func(g Gomega) { + pods := env.Monitor.RunningPods(selector) + nodeNames := sets.NewString() + for _, pod := range pods { + nodeNames.Insert(pod.Spec.NodeName) + } + g.Expect(len(nodeNames)).To(BeNumerically("==", uniqueNames)) + }).Should(Succeed()) +} + +func (env *Environment) eventuallyExpectScaleDown() { + GinkgoHelper() + Eventually(func(g Gomega) { + // expect the current node count to be what it was when the test started + g.Expect(env.Monitor.NodeCount()).To(Equal(env.StartingNodeCount)) + }).Should(Succeed(), fmt.Sprintf("expected scale down to %d nodes, had %d", env.StartingNodeCount, env.Monitor.NodeCount())) +} + +func (env *Environment) EventuallyExpectNotFound(objects ...client.Object) { + GinkgoHelper() + env.EventuallyExpectNotFoundAssertion(objects...).Should(Succeed()) +} + +func (env *Environment) EventuallyExpectNotFoundAssertion(objects ...client.Object) AsyncAssertion { + return Eventually(func(g Gomega) { + for _, object := range objects { + err := env.Client.Get(env, client.ObjectKeyFromObject(object), object) + g.Expect(errors.IsNotFound(err)).To(BeTrue()) + } + }) +} + +func (env *Environment) ExpectCreatedNodeCount(comparator string, count int) []*v1.Node { + GinkgoHelper() + createdNodes := env.Monitor.CreatedNodes() + Expect(len(createdNodes)).To(BeNumerically(comparator, count), + fmt.Sprintf("expected %d created nodes, had %d (%v)", count, len(createdNodes), NodeNames(createdNodes))) + return createdNodes +} + +func (env *Environment) ExpectNodeCount(comparator string, count int) { + GinkgoHelper() + + nodeList := &v1.NodeList{} + Expect(env.Client.List(env, nodeList, client.HasLabels{test.DiscoveryLabel})).To(Succeed()) + 
Expect(len(nodeList.Items)).To(BeNumerically(comparator, count)) +} + +func (env *Environment) ExpectNodeClaimCount(comparator string, count int) { + GinkgoHelper() + + nodeClaimList := &corev1beta1.NodeClaimList{} + Expect(env.Client.List(env, nodeClaimList, client.HasLabels{test.DiscoveryLabel})).To(Succeed()) + Expect(len(nodeClaimList.Items)).To(BeNumerically(comparator, count)) +} + +func NodeClaimNames(nodeClaims []*corev1beta1.NodeClaim) []string { + return lo.Map(nodeClaims, func(n *corev1beta1.NodeClaim, index int) string { + return n.Name + }) +} + +func NodeNames(nodes []*v1.Node) []string { + return lo.Map(nodes, func(n *v1.Node, index int) string { + return n.Name + }) +} + +func (env *Environment) ConsistentlyExpectNodeCount(comparator string, count int, duration time.Duration) []*v1.Node { + GinkgoHelper() + By(fmt.Sprintf("expecting nodes to be %s to %d for %s", comparator, count, duration)) + nodeList := &v1.NodeList{} + Consistently(func(g Gomega) { + g.Expect(env.Client.List(env, nodeList, client.HasLabels{test.DiscoveryLabel})).To(Succeed()) + g.Expect(len(nodeList.Items)).To(BeNumerically(comparator, count), + fmt.Sprintf("expected %d nodes, had %d (%v) for %s", count, len(nodeList.Items), NodeNames(lo.ToSlicePtr(nodeList.Items)), duration)) + }, duration.String()).Should(Succeed()) + return lo.ToSlicePtr(nodeList.Items) +} + +func (env *Environment) ConsistentlyExpectNoDisruptions(nodeCount int, duration time.Duration) (taintedNodes []*v1.Node) { + GinkgoHelper() + return env.ConsistentlyExpectDisruptionsWithNodeCount(0, nodeCount, duration) +} + +// ConsistentlyExpectDisruptionsWithNodeCount will continually ensure that there are exactly disruptingNodes with totalNodes (including replacements and existing nodes) +func (env *Environment) ConsistentlyExpectDisruptionsWithNodeCount(disruptingNodes, totalNodes int, duration time.Duration) (taintedNodes []*v1.Node) { + GinkgoHelper() + nodes := []v1.Node{} + Consistently(func(g Gomega) { + // Ensure we don't change our NodeClaims + nodeClaimList := &corev1beta1.NodeClaimList{} + g.Expect(env.Client.List(env, nodeClaimList, client.HasLabels{test.DiscoveryLabel})).To(Succeed()) + g.Expect(nodeClaimList.Items).To(HaveLen(totalNodes)) + + nodeList := &v1.NodeList{} + g.Expect(env.Client.List(env, nodeList, client.HasLabels{test.DiscoveryLabel})).To(Succeed()) + g.Expect(nodeList.Items).To(HaveLen(totalNodes)) + + nodes = lo.Filter(nodeList.Items, func(n v1.Node, _ int) bool { + _, ok := lo.Find(n.Spec.Taints, func(t v1.Taint) bool { + return corev1beta1.IsDisruptingTaint(t) + }) + return ok + }) + g.Expect(nodes).To(HaveLen(disruptingNodes)) + }, duration).Should(Succeed()) + return lo.ToSlicePtr(nodes) +} + +func (env *Environment) EventuallyExpectTaintedNodeCount(comparator string, count int) []*v1.Node { + GinkgoHelper() + By(fmt.Sprintf("waiting for tainted nodes to be %s to %d", comparator, count)) + nodeList := &v1.NodeList{} + Eventually(func(g Gomega) { + g.Expect(env.Client.List(env, nodeList, client.MatchingFields{"spec.taints[*].karpenter.sh/disruption": "disrupting"})).To(Succeed()) + g.Expect(len(nodeList.Items)).To(BeNumerically(comparator, count), + fmt.Sprintf("expected %d tainted nodes, had %d (%v)", count, len(nodeList.Items), NodeNames(lo.ToSlicePtr(nodeList.Items)))) + }).Should(Succeed()) + return lo.ToSlicePtr(nodeList.Items) +} + +func (env *Environment) EventuallyExpectNodesUntaintedWithTimeout(timeout time.Duration, nodes ...*v1.Node) { + GinkgoHelper() + By(fmt.Sprintf("waiting for %d nodes to be 
untainted", len(nodes))) + nodeList := &v1.NodeList{} + Eventually(func(g Gomega) { + g.Expect(env.Client.List(env, nodeList, client.MatchingFields{"spec.taints[*].karpenter.sh/disruption": "disrupting"})).To(Succeed()) + taintedNodeNames := lo.Map(nodeList.Items, func(n v1.Node, _ int) string { return n.Name }) + g.Expect(taintedNodeNames).ToNot(ContainElements(lo.Map(nodes, func(n *v1.Node, _ int) interface{} { return n.Name })...)) + }).WithTimeout(timeout).Should(Succeed()) +} + +func (env *Environment) EventuallyExpectNodeClaimCount(comparator string, count int) []*corev1beta1.NodeClaim { + GinkgoHelper() + By(fmt.Sprintf("waiting for nodes to be %s to %d", comparator, count)) + nodeClaimList := &corev1beta1.NodeClaimList{} + Eventually(func(g Gomega) { + g.Expect(env.Client.List(env, nodeClaimList, client.HasLabels{test.DiscoveryLabel})).To(Succeed()) + g.Expect(len(nodeClaimList.Items)).To(BeNumerically(comparator, count), + fmt.Sprintf("expected %d nodeclaims, had %d (%v)", count, len(nodeClaimList.Items), NodeClaimNames(lo.ToSlicePtr(nodeClaimList.Items)))) + }).Should(Succeed()) + return lo.ToSlicePtr(nodeClaimList.Items) +} + +func (env *Environment) EventuallyExpectNodeCount(comparator string, count int) []*v1.Node { + GinkgoHelper() + By(fmt.Sprintf("waiting for nodes to be %s to %d", comparator, count)) + nodeList := &v1.NodeList{} + Eventually(func(g Gomega) { + g.Expect(env.Client.List(env, nodeList, client.HasLabels{test.DiscoveryLabel})).To(Succeed()) + g.Expect(len(nodeList.Items)).To(BeNumerically(comparator, count), + fmt.Sprintf("expected %d nodes, had %d (%v)", count, len(nodeList.Items), NodeNames(lo.ToSlicePtr(nodeList.Items)))) + }).Should(Succeed()) + return lo.ToSlicePtr(nodeList.Items) +} + +func (env *Environment) EventuallyExpectNodeCountWithSelector(comparator string, count int, selector labels.Selector) []*v1.Node { + GinkgoHelper() + By(fmt.Sprintf("waiting for nodes with selector %v to be %s to %d", selector, comparator, count)) + nodeList := &v1.NodeList{} + Eventually(func(g Gomega) { + g.Expect(env.Client.List(env, nodeList, client.HasLabels{test.DiscoveryLabel}, client.MatchingLabelsSelector{Selector: selector})).To(Succeed()) + g.Expect(len(nodeList.Items)).To(BeNumerically(comparator, count), + fmt.Sprintf("expected %d nodes, had %d (%v)", count, len(nodeList.Items), NodeNames(lo.ToSlicePtr(nodeList.Items)))) + }).Should(Succeed()) + return lo.ToSlicePtr(nodeList.Items) +} + +func (env *Environment) EventuallyExpectCreatedNodeCount(comparator string, count int) []*v1.Node { + GinkgoHelper() + By(fmt.Sprintf("waiting for created nodes to be %s to %d", comparator, count)) + var createdNodes []*v1.Node + Eventually(func(g Gomega) { + createdNodes = env.Monitor.CreatedNodes() + g.Expect(len(createdNodes)).To(BeNumerically(comparator, count), + fmt.Sprintf("expected %d created nodes, had %d (%v)", count, len(createdNodes), NodeNames(createdNodes))) + }).Should(Succeed()) + return createdNodes +} + +func (env *Environment) EventuallyExpectDeletedNodeCount(comparator string, count int) []*v1.Node { + GinkgoHelper() + By(fmt.Sprintf("waiting for deleted nodes to be %s to %d", comparator, count)) + var deletedNodes []*v1.Node + Eventually(func(g Gomega) { + deletedNodes = env.Monitor.DeletedNodes() + g.Expect(len(deletedNodes)).To(BeNumerically(comparator, count), + fmt.Sprintf("expected %d deleted nodes, had %d (%v)", count, len(deletedNodes), NodeNames(deletedNodes))) + }).Should(Succeed()) + return deletedNodes +} + +func (env *Environment) 
EventuallyExpectDeletedNodeCountWithSelector(comparator string, count int, selector labels.Selector) []*v1.Node { + GinkgoHelper() + By(fmt.Sprintf("waiting for deleted nodes with selector %v to be %s to %d", selector, comparator, count)) + var deletedNodes []*v1.Node + Eventually(func(g Gomega) { + deletedNodes = env.Monitor.DeletedNodes() + deletedNodes = lo.Filter(deletedNodes, func(n *v1.Node, _ int) bool { + return selector.Matches(labels.Set(n.Labels)) + }) + g.Expect(len(deletedNodes)).To(BeNumerically(comparator, count), + fmt.Sprintf("expected %d deleted nodes, had %d (%v)", count, len(deletedNodes), NodeNames(deletedNodes))) + }).Should(Succeed()) + return deletedNodes +} + +func (env *Environment) EventuallyExpectInitializedNodeCount(comparator string, count int) []*v1.Node { + GinkgoHelper() + By(fmt.Sprintf("waiting for initialized nodes to be %s to %d", comparator, count)) + var nodes []*v1.Node + Eventually(func(g Gomega) { + nodes = env.Monitor.CreatedNodes() + nodes = lo.Filter(nodes, func(n *v1.Node, _ int) bool { + return n.Labels[corev1beta1.NodeInitializedLabelKey] == "true" + }) + g.Expect(len(nodes)).To(BeNumerically(comparator, count)) + }).Should(Succeed()) + return nodes +} + +func (env *Environment) EventuallyExpectCreatedNodeClaimCount(comparator string, count int) []*corev1beta1.NodeClaim { + GinkgoHelper() + By(fmt.Sprintf("waiting for created nodeclaims to be %s to %d", comparator, count)) + nodeClaimList := &corev1beta1.NodeClaimList{} + Eventually(func(g Gomega) { + g.Expect(env.Client.List(env.Context, nodeClaimList)).To(Succeed()) + g.Expect(len(nodeClaimList.Items)).To(BeNumerically(comparator, count)) + }).Should(Succeed()) + return lo.Map(nodeClaimList.Items, func(nc corev1beta1.NodeClaim, _ int) *corev1beta1.NodeClaim { + return &nc + }) +} + +func (env *Environment) EventuallyExpectNodeClaimsReady(nodeClaims ...*corev1beta1.NodeClaim) { + GinkgoHelper() + Eventually(func(g Gomega) { + for _, nc := range nodeClaims { + temp := &corev1beta1.NodeClaim{} + g.Expect(env.Client.Get(env.Context, client.ObjectKeyFromObject(nc), temp)).Should(Succeed()) + g.Expect(temp.StatusConditions().Root().IsTrue()).To(BeTrue()) + } + }).Should(Succeed()) +} + +func (env *Environment) EventuallyExpectExpired(nodeClaims ...*corev1beta1.NodeClaim) { + GinkgoHelper() + Eventually(func(g Gomega) { + for _, nc := range nodeClaims { + g.Expect(env.Client.Get(env, client.ObjectKeyFromObject(nc), nc)).To(Succeed()) + g.Expect(nc.StatusConditions().Get(corev1beta1.ConditionTypeExpired).IsTrue()).To(BeTrue()) + } + }).Should(Succeed()) +} + +func (env *Environment) EventuallyExpectDrifted(nodeClaims ...*corev1beta1.NodeClaim) { + GinkgoHelper() + Eventually(func(g Gomega) { + for _, nc := range nodeClaims { + g.Expect(env.Client.Get(env, client.ObjectKeyFromObject(nc), nc)).To(Succeed()) + g.Expect(nc.StatusConditions().Get(corev1beta1.ConditionTypeDrifted).IsTrue()).To(BeTrue()) + } + }).Should(Succeed()) +} + +func (env *Environment) ConsistentlyExpectNodeClaimsNotDrifted(duration time.Duration, nodeClaims ...*corev1beta1.NodeClaim) { + GinkgoHelper() + nodeClaimNames := lo.Map(nodeClaims, func(nc *corev1beta1.NodeClaim, _ int) string { return nc.Name }) + By(fmt.Sprintf("consistently expect nodeclaims %s not to be drifted for %s", nodeClaimNames, duration)) + Consistently(func(g Gomega) { + for _, nc := range nodeClaims { + g.Expect(env.Client.Get(env, client.ObjectKeyFromObject(nc), nc)).To(Succeed()) + 
g.Expect(nc.StatusConditions().Get(corev1beta1.ConditionTypeDrifted)).To(BeNil()) + } + }, duration).Should(Succeed()) +} + +func (env *Environment) EventuallyExpectEmpty(nodeClaims ...*corev1beta1.NodeClaim) { + GinkgoHelper() + Eventually(func(g Gomega) { + for _, nc := range nodeClaims { + g.Expect(env.Client.Get(env, client.ObjectKeyFromObject(nc), nc)).To(Succeed()) + g.Expect(nc.StatusConditions().Get(corev1beta1.ConditionTypeEmpty).IsTrue()).To(BeTrue()) + } + }).Should(Succeed()) +} + +func (env *Environment) GetNode(nodeName string) v1.Node { + GinkgoHelper() + var node v1.Node + Expect(env.Client.Get(env.Context, types.NamespacedName{Name: nodeName}, &node)).To(Succeed()) + return node +} + +func (env *Environment) ExpectNoCrashes() { + GinkgoHelper() + for k, v := range env.Monitor.RestartCount("kube-system") { + if strings.Contains(k, "karpenter") && v > 0 { + Fail("expected karpenter containers to not crash") + } + } +} + +var ( + lastLogged = metav1.Now() +) + +func (env *Environment) printControllerLogs(options *v1.PodLogOptions) { + fmt.Println("------- START CONTROLLER LOGS -------") + defer fmt.Println("------- END CONTROLLER LOGS -------") + + if options.SinceTime == nil { + options.SinceTime = lastLogged.DeepCopy() + lastLogged = metav1.Now() + } + pods := env.ExpectKarpenterPods() + for _, pod := range pods { + temp := options.DeepCopy() // local version of the log options + + fmt.Printf("------- pod/%s -------\n", pod.Name) + if len(pod.Status.ContainerStatuses) > 0 && pod.Status.ContainerStatuses[0].RestartCount > 0 { + fmt.Printf("[PREVIOUS CONTAINER LOGS]\n") + temp.Previous = true + } + stream, err := env.KubeClient.CoreV1().Pods("kube-system").GetLogs(pod.Name, temp).Stream(env.Context) + if err != nil { + log.FromContext(env.Context).Error(err, "failed fetching controller logs") + return + } + raw := &bytes.Buffer{} + _, err = io.Copy(raw, stream) + Expect(err).ToNot(HaveOccurred()) + log.FromContext(env.Context).Info(raw.String()) + } +} + +func (env *Environment) EventuallyExpectMinUtilization(resource v1.ResourceName, comparator string, value float64) { + GinkgoHelper() + Eventually(func(g Gomega) { + g.Expect(env.Monitor.MinUtilization(resource)).To(BeNumerically(comparator, value)) + }).Should(Succeed()) +} + +func (env *Environment) EventuallyExpectAvgUtilization(resource v1.ResourceName, comparator string, value float64) { + GinkgoHelper() + Eventually(func(g Gomega) { + g.Expect(env.Monitor.AvgUtilization(resource)).To(BeNumerically(comparator, value)) + }, 10*time.Minute).Should(Succeed()) +} + +func (env *Environment) ExpectDaemonSetEnvironmentVariableUpdated(obj client.ObjectKey, name, value string, containers ...string) { + GinkgoHelper() + ds := &appsv1.DaemonSet{} + Expect(env.Client.Get(env.Context, obj, ds)).To(Succeed()) + if len(containers) == 0 { + Expect(len(ds.Spec.Template.Spec.Containers)).To(BeNumerically("==", 1)) + containers = append(containers, ds.Spec.Template.Spec.Containers[0].Name) + } + patch := client.StrategicMergeFrom(ds.DeepCopy()) + containerNames := sets.New(containers...) + for ci := range ds.Spec.Template.Spec.Containers { + c := &ds.Spec.Template.Spec.Containers[ci] + if !containerNames.Has(c.Name) { + continue + } + // If the env var already exists, update its value. Otherwise, create a new var. 
+		if _, i, ok := lo.FindIndexOf(c.Env, func(e v1.EnvVar) bool {
+			return e.Name == name
+		}); ok {
+			c.Env[i].Value = value
+		} else {
+			c.Env = append(c.Env, v1.EnvVar{Name: name, Value: value})
+		}
+	}
+	Expect(env.Client.Patch(env.Context, ds, patch)).To(Succeed())
+}
+
+// ForcePodsToSpread ensures that currently scheduled pods get spread evenly across all passed nodes by deleting pods off of existing
+// nodes and waiting for them to reschedule. This is useful for scenarios where you want to force the nodes to be underutilized
+// but you want to keep a consistent count of nodes rather than leaving around empty ones.
+func (env *Environment) ForcePodsToSpread(nodes ...*v1.Node) {
+	GinkgoHelper()
+
+	// Get the total count of pods across all of the passed nodes
+	podCount := 0
+	for _, n := range nodes {
+		podCount += len(env.ExpectActivePodsForNode(n.Name))
+	}
+	maxPodsPerNode := int(math.Ceil(float64(podCount) / float64(len(nodes))))
+
+	By(fmt.Sprintf("forcing %d pods to spread across %d nodes", podCount, len(nodes)))
+	start := time.Now()
+	for {
+		var nodePods []*v1.Pod
+		node, found := lo.Find(nodes, func(n *v1.Node) bool {
+			nodePods = env.ExpectActivePodsForNode(n.Name)
+			return len(nodePods) > maxPodsPerNode
+		})
+		if !found {
+			break
+		}
+		// Set the node to unschedulable so that the deleted pods won't reschedule back onto it.
+		Expect(env.Client.Get(env.Context, client.ObjectKeyFromObject(node), node)).To(Succeed())
+		stored := node.DeepCopy()
+		node.Spec.Unschedulable = true
+		Expect(env.Client.Patch(env.Context, node, client.StrategicMergeFrom(stored))).To(Succeed())
+		for _, pod := range nodePods[maxPodsPerNode:] {
+			env.ExpectDeleted(pod)
+		}
+		Eventually(func(g Gomega) {
+			g.Expect(len(env.ExpectActivePodsForNode(node.Name))).To(Or(Equal(maxPodsPerNode), Equal(maxPodsPerNode-1)))
+		}).WithTimeout(5 * time.Second).Should(Succeed())
+
+		// TODO: Consider moving this time check to an Eventually poll. This gets a little trickier with helper functions
+		// since you need to make sure that your Expectation helper functions are scoped to your "g Gomega" scope
+		// so that you don't fail the first time you get a failure on your expectation
+		if time.Since(start) > time.Minute*15 {
+			Fail("forcing pods to spread failed due to a timeout")
+		}
+	}
+	for _, n := range nodes {
+		stored := n.DeepCopy()
+		n.Spec.Unschedulable = false
+		Expect(env.Client.Patch(env.Context, n, client.StrategicMergeFrom(stored))).To(Succeed())
+	}
+}
+
+func (env *Environment) ExpectActivePodsForNode(nodeName string) []*v1.Pod {
+	GinkgoHelper()
+	podList := &v1.PodList{}
+	Expect(env.Client.List(env, podList, client.MatchingFields{"spec.nodeName": nodeName}, client.HasLabels{test.DiscoveryLabel})).To(Succeed())
+
+	return lo.Filter(lo.ToSlicePtr(podList.Items), func(p *v1.Pod, _ int) bool {
+		return p.DeletionTimestamp.IsZero()
+	})
+}
+
+func (env *Environment) ExpectCABundle() string {
+	// Discover CA Bundle from the REST client. We could alternatively
+	// have used the simpler client-go InClusterConfig() method.
+	// However, that only works when Karpenter is running as a Pod
+	// within the same cluster it's managing.
+	GinkgoHelper()
+	transportConfig, err := env.Config.TransportConfig()
+	Expect(err).ToNot(HaveOccurred())
+	_, err = transport.TLSConfigFor(transportConfig) // fills in CAData!
+ Expect(err).ToNot(HaveOccurred()) + log.FromContext(env.Context).WithValues("length", len(transportConfig.TLS.CAData)).V(1).Info("discovered caBundle") + return base64.StdEncoding.EncodeToString(transportConfig.TLS.CAData) +} + +func (env *Environment) GetDaemonSetCount(np *corev1beta1.NodePool) int { + GinkgoHelper() + + // Performs the same logic as the scheduler to get the number of daemonset + // pods that we estimate we will need to schedule as overhead to each node + daemonSetList := &appsv1.DaemonSetList{} + Expect(env.Client.List(env.Context, daemonSetList)).To(Succeed()) + + return lo.CountBy(daemonSetList.Items, func(d appsv1.DaemonSet) bool { + p := &v1.Pod{Spec: d.Spec.Template.Spec} + nodeClaimTemplate := pscheduling.NewNodeClaimTemplate(np) + if err := scheduling.Taints(nodeClaimTemplate.Spec.Taints).Tolerates(p); err != nil { + return false + } + if err := nodeClaimTemplate.Requirements.Compatible(scheduling.NewPodRequirements(p), scheduling.AllowUndefinedWellKnownLabels); err != nil { + return false + } + return true + }) +} + +func (env *Environment) GetDaemonSetOverhead(np *corev1beta1.NodePool) v1.ResourceList { + GinkgoHelper() + + // Performs the same logic as the scheduler to get the number of daemonset + // pods that we estimate we will need to schedule as overhead to each node + daemonSetList := &appsv1.DaemonSetList{} + Expect(env.Client.List(env.Context, daemonSetList)).To(Succeed()) + + return coreresources.RequestsForPods(lo.FilterMap(daemonSetList.Items, func(ds appsv1.DaemonSet, _ int) (*v1.Pod, bool) { + p := &v1.Pod{Spec: ds.Spec.Template.Spec} + nodeClaimTemplate := pscheduling.NewNodeClaimTemplate(np) + if err := scheduling.Taints(nodeClaimTemplate.Spec.Taints).Tolerates(p); err != nil { + return nil, false + } + if err := nodeClaimTemplate.Requirements.Compatible(scheduling.NewPodRequirements(p), scheduling.AllowUndefinedWellKnownLabels); err != nil { + return nil, false + } + return p, true + })...) +} diff --git a/test/pkg/environment/common/monitor.go b/test/pkg/environment/common/monitor.go new file mode 100644 index 0000000000..0f8516a992 --- /dev/null +++ b/test/pkg/environment/common/monitor.go @@ -0,0 +1,258 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package common + +import ( + "context" + "fmt" + "math" + "sync" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/sets" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + + "github.com/samber/lo" + + "sigs.k8s.io/karpenter/pkg/apis/v1beta1" + "sigs.k8s.io/karpenter/pkg/utils/resources" +) + +// Monitor is used to monitor the cluster state during a running test +type Monitor struct { + ctx context.Context + kubeClient client.Client + + mu sync.RWMutex + + nodesAtReset map[string]*v1.Node +} + +type state struct { + pods v1.PodList + nodes map[string]*v1.Node // node name -> node + nodePods map[string][]*v1.Pod // node name -> pods bound to the node + nodeRequests map[string]v1.ResourceList // node name -> sum of pod resource requests +} + +func NewMonitor(ctx context.Context, kubeClient client.Client) *Monitor { + m := &Monitor{ + ctx: ctx, + kubeClient: kubeClient, + nodesAtReset: map[string]*v1.Node{}, + } + m.Reset() + return m +} + +// Reset resets the cluster monitor prior to running a test. +func (m *Monitor) Reset() { + m.mu.Lock() + defer m.mu.Unlock() + + st := m.poll() + m.nodesAtReset = deepCopyMap(st.nodes) +} + +// RestartCount returns the containers and number of restarts for that container for all containers in the pods in the +// given namespace +func (m *Monitor) RestartCount(namespace string) map[string]int { + st := m.poll() + + m.mu.RLock() + defer m.mu.RUnlock() + restarts := map[string]int{} + for _, pod := range st.pods.Items { + if pod.Namespace != namespace { + continue + } + for _, cs := range pod.Status.ContainerStatuses { + name := fmt.Sprintf("%s/%s", pod.Name, cs.Name) + restarts[name] = int(cs.RestartCount) + } + } + return restarts +} + +// NodeCount returns the current number of nodes +func (m *Monitor) NodeCount() int { + return len(m.poll().nodes) +} + +// NodeCountAtReset returns the number of nodes that were running when the monitor was last reset, typically at the +// beginning of a test +func (m *Monitor) NodeCountAtReset() int { + return len(m.NodesAtReset()) +} + +// CreatedNodeCount returns the number of nodes created since the last reset +func (m *Monitor) CreatedNodeCount() int { + return m.NodeCount() - m.NodeCountAtReset() +} + +// NodesAtReset returns a slice of nodes that the monitor saw at the last reset +func (m *Monitor) NodesAtReset() []*v1.Node { + m.mu.RLock() + defer m.mu.RUnlock() + return deepCopySlice(lo.Values(m.nodesAtReset)) +} + +// Nodes returns all the nodes on the cluster +func (m *Monitor) Nodes() []*v1.Node { + st := m.poll() + return lo.Values(st.nodes) +} + +// CreatedNodes returns the nodes that have been created since the last reset (essentially Nodes - NodesAtReset) +func (m *Monitor) CreatedNodes() []*v1.Node { + resetNodeNames := sets.NewString(lo.Map(m.NodesAtReset(), func(n *v1.Node, _ int) string { return n.Name })...) + return lo.Filter(m.Nodes(), func(n *v1.Node, _ int) bool { return !resetNodeNames.Has(n.Name) }) +} + +// DeletedNodes returns the nodes that have been deleted since the last reset (essentially NodesAtReset - Nodes) +func (m *Monitor) DeletedNodes() []*v1.Node { + currentNodeNames := sets.NewString(lo.Map(m.Nodes(), func(n *v1.Node, _ int) string { return n.Name })...) 
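+	// Any node that was present at the last Reset() but is no longer listed by the API server is treated as deleted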
+	return lo.Filter(m.NodesAtReset(), func(n *v1.Node, _ int) bool { return !currentNodeNames.Has(n.Name) })
+}
+
+// PendingPods returns the pending pods matching the given selector
+func (m *Monitor) PendingPods(selector labels.Selector) []*v1.Pod {
+	var pods []*v1.Pod
+	for _, pod := range m.poll().pods.Items {
+		pod := pod
+		if pod.Status.Phase != v1.PodPending {
+			continue
+		}
+		if selector.Matches(labels.Set(pod.Labels)) {
+			pods = append(pods, &pod)
+		}
+	}
+	return pods
+}
+
+func (m *Monitor) PendingPodsCount(selector labels.Selector) int {
+	return len(m.PendingPods(selector))
+}
+
+// RunningPods returns the running pods matching the given selector
+func (m *Monitor) RunningPods(selector labels.Selector) []*v1.Pod {
+	var pods []*v1.Pod
+	for _, pod := range m.poll().pods.Items {
+		pod := pod
+		if pod.Status.Phase != v1.PodRunning {
+			continue
+		}
+		if selector.Matches(labels.Set(pod.Labels)) {
+			pods = append(pods, &pod)
+		}
+	}
+	return pods
+}
+
+func (m *Monitor) RunningPodsCount(selector labels.Selector) int {
+	return len(m.RunningPods(selector))
+}
+
+// poll takes a point-in-time snapshot of the cluster's nodes, pods, and per-node resource requests
+func (m *Monitor) poll() state {
+	var nodes v1.NodeList
+	if err := m.kubeClient.List(m.ctx, &nodes); err != nil {
+		log.FromContext(m.ctx).Error(err, "failed listing nodes")
+	}
+	var pods v1.PodList
+	if err := m.kubeClient.List(m.ctx, &pods); err != nil {
+		log.FromContext(m.ctx).Error(err, "failed listing pods")
+	}
+	st := state{
+		nodes:        map[string]*v1.Node{},
+		pods:         pods,
+		nodePods:     map[string][]*v1.Pod{},
+		nodeRequests: map[string]v1.ResourceList{},
+	}
+	for i := range nodes.Items {
+		st.nodes[nodes.Items[i].Name] = &nodes.Items[i]
+	}
+	// collect pods per node
+	for i := range pods.Items {
+		pod := &pods.Items[i]
+		if pod.Spec.NodeName == "" {
+			continue
+		}
+		st.nodePods[pod.Spec.NodeName] = append(st.nodePods[pod.Spec.NodeName], pod)
+	}
+
+	for _, n := range nodes.Items {
+		st.nodeRequests[n.Name] = resources.RequestsForPods(st.nodePods[n.Name]...)
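+		// nodeRequests holds the per-node sum of pod resource requests; the utilization helpers below divide it by the node's allocatable capacity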
+ } + return st +} + +func (m *Monitor) AvgUtilization(resource v1.ResourceName) float64 { + utilization := m.nodeUtilization(resource) + sum := 0.0 + for _, v := range utilization { + sum += v + } + return sum / float64(len(utilization)) +} + +func (m *Monitor) MinUtilization(resource v1.ResourceName) float64 { + min := math.MaxFloat64 + for _, v := range m.nodeUtilization(resource) { + min = math.Min(v, min) + } + return min +} + +func (m *Monitor) nodeUtilization(resource v1.ResourceName) []float64 { + st := m.poll() + var utilization []float64 + for nodeName, requests := range st.nodeRequests { + allocatable := st.nodes[nodeName].Status.Allocatable[resource] + // skip any nodes we didn't launch + if st.nodes[nodeName].Labels[v1beta1.NodePoolLabelKey] == "" { + continue + } + if allocatable.IsZero() { + continue + } + requested := requests[resource] + utilization = append(utilization, requested.AsApproximateFloat64()/allocatable.AsApproximateFloat64()) + } + return utilization +} + +type copyable[T any] interface { + DeepCopy() T +} + +func deepCopyMap[K comparable, V copyable[V]](m map[K]V) map[K]V { + ret := map[K]V{} + for k, v := range m { + ret[k] = v.DeepCopy() + } + return ret +} + +func deepCopySlice[T copyable[T]](s []T) []T { + var ret []T + for _, elem := range s { + ret = append(ret, elem.DeepCopy()) + } + return ret +} diff --git a/test/pkg/environment/common/setup.go b/test/pkg/environment/common/setup.go new file mode 100644 index 0000000000..471c3c34bc --- /dev/null +++ b/test/pkg/environment/common/setup.go @@ -0,0 +1,166 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package common + +import ( + "fmt" + "sync" + "time" + + "github.com/samber/lo" + appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + policyv1 "k8s.io/api/policy/v1" + schedulingv1 "k8s.io/api/scheduling/v1" + storagev1 "k8s.io/api/storage/v1" + "k8s.io/apimachinery/pkg/api/equality" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" + + "sigs.k8s.io/karpenter/test/pkg/debug" + + "sigs.k8s.io/karpenter/pkg/apis/v1beta1" + "sigs.k8s.io/karpenter/pkg/test" + "sigs.k8s.io/karpenter/pkg/utils/pod" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" +) + +const TestingFinalizer = "testing/finalizer" + +var ( + CleanableObjects = []client.Object{ + &v1.Pod{}, + &appsv1.Deployment{}, + &appsv1.DaemonSet{}, + &policyv1.PodDisruptionBudget{}, + &v1.PersistentVolumeClaim{}, + &v1.PersistentVolume{}, + &storagev1.StorageClass{}, + &v1beta1.NodePool{}, + &v1.LimitRange{}, + &schedulingv1.PriorityClass{}, + &v1.Node{}, + &v1beta1.NodeClaim{}, + // TODO @njtran add KWOKNodeClass + } +) + +// nolint:gocyclo +func (env *Environment) BeforeEach() { + debug.BeforeEach(env.Context, env.Config, env.Client) + + // Expect this cluster to be clean for test runs to execute successfully + env.ExpectCleanCluster() + + env.Monitor.Reset() + env.StartingNodeCount = env.Monitor.NodeCountAtReset() +} + +func (env *Environment) ExpectCleanCluster() { + var nodes v1.NodeList + Expect(env.Client.List(env.Context, &nodes)).To(Succeed()) + for _, node := range nodes.Items { + if len(node.Spec.Taints) == 0 && !node.Spec.Unschedulable { + Fail(fmt.Sprintf("expected system pool node %s to be tainted", node.Name)) + } + } + var pods v1.PodList + Expect(env.Client.List(env.Context, &pods)).To(Succeed()) + for i := range pods.Items { + Expect(pod.IsProvisionable(&pods.Items[i])).To(BeFalse(), + fmt.Sprintf("expected to have no provisionable pods, found %s/%s", pods.Items[i].Namespace, pods.Items[i].Name)) + Expect(pods.Items[i].Namespace).ToNot(Equal("default"), + fmt.Sprintf("expected no pods in the `default` namespace, found %s/%s", pods.Items[i].Namespace, pods.Items[i].Name)) + } + // TODO @njtran add KWOKNodeClass + for _, obj := range []client.Object{&v1beta1.NodePool{}} { + metaList := &metav1.PartialObjectMetadataList{} + gvk := lo.Must(apiutil.GVKForObject(obj, env.Client.Scheme())) + metaList.SetGroupVersionKind(gvk) + Expect(env.Client.List(env.Context, metaList, client.Limit(1))).To(Succeed()) + Expect(metaList.Items).To(HaveLen(0), fmt.Sprintf("expected no %s to exist", gvk.Kind)) + } +} + +func (env *Environment) Cleanup() { + env.CleanupObjects(CleanableObjects...) 
+ env.eventuallyExpectScaleDown() + env.ExpectNoCrashes() +} + +func (env *Environment) AfterEach() { + debug.AfterEach(env.Context) + env.printControllerLogs(&v1.PodLogOptions{Container: "controller"}) +} + +func (env *Environment) CleanupObjects(cleanableObjects ...client.Object) { + time.Sleep(time.Second) // wait one second to let the caches get up-to-date for deletion + wg := sync.WaitGroup{} + for _, obj := range cleanableObjects { + wg.Add(1) + go func(obj client.Object) { + defer wg.Done() + defer GinkgoRecover() + Eventually(func(g Gomega) { + // This only gets the metadata for the objects since we don't need all the details of the objects + metaList := &metav1.PartialObjectMetadataList{} + metaList.SetGroupVersionKind(lo.Must(apiutil.GVKForObject(obj, env.Client.Scheme()))) + g.Expect(env.Client.List(env, metaList, client.HasLabels([]string{test.DiscoveryLabel}))).To(Succeed()) + // Limit the concurrency of these calls to 50 workers per object so that we try to limit how aggressively we + // are deleting so that we avoid getting client-side throttled + workqueue.ParallelizeUntil(env, 50, len(metaList.Items), func(i int) { + defer GinkgoRecover() + g.Expect(env.ExpectTestingFinalizerRemoved(&metaList.Items[i])).To(Succeed()) + g.Expect(client.IgnoreNotFound(env.Client.Delete(env, &metaList.Items[i], + client.PropagationPolicy(metav1.DeletePropagationForeground), + &client.DeleteOptions{GracePeriodSeconds: lo.ToPtr(int64(0))}))).To(Succeed()) + }) + // If the deletes eventually succeed, we should have no elements here at the end of the test + g.Expect(env.Client.List(env, metaList, client.HasLabels([]string{test.DiscoveryLabel}), client.Limit(1))).To(Succeed()) + g.Expect(metaList.Items).To(HaveLen(0)) + }).Should(Succeed()) + }(obj) + } + wg.Wait() +} + +func (env *Environment) ExpectTestingFinalizerRemoved(obj client.Object) error { + metaObj := &metav1.PartialObjectMetadata{} + metaObj.SetGroupVersionKind(lo.Must(apiutil.GVKForObject(obj, env.Client.Scheme()))) + if err := env.Client.Get(env, client.ObjectKeyFromObject(obj), metaObj); err != nil { + return client.IgnoreNotFound(err) + } + deepCopy := metaObj.DeepCopy() + metaObj.Finalizers = lo.Reject(metaObj.Finalizers, func(finalizer string, _ int) bool { + return finalizer == TestingFinalizer + }) + + if !equality.Semantic.DeepEqual(metaObj, deepCopy) { + // If the Group is the "core" APIs, then we can strategic merge patch + // CRDs do not currently have support for strategic merge patching, so we can't blindly do it + // https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/#advanced-features-and-flexibility:~:text=Yes-,strategic%2Dmerge%2Dpatch,-The%20new%20endpoints + if metaObj.GroupVersionKind().Group == "" { + return client.IgnoreNotFound(env.Client.Patch(env, metaObj, client.StrategicMergeFrom(deepCopy))) + } + return client.IgnoreNotFound(env.Client.Patch(env, metaObj, client.MergeFrom(deepCopy))) + } + return nil +} diff --git a/test/suites/disruption/expiration_test.go b/test/suites/disruption/expiration_test.go new file mode 100644 index 0000000000..b9fb6c425f --- /dev/null +++ b/test/suites/disruption/expiration_test.go @@ -0,0 +1,676 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package disruption_test + +import ( + "fmt" + "time" + + "github.com/samber/lo" + appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/sets" + "sigs.k8s.io/controller-runtime/pkg/client" + + "sigs.k8s.io/karpenter/kwok/apis/v1alpha1" + "sigs.k8s.io/karpenter/pkg/apis/v1beta1" + "sigs.k8s.io/karpenter/test/pkg/environment/common" + + "sigs.k8s.io/karpenter/pkg/test" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("Expiration", func() { + var dep *appsv1.Deployment + var selector labels.Selector + var numPods int + BeforeEach(func() { + numPods = 1 + // Add pods with a do-not-disrupt annotation so that we can check node metadata before we disrupt + dep = test.Deployment(test.DeploymentOptions{ + Replicas: int32(numPods), + PodOptions: test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "my-app", + }, + Annotations: map[string]string{ + v1beta1.DoNotDisruptAnnotationKey: "true", + }, + }, + TerminationGracePeriodSeconds: lo.ToPtr[int64](0), + }, + }) + selector = labels.SelectorFromSet(dep.Spec.Selector.MatchLabels) + }) + Context("Budgets", func() { + // Two nodes, both expired or both drifted, the more drifted one with a pre-stop pod that sleeps for 300 seconds, + // and we consistently ensure that the second node is not tainted == disrupted. + It("should not continue to disrupt nodes that have been the target of pod nomination", func() { + test.ReplaceRequirements(nodePool, + v1beta1.NodeSelectorRequirementWithMinValues{ + NodeSelectorRequirement: v1.NodeSelectorRequirement{ + Key: v1alpha1.InstanceSizeLabelKey, + Operator: v1.NodeSelectorOpIn, + Values: []string{"2x"}, + }, + }, + ) + nodePool.Spec.Disruption.Budgets = []v1beta1.Budget{{ + Nodes: "100%", + }} + nodePool.Spec.Disruption.ExpireAfter = v1beta1.NillableDuration{} + + // Create a deployment with one pod to create one node. + dep = test.Deployment(test.DeploymentOptions{ + Replicas: 1, + PodOptions: test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + v1beta1.DoNotDisruptAnnotationKey: "true", + }, + Labels: map[string]string{"app": "large-app"}, + }, + // Each 2xlarge has 8 cpu, so each node should fit 2 pods. + ResourceRequirements: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("3"), + }, + }, + Command: []string{"sh", "-c", "sleep 3600"}, + Image: "alpine:latest", + PreStopSleep: lo.ToPtr(int64(300)), + TerminationGracePeriodSeconds: lo.ToPtr(int64(500)), + }, + }) + selector = labels.SelectorFromSet(dep.Spec.Selector.MatchLabels) + env.ExpectCreated(nodeClass, nodePool, dep) + + env.EventuallyExpectCreatedNodeClaimCount("==", 1) + env.EventuallyExpectCreatedNodeCount("==", 1) + env.EventuallyExpectHealthyPodCount(selector, 1) + + // Set the node to unschedulable so that we can create another node with one pod. 
+ node := env.EventuallyExpectNodeCount("==", 1)[0] + node.Spec.Unschedulable = true + env.ExpectUpdated(node) + + dep.Spec.Replicas = lo.ToPtr(int32(2)) + env.ExpectUpdated(dep) + + ncs := env.EventuallyExpectCreatedNodeClaimCount("==", 2) + env.EventuallyExpectCreatedNodeCount("==", 2) + pods := env.EventuallyExpectHealthyPodCount(selector, 2) + env.Monitor.Reset() // Reset the monitor so that we can expect a single node to be spun up after expiration + + node = env.ExpectExists(node).(*v1.Node) + node.Spec.Unschedulable = false + env.ExpectUpdated(node) + + By("enabling expiration") + nodePool.Spec.Disruption.ExpireAfter = v1beta1.NillableDuration{Duration: lo.ToPtr(time.Second * 30)} + env.ExpectUpdated(nodePool) + + // Expect that both of the nodes are expired, but not being disrupted + env.EventuallyExpectExpired(ncs...) + env.ConsistentlyExpectNoDisruptions(2, 30*time.Second) + + By("removing the do not disrupt annotations") + // Remove the do not disrupt annotation from the two pods + for _, p := range pods { + p := env.ExpectExists(p).(*v1.Pod) + delete(p.Annotations, v1beta1.DoNotDisruptAnnotationKey) + env.ExpectUpdated(p) + } + env.EventuallyExpectTaintedNodeCount("==", 1) + + By("expecting only one disruption for 60s") + // Expect only one node being disrupted as the other node should continue to be nominated. + // As the pod has a 300s pre-stop sleep. + env.ConsistentlyExpectDisruptionsWithNodeCount(1, 2, time.Minute) + }) + It("should respect budgets for empty expiration", func() { + test.ReplaceRequirements(nodePool, + v1beta1.NodeSelectorRequirementWithMinValues{ + NodeSelectorRequirement: v1.NodeSelectorRequirement{ + Key: v1alpha1.InstanceSizeLabelKey, + Operator: v1.NodeSelectorOpIn, + Values: []string{"2xlarge"}, + }, + }, + ) + nodePool.Spec.Disruption.Budgets = []v1beta1.Budget{{ + Nodes: "50%", + }} + nodePool.Spec.Disruption.ExpireAfter = v1beta1.NillableDuration{} + + numPods = 6 + dep = test.Deployment(test.DeploymentOptions{ + Replicas: int32(numPods), + PodOptions: test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + v1beta1.DoNotDisruptAnnotationKey: "true", + }, + Labels: map[string]string{"app": "large-app"}, + }, + // Each 2xlarge has 8 cpu, so each node should fit 2 pods. + ResourceRequirements: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("3"), + }, + }, + }, + }) + selector = labels.SelectorFromSet(dep.Spec.Selector.MatchLabels) + env.ExpectCreated(nodeClass, nodePool, dep) + + nodeClaims := env.EventuallyExpectCreatedNodeClaimCount("==", 3) + nodes := env.EventuallyExpectCreatedNodeCount("==", 3) + env.EventuallyExpectHealthyPodCount(selector, numPods) + env.Monitor.Reset() // Reset the monitor so that we can expect a single node to be spun up after expiration + + By("adding finalizers to the nodes to prevent termination") + // Add a finalizer to each node so that we can stop termination disruptions + for _, node := range nodes { + Expect(env.Client.Get(env.Context, client.ObjectKeyFromObject(node), node)).To(Succeed()) + node.Finalizers = append(node.Finalizers, common.TestingFinalizer) + env.ExpectUpdated(node) + } + + By("making the nodes empty") + // Delete the deployment to make all nodes empty. + env.ExpectDeleted(dep) + + By("enabling expiration") + nodePool.Spec.Disruption.ExpireAfter = v1beta1.NillableDuration{Duration: lo.ToPtr(time.Second * 30)} + env.ExpectUpdated(nodePool) + + env.EventuallyExpectExpired(nodeClaims...) + + // Expect that two nodes are tainted. 
+ env.EventuallyExpectTaintedNodeCount("==", 2) + nodes = env.ConsistentlyExpectDisruptionsWithNodeCount(2, 3, 5*time.Second) + + // Remove finalizers + for _, node := range nodes { + Expect(env.ExpectTestingFinalizerRemoved(node)).To(Succeed()) + } + + // After the deletion timestamp is set and all pods are drained + // the node should be gone + env.EventuallyExpectNotFound(nodes[0], nodes[1]) + + // Expect that only one node is tainted, even considering the new node that was just created. + nodes = env.EventuallyExpectTaintedNodeCount("==", 1) + + // Expect the finalizers to be removed and deleted. + Expect(env.ExpectTestingFinalizerRemoved(nodes[0])).To(Succeed()) + env.EventuallyExpectNotFound(nodes[0]) + }) + It("should respect budgets for non-empty delete expiration", func() { + nodePool = test.ReplaceRequirements(nodePool, + v1beta1.NodeSelectorRequirementWithMinValues{ + NodeSelectorRequirement: v1.NodeSelectorRequirement{ + Key: v1alpha1.InstanceSizeLabelKey, + Operator: v1.NodeSelectorOpIn, + Values: []string{"2xlarge"}, + }, + }, + ) + // We're expecting to create 3 nodes, so we'll expect to see at most 2 nodes deleting at one time. + nodePool.Spec.Disruption.Budgets = []v1beta1.Budget{{ + Nodes: "50%", + }} + // disable expiration so that we can enable it later when we want. + nodePool.Spec.Disruption.ExpireAfter = v1beta1.NillableDuration{} + numPods = 9 + dep = test.Deployment(test.DeploymentOptions{ + Replicas: int32(numPods), + PodOptions: test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + v1beta1.DoNotDisruptAnnotationKey: "true", + }, + Labels: map[string]string{"app": "large-app"}, + }, + // Each 2xlarge has 8 cpu, so each node should fit no more than 3 pods. + ResourceRequirements: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("2100m"), + }, + }, + }, + }) + selector = labels.SelectorFromSet(dep.Spec.Selector.MatchLabels) + env.ExpectCreated(nodeClass, nodePool, dep) + + nodeClaims := env.EventuallyExpectCreatedNodeClaimCount("==", 3) + nodes := env.EventuallyExpectCreatedNodeCount("==", 3) + env.EventuallyExpectHealthyPodCount(selector, numPods) + + By("scaling down the deployment") + // Update the deployment to a third of the replicas. + dep.Spec.Replicas = lo.ToPtr[int32](3) + env.ExpectUpdated(dep) + + // First expect there to be 3 pods, then try to spread the pods. + env.EventuallyExpectHealthyPodCount(selector, 3) + env.ForcePodsToSpread(nodes...) + env.EventuallyExpectHealthyPodCount(selector, 3) + + By("cordoning and adding finalizer to the nodes") + // Add a finalizer to each node so that we can stop termination disruptions + for _, node := range nodes { + Expect(env.Client.Get(env.Context, client.ObjectKeyFromObject(node), node)).To(Succeed()) + node.Finalizers = append(node.Finalizers, common.TestingFinalizer) + env.ExpectUpdated(node) + } + + By("expiring the nodes") + // expire the nodeclaims + nodePool.Spec.Disruption.ExpireAfter = v1beta1.NillableDuration{Duration: lo.ToPtr(time.Second * 30)} + env.ExpectUpdated(nodePool) + + env.EventuallyExpectExpired(nodeClaims...) 
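+			// The nodeclaims are now expired, but the do-not-disrupt annotation on the pods keeps them from being disrupted until the annotation is removed below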
+ + By("enabling disruption by removing the do not disrupt annotation") + pods := env.EventuallyExpectHealthyPodCount(selector, 3) + // Remove the do-not-disrupt annotation so that the nodes are now disruptable + for _, pod := range pods { + delete(pod.Annotations, v1beta1.DoNotDisruptAnnotationKey) + env.ExpectUpdated(pod) + } + + // Ensure that we get two nodes tainted, and they have overlap during the expiration + env.EventuallyExpectTaintedNodeCount("==", 2) + nodes = env.ConsistentlyExpectDisruptionsWithNodeCount(2, 3, 5*time.Second) + + By("removing the finalizer from the nodes") + Expect(env.ExpectTestingFinalizerRemoved(nodes[0])).To(Succeed()) + Expect(env.ExpectTestingFinalizerRemoved(nodes[1])).To(Succeed()) + + // After the deletion timestamp is set and all pods are drained + // the node should be gone + env.EventuallyExpectNotFound(nodes[0], nodes[1]) + }) + It("should respect budgets for non-empty replace expiration", func() { + appLabels := map[string]string{"app": "large-app"} + nodePool.Labels = appLabels + // We're expecting to create 5 nodes, so we'll expect to see at most 3 nodes deleting at one time. + nodePool.Spec.Disruption.Budgets = []v1beta1.Budget{{ + Nodes: "3", + }} + + // Create a 5 pod deployment with hostname inter-pod anti-affinity to ensure each pod is placed on a unique node + selector = labels.SelectorFromSet(appLabels) + numPods = 5 + deployment := test.Deployment(test.DeploymentOptions{ + Replicas: int32(numPods), + PodOptions: test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: appLabels, + }, + PodAntiRequirements: []v1.PodAffinityTerm{{ + TopologyKey: v1.LabelHostname, + LabelSelector: &metav1.LabelSelector{ + MatchLabels: appLabels, + }, + }}, + }, + }) + + env.ExpectCreated(nodeClass, nodePool, deployment) + + env.EventuallyExpectCreatedNodeClaimCount("==", numPods) + nodes := env.EventuallyExpectCreatedNodeCount("==", numPods) + + // Check that all daemonsets and deployment pods are online + env.EventuallyExpectHealthyPodCount(selector, numPods) + + By("cordoning and adding finalizer to the nodes") + // Add a finalizer to each node so that we can stop termination disruptions + for _, node := range nodes { + Expect(env.Client.Get(env.Context, client.ObjectKeyFromObject(node), node)).To(Succeed()) + node.Finalizers = append(node.Finalizers, common.TestingFinalizer) + env.ExpectUpdated(node) + } + + By("enabling expiration") + nodePool.Spec.Disruption.ExpireAfter = v1beta1.NillableDuration{Duration: lo.ToPtr(30 * time.Second)} + env.ExpectUpdated(nodePool) + + // Ensure that we get two nodes tainted, and they have overlap during the expiration + env.EventuallyExpectTaintedNodeCount("==", 3) + env.EventuallyExpectNodeClaimCount("==", 8) + env.EventuallyExpectNodeCount("==", 8) + nodes = env.ConsistentlyExpectDisruptionsWithNodeCount(3, 8, 5*time.Second) + + // Set the expireAfter to "Never" to make sure new node isn't deleted + // This is CRITICAL since it prevents nodes that are immediately spun up from immediately being expired and + // racing at the end of the E2E test, leaking node resources into subsequent tests + nodePool.Spec.Disruption.ExpireAfter.Duration = nil + env.ExpectUpdated(nodePool) + + for _, node := range nodes { + Expect(env.ExpectTestingFinalizerRemoved(node)).To(Succeed()) + } + + env.EventuallyExpectNotFound(nodes[0], nodes[1], nodes[2]) + env.ExpectNodeCount("==", 5) + }) + It("should not allow expiration if the budget is fully blocking", func() { + // We're going to define a budget that doesn't allow any expirations 
to happen + nodePool.Spec.Disruption.Budgets = []v1beta1.Budget{{ + Nodes: "0", + }} + + dep.Spec.Template.Annotations = nil + env.ExpectCreated(nodeClass, nodePool, dep) + + nodeClaim := env.EventuallyExpectCreatedNodeClaimCount("==", 1)[0] + env.EventuallyExpectCreatedNodeCount("==", 1) + env.EventuallyExpectHealthyPodCount(selector, numPods) + + env.EventuallyExpectExpired(nodeClaim) + env.ConsistentlyExpectNoDisruptions(1, time.Minute) + }) + It("should not allow expiration if the budget is fully blocking during a scheduled time", func() { + // We're going to define a budget that doesn't allow any expirations to happen + // This is going to be on a schedule that only lasts 30 minutes, whose window starts 15 minutes before + // the current time and extends 15 minutes past the current time + // Times need to be in UTC since the karpenter containers were built in UTC time + windowStart := time.Now().Add(-time.Minute * 15).UTC() + nodePool.Spec.Disruption.Budgets = []v1beta1.Budget{{ + Nodes: "0", + Schedule: lo.ToPtr(fmt.Sprintf("%d %d * * *", windowStart.Minute(), windowStart.Hour())), + Duration: &metav1.Duration{Duration: time.Minute * 30}, + }} + + dep.Spec.Template.Annotations = nil + env.ExpectCreated(nodeClass, nodePool, dep) + + nodeClaim := env.EventuallyExpectCreatedNodeClaimCount("==", 1)[0] + env.EventuallyExpectCreatedNodeCount("==", 1) + env.EventuallyExpectHealthyPodCount(selector, numPods) + + env.EventuallyExpectExpired(nodeClaim) + env.ConsistentlyExpectNoDisruptions(1, time.Minute) + }) + }) + It("should expire the node after the expiration is reached", func() { + env.ExpectCreated(nodeClass, nodePool, dep) + + nodeClaim := env.EventuallyExpectCreatedNodeClaimCount("==", 1)[0] + node := env.EventuallyExpectCreatedNodeCount("==", 1)[0] + env.EventuallyExpectHealthyPodCount(selector, numPods) + env.Monitor.Reset() // Reset the monitor so that we can expect a single node to be spun up after expiration + + env.EventuallyExpectExpired(nodeClaim) + + // Remove the do-not-disrupt annotation so that the Nodes are now deprovisionable + for _, pod := range env.ExpectPodsMatchingSelector(selector) { + delete(pod.Annotations, v1beta1.DoNotDisruptAnnotationKey) + env.ExpectUpdated(pod) + } + + // Eventually the node will be tainted, which means its actively being disrupted + Eventually(func(g Gomega) { + g.Expect(env.Client.Get(env.Context, client.ObjectKeyFromObject(node), node)).Should(Succeed()) + _, ok := lo.Find(node.Spec.Taints, func(t v1.Taint) bool { + return v1beta1.IsDisruptingTaint(t) + }) + g.Expect(ok).To(BeTrue()) + }).Should(Succeed()) + + // Set the expireAfter to "Never" to make sure new node isn't deleted + // This is CRITICAL since it prevents nodes that are immediately spun up from immediately being expired and + // racing at the end of the E2E test, leaking node resources into subsequent tests + nodePool.Spec.Disruption.ExpireAfter.Duration = nil + env.ExpectUpdated(nodePool) + + // After the deletion timestamp is set and all pods are drained + // the node should be gone + env.EventuallyExpectNotFound(nodeClaim, node) + + env.EventuallyExpectCreatedNodeClaimCount("==", 1) + env.EventuallyExpectCreatedNodeCount("==", 1) + env.EventuallyExpectHealthyPodCount(selector, numPods) + }) + It("should replace expired node with a single node and schedule all pods", func() { + var numPods int32 = 5 + // We should setup a PDB that will only allow a minimum of 1 pod to be pending at a time + minAvailable := intstr.FromInt32(numPods - 1) + pdb := 
test.PodDisruptionBudget(test.PDBOptions{ + Labels: map[string]string{ + "app": "large-app", + }, + MinAvailable: &minAvailable, + }) + dep := test.Deployment(test.DeploymentOptions{ + Replicas: numPods, + PodOptions: test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + v1beta1.DoNotDisruptAnnotationKey: "true", + }, + Labels: map[string]string{"app": "large-app"}, + }, + }, + }) + selector := labels.SelectorFromSet(dep.Spec.Selector.MatchLabels) + env.ExpectCreated(nodeClass, nodePool, pdb, dep) + + nodeClaim := env.EventuallyExpectCreatedNodeClaimCount("==", 1)[0] + node := env.EventuallyExpectCreatedNodeCount("==", 1)[0] + env.EventuallyExpectHealthyPodCount(selector, int(numPods)) + env.Monitor.Reset() // Reset the monitor so that we can expect a single node to be spun up after expiration + + // Set the expireAfter value to get the node deleted + nodePool.Spec.Disruption.ExpireAfter.Duration = lo.ToPtr(time.Minute) + env.ExpectUpdated(nodePool) + + env.EventuallyExpectExpired(nodeClaim) + + // Remove the do-not-disruption annotation so that the Nodes are now deprovisionable + for _, pod := range env.ExpectPodsMatchingSelector(selector) { + delete(pod.Annotations, v1beta1.DoNotDisruptAnnotationKey) + env.ExpectUpdated(pod) + } + + // Eventually the node will be tainted, which means its actively being disrupted + Eventually(func(g Gomega) { + g.Expect(env.Client.Get(env.Context, client.ObjectKeyFromObject(node), node)).Should(Succeed()) + _, ok := lo.Find(node.Spec.Taints, func(t v1.Taint) bool { + return v1beta1.IsDisruptingTaint(t) + }) + g.Expect(ok).To(BeTrue()) + }).Should(Succeed()) + + // Set the expireAfter to "Never" to make sure new node isn't deleted + // This is CRITICAL since it prevents nodes that are immediately spun up from immediately being expired and + // racing at the end of the E2E test, leaking node resources into subsequent tests + nodePool.Spec.Disruption.ExpireAfter.Duration = nil + env.ExpectUpdated(nodePool) + + // After the deletion timestamp is set and all pods are drained + // the node should be gone + env.EventuallyExpectNotFound(nodeClaim, node) + + env.EventuallyExpectCreatedNodeClaimCount("==", 1) + env.EventuallyExpectCreatedNodeCount("==", 1) + env.EventuallyExpectHealthyPodCount(selector, int(numPods)) + }) + Context("Failure", func() { + It("should not continue to expire if a node never registers", func() { + // Launch a new NodeClaim + var numPods int32 = 2 + dep := test.Deployment(test.DeploymentOptions{ + Replicas: 2, + PodOptions: test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"app": "inflate"}}, + PodAntiRequirements: []v1.PodAffinityTerm{{ + TopologyKey: v1.LabelHostname, + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"app": "inflate"}, + }}, + }, + }, + }) + env.ExpectCreated(dep, nodeClass, nodePool) + + startingNodeClaimState := env.EventuallyExpectCreatedNodeClaimCount("==", int(numPods)) + env.EventuallyExpectCreatedNodeCount("==", int(numPods)) + + env.ExpectCreatedOrUpdated(nodeClass) + + env.EventuallyExpectExpired(startingNodeClaimState...) + + // Expect nodes To get tainted + taintedNodes := env.EventuallyExpectTaintedNodeCount("==", 1) + + // Expire should fail and the original node should be untainted + // TODO: reduce timeouts when deprovisioning waits are factored out + env.EventuallyExpectNodesUntaintedWithTimeout(11*time.Minute, taintedNodes...) 
+ + // The nodeclaims that never registers will be removed + Eventually(func(g Gomega) { + nodeClaims := &v1beta1.NodeClaimList{} + g.Expect(env.Client.List(env, nodeClaims, client.HasLabels{test.DiscoveryLabel})).To(Succeed()) + g.Expect(len(nodeClaims.Items)).To(BeNumerically("==", int(numPods))) + }).WithTimeout(6 * time.Minute).Should(Succeed()) + + // Expect all the NodeClaims that existed on the initial provisioning loop are not removed + Consistently(func(g Gomega) { + nodeClaims := &v1beta1.NodeClaimList{} + g.Expect(env.Client.List(env, nodeClaims, client.HasLabels{test.DiscoveryLabel})).To(Succeed()) + + startingNodeClaimUIDs := lo.Map(startingNodeClaimState, func(nc *v1beta1.NodeClaim, _ int) types.UID { return nc.UID }) + nodeClaimUIDs := lo.Map(nodeClaims.Items, func(nc v1beta1.NodeClaim, _ int) types.UID { return nc.UID }) + g.Expect(sets.New(nodeClaimUIDs...).IsSuperset(sets.New(startingNodeClaimUIDs...))).To(BeTrue()) + }, "2m").Should(Succeed()) + }) + It("should not continue to expire if a node registers but never becomes initialized", func() { + // Set a configuration that will allow us to make a NodeClaim not be initialized + nodePool.Spec.Template.Spec.StartupTaints = []v1.Taint{{Key: "example.com/taint", Effect: v1.TaintEffectPreferNoSchedule}} + + // Launch a new NodeClaim + var numPods int32 = 2 + dep := test.Deployment(test.DeploymentOptions{ + Replicas: 2, + PodOptions: test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"app": "inflate"}}, + PodAntiRequirements: []v1.PodAffinityTerm{{ + TopologyKey: v1.LabelHostname, + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"app": "inflate"}, + }}, + }, + }, + }) + env.ExpectCreated(dep, nodeClass, nodePool) + + startingNodeClaimState := env.EventuallyExpectCreatedNodeClaimCount("==", int(numPods)) + nodes := env.EventuallyExpectCreatedNodeCount("==", int(numPods)) + + // Remove the startup taints from these nodes to initialize them + Eventually(func(g Gomega) { + for _, node := range nodes { + g.Expect(env.Client.Get(env.Context, client.ObjectKeyFromObject(node), node)).To(Succeed()) + stored := node.DeepCopy() + node.Spec.Taints = lo.Reject(node.Spec.Taints, func(t v1.Taint, _ int) bool { return t.Key == "example.com/taint" }) + g.Expect(env.Client.Patch(env.Context, node, client.StrategicMergeFrom(stored))).To(Succeed()) + } + }).Should(Succeed()) + + env.EventuallyExpectExpired(startingNodeClaimState...) + + // Expect nodes To be tainted + taintedNodes := env.EventuallyExpectTaintedNodeCount("==", 1) + + // Expire should fail and original node should be untainted and no NodeClaims should be removed + // TODO: reduce timeouts when deprovisioning waits are factored out + env.EventuallyExpectNodesUntaintedWithTimeout(11*time.Minute, taintedNodes...) 
+ + // Expect that the new NodeClaim/Node is kept around after the un-cordon + nodeList := &v1.NodeList{} + Expect(env.Client.List(env, nodeList, client.HasLabels{test.DiscoveryLabel})).To(Succeed()) + Expect(nodeList.Items).To(HaveLen(int(numPods) + 1)) + + nodeClaimList := &v1beta1.NodeClaimList{} + Expect(env.Client.List(env, nodeClaimList, client.HasLabels{test.DiscoveryLabel})).To(Succeed()) + Expect(nodeClaimList.Items).To(HaveLen(int(numPods) + 1)) + + // Expect all the NodeClaims that existed on the initial provisioning loop are not removed + Consistently(func(g Gomega) { + nodeClaims := &v1beta1.NodeClaimList{} + g.Expect(env.Client.List(env, nodeClaims, client.HasLabels{test.DiscoveryLabel})).To(Succeed()) + + startingNodeClaimUIDs := lo.Map(startingNodeClaimState, func(nc *v1beta1.NodeClaim, _ int) types.UID { return nc.UID }) + nodeClaimUIDs := lo.Map(nodeClaims.Items, func(nc v1beta1.NodeClaim, _ int) types.UID { return nc.UID }) + g.Expect(sets.New(nodeClaimUIDs...).IsSuperset(sets.New(startingNodeClaimUIDs...))).To(BeTrue()) + }, "2m").Should(Succeed()) + }) + It("should not expire any nodes if their PodDisruptionBudgets are unhealthy", func() { + // Create a deployment that contains a readiness probe that will never succeed + // This way, the pod will bind to the node, but the PodDisruptionBudget will never go healthy + var numPods int32 = 2 + dep := test.Deployment(test.DeploymentOptions{ + Replicas: 2, + PodOptions: test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"app": "inflate"}}, + PodAntiRequirements: []v1.PodAffinityTerm{{ + TopologyKey: v1.LabelHostname, + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"app": "inflate"}, + }}, + }, + ReadinessProbe: &v1.Probe{ + ProbeHandler: v1.ProbeHandler{ + HTTPGet: &v1.HTTPGetAction{ + Port: intstr.FromInt32(80), + }, + }, + }, + }, + }) + selector := labels.SelectorFromSet(dep.Spec.Selector.MatchLabels) + minAvailable := intstr.FromInt32(numPods - 1) + pdb := test.PodDisruptionBudget(test.PDBOptions{ + Labels: dep.Spec.Template.Labels, + MinAvailable: &minAvailable, + }) + env.ExpectCreated(dep, nodeClass, nodePool, pdb) + + nodeClaims := env.EventuallyExpectCreatedNodeClaimCount("==", int(numPods)) + env.EventuallyExpectCreatedNodeCount("==", int(numPods)) + + // Expect pods to be bound but not to be ready since we are intentionally failing the readiness check + env.EventuallyExpectBoundPodCount(selector, int(numPods)) + + env.EventuallyExpectExpired(nodeClaims...) + env.ConsistentlyExpectNoDisruptions(int(numPods), time.Minute) + }) + }) +}) diff --git a/test/suites/disruption/suite_test.go b/test/suites/disruption/suite_test.go new file mode 100644 index 0000000000..e8a166a925 --- /dev/null +++ b/test/suites/disruption/suite_test.go @@ -0,0 +1,54 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package disruption_test + +import ( + "testing" + "time" + + "sigs.k8s.io/karpenter/kwok/apis/v1alpha1" + "sigs.k8s.io/karpenter/pkg/apis/v1beta1" + "sigs.k8s.io/karpenter/test/pkg/environment/common" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/samber/lo" +) + +var nodePool *v1beta1.NodePool +var nodeClass *v1alpha1.KWOKNodeClass +var env *common.Environment + +func TestDisruption(t *testing.T) { + RegisterFailHandler(Fail) + BeforeSuite(func() { + env = common.NewEnvironment(t) + }) + AfterSuite(func() { + env.Stop() + }) + RunSpecs(t, "Disruption") +} + +var _ = BeforeEach(func() { + env.BeforeEach() + nodeClass = env.DefaultNodeClass() + nodePool = env.DefaultNodePool(nodeClass) + nodePool.Spec.Disruption.ExpireAfter = v1beta1.NillableDuration{Duration: lo.ToPtr(time.Second * 30)} +}) +var _ = AfterEach(func() { env.Cleanup() }) +var _ = AfterEach(func() { env.AfterEach() })
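For reference, the pieces above compose as follows. This is a minimal sketch of how an additional suite could reuse the common test environment; the suite name "provisioning", the app label, and the replica count are illustrative assumptions and are not part of this change, while the helpers (common.NewEnvironment, DefaultNodeClass, DefaultNodePool, ExpectCreated, EventuallyExpectCreatedNodeClaimCount, EventuallyExpectCreatedNodeCount, EventuallyExpectHealthyPodCount, Cleanup, AfterEach) are the ones introduced in this patch.

package provisioning_test // hypothetical suite name

import (
	"testing"

	. "github.com/onsi/ginkgo/v2"
	. "github.com/onsi/gomega"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"

	"sigs.k8s.io/karpenter/kwok/apis/v1alpha1"
	"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
	"sigs.k8s.io/karpenter/pkg/test"
	"sigs.k8s.io/karpenter/test/pkg/environment/common"
)

var (
	env       *common.Environment
	nodeClass *v1alpha1.KWOKNodeClass
	nodePool  *v1beta1.NodePool
)

func TestProvisioning(t *testing.T) {
	RegisterFailHandler(Fail)
	// Mirror the disruption suite: one shared environment per suite run
	BeforeSuite(func() { env = common.NewEnvironment(t) })
	AfterSuite(func() { env.Stop() })
	RunSpecs(t, "Provisioning")
}

var _ = BeforeEach(func() {
	env.BeforeEach()
	nodeClass = env.DefaultNodeClass()
	nodePool = env.DefaultNodePool(nodeClass)
})
var _ = AfterEach(func() { env.Cleanup() })
var _ = AfterEach(func() { env.AfterEach() })

var _ = Describe("Provisioning", func() {
	It("should launch a node for a pending pod", func() {
		// "sample-app" is an illustrative label, not one used by the suites in this patch
		dep := test.Deployment(test.DeploymentOptions{
			Replicas: 1,
			PodOptions: test.PodOptions{
				ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"app": "sample-app"}},
			},
		})
		env.ExpectCreated(nodeClass, nodePool, dep)

		// The common expectations poll until the nodeclaim, node, and pod all become healthy
		env.EventuallyExpectCreatedNodeClaimCount("==", 1)
		env.EventuallyExpectCreatedNodeCount("==", 1)
		env.EventuallyExpectHealthyPodCount(labels.SelectorFromSet(dep.Spec.Selector.MatchLabels), 1)
	})
})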