From cbe4f8aab9445f80bdae60868a040f59a3863c81 Mon Sep 17 00:00:00 2001 From: Rotem Elad <165548751+roteme-runai@users.noreply.github.com> Date: Mon, 13 Jan 2025 03:47:07 +0200 Subject: [PATCH] Expose job controller's workqueue rate limiting configs (#674) * Expose controller workqueue config via options Signed-off-by: Rotem Elad * Fix double hyphen typo Signed-off-by: Rotem Elad * Generate Signed-off-by: Rotem Elad --------- Signed-off-by: Rotem Elad --- ADOPTERS.md | 1 + cmd/mpi-operator/app/options/options.go | 25 ++++++++++++--------- cmd/mpi-operator/app/server.go | 13 ++++++++++- pkg/controller/mpi_job_controller.go | 10 +++++---- pkg/controller/mpi_job_controller_test.go | 3 +++ test/integration/mpi_job_controller_test.go | 3 +++ 6 files changed, 40 insertions(+), 15 deletions(-) diff --git a/ADOPTERS.md b/ADOPTERS.md index 8e4a51655..06a1254be 100644 --- a/ADOPTERS.md +++ b/ADOPTERS.md @@ -19,4 +19,5 @@ This page contains a list of organizations who are using MPI Operator. If you'd | [PITS Global Data Recovery Services](https://www.pitsdatarecovery.net/) | [Benjamin Trudeau](https://github.com/benjx1990) | | [Polyaxon](https://polyaxon.com/) | [Mourad Mourafiq](https://github.com/mouradmourafiq) | | [Qutoutiao](https://www.qutoutiao.net/) | [Zhaojing Yu](https://github.com/yuzhaojing) | +| [Run:AI](https://www.run.ai/) | [Rotem Elad](https://github.com/roteme-runai) | | [Tencent](http://tencent.com/en-us/) | [Lei Xue](https://github.com/carmark) | diff --git a/cmd/mpi-operator/app/options/options.go b/cmd/mpi-operator/app/options/options.go index 432e914a3..e5179277b 100644 --- a/cmd/mpi-operator/app/options/options.go +++ b/cmd/mpi-operator/app/options/options.go @@ -28,16 +28,18 @@ const ( // ServerOption is the main context object for the controller manager. type ServerOption struct { - Kubeconfig string - MasterURL string - Threadiness int - MonitoringPort int - PrintVersion bool - GangSchedulingName string - Namespace string - LockNamespace string - QPS int - Burst int + Kubeconfig string + MasterURL string + Threadiness int + MonitoringPort int + PrintVersion bool + GangSchedulingName string + Namespace string + LockNamespace string + QPS int + Burst int + ControllerRateLimit int + ControllerBurst int } // NewServerOption creates a new CMServer with a default config. @@ -75,4 +77,7 @@ func (s *ServerOption) AddFlags(fs *flag.FlagSet) { fs.IntVar(&s.QPS, "kube-api-qps", 5, "QPS indicates the maximum QPS to the master from this client.") fs.IntVar(&s.Burst, "kube-api-burst", 10, "Maximum burst for throttle.") + + fs.IntVar(&s.ControllerRateLimit, "controller-queue-rate-limit", 10, "Rate limit of the controller events queue .") + fs.IntVar(&s.ControllerBurst, "controller-queue-burst", 100, "Maximum burst of the controller events queue.") } diff --git a/cmd/mpi-operator/app/server.go b/cmd/mpi-operator/app/server.go index 4024ca63a..0ff59cbd0 100644 --- a/cmd/mpi-operator/app/server.go +++ b/cmd/mpi-operator/app/server.go @@ -23,6 +23,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "golang.org/x/time/rate" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -38,6 +39,7 @@ import ( election "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" "k8s.io/klog" schedclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned" volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned" @@ -67,6 +69,9 @@ var ( // allowed for timeout. Checks within the timeout period after the lease // expires will still return healthy. leaderHealthzAdaptorTimeout = time.Second * 20 + //exponential workqueue rate limiting config + workqueueExponentialBaseDelay = 5 * time.Millisecond + workqueueExponentialMaxDelay = 1000 * time.Second ) var ( @@ -141,6 +146,11 @@ func Run(opt *options.ServerOption) error { kubeInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeInformerFactoryOpts...) kubeflowInformerFactory := informers.NewSharedInformerFactoryWithOptions(mpiJobClientSet, 0, kubeflowInformerFactoryOpts...) + workqueueRateLimiter := workqueue.NewTypedMaxOfRateLimiter( + workqueue.NewTypedItemExponentialFailureRateLimiter[any](workqueueExponentialBaseDelay, workqueueExponentialMaxDelay), + &workqueue.TypedBucketRateLimiter[any]{Limiter: rate.NewLimiter(rate.Limit(opt.ControllerRateLimit), opt.ControllerBurst)}, + ) + controller, err := controllersv1.NewMPIJobController( kubeClient, mpiJobClientSet, @@ -153,7 +163,8 @@ func Run(opt *options.ServerOption) error { kubeInformerFactory.Core().V1().Pods(), kubeInformerFactory.Scheduling().V1().PriorityClasses(), kubeflowInformerFactory.Kubeflow().V2beta1().MPIJobs(), - namespace, opt.GangSchedulingName) + namespace, opt.GangSchedulingName, + workqueueRateLimiter) if err != nil { klog.Fatalf("Failed to setup the controller") } diff --git a/pkg/controller/mpi_job_controller.go b/pkg/controller/mpi_job_controller.go index 4fcca25f2..74fa5bdda 100644 --- a/pkg/controller/mpi_job_controller.go +++ b/pkg/controller/mpi_job_controller.go @@ -269,10 +269,11 @@ func NewMPIJobController( podInformer coreinformers.PodInformer, priorityClassInformer schedulinginformers.PriorityClassInformer, mpiJobInformer informers.MPIJobInformer, - namespace, gangSchedulingName string) (*MPIJobController, error) { + namespace, gangSchedulingName string, + workqueueRateLimiter workqueue.TypedRateLimiter[any]) (*MPIJobController, error) { return NewMPIJobControllerWithClock(kubeClient, kubeflowClient, volcanoClient, schedClient, configMapInformer, secretInformer, serviceInformer, jobInformer, podInformer, - priorityClassInformer, mpiJobInformer, &clock.RealClock{}, namespace, gangSchedulingName) + priorityClassInformer, mpiJobInformer, &clock.RealClock{}, namespace, gangSchedulingName, workqueueRateLimiter) } // NewMPIJobControllerWithClock returns a new MPIJob controller. @@ -289,7 +290,8 @@ func NewMPIJobControllerWithClock( priorityClassInformer schedulinginformers.PriorityClassInformer, mpiJobInformer informers.MPIJobInformer, clock clock.WithTicker, - namespace, gangSchedulingName string) (*MPIJobController, error) { + namespace, gangSchedulingName string, + workqueueRateLimiter workqueue.TypedRateLimiter[any]) (*MPIJobController, error) { // Create event broadcaster. klog.V(4).Info("Creating event broadcaster") @@ -336,7 +338,7 @@ func NewMPIJobControllerWithClock( priorityClassSynced: priorityClassSynced, mpiJobLister: mpiJobInformer.Lister(), mpiJobSynced: mpiJobInformer.Informer().HasSynced, - queue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{Name: "MPIJob"}), + queue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueueRateLimiter, workqueue.TypedRateLimitingQueueConfig[any]{Name: "MPIJob"}), recorder: recorder, clock: clock, } diff --git a/pkg/controller/mpi_job_controller_test.go b/pkg/controller/mpi_job_controller_test.go index cf51131fe..49585daa2 100644 --- a/pkg/controller/mpi_job_controller_test.go +++ b/pkg/controller/mpi_job_controller_test.go @@ -35,6 +35,7 @@ import ( core "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" "k8s.io/utils/clock" clocktesting "k8s.io/utils/clock/testing" "k8s.io/utils/ptr" @@ -163,6 +164,7 @@ func (f *fixture) newController(clock clock.WithTicker) (*MPIJobController, info f.kubeClient = k8sfake.NewSimpleClientset(f.kubeObjects...) i := informers.NewSharedInformerFactory(f.client, noResyncPeriodFunc()) k8sI := kubeinformers.NewSharedInformerFactory(f.kubeClient, noResyncPeriodFunc()) + workqueueRateLimiter := workqueue.DefaultTypedControllerRateLimiter[any]() c, err := NewMPIJobControllerWithClock( f.kubeClient, @@ -179,6 +181,7 @@ func (f *fixture) newController(clock clock.WithTicker) (*MPIJobController, info clock, metav1.NamespaceAll, f.gangSchedulingName, + workqueueRateLimiter, ) if err != nil { fmt.Println("Failed to setup the controller") diff --git a/test/integration/mpi_job_controller_test.go b/test/integration/mpi_job_controller_test.go index f260d829c..3d604c74e 100644 --- a/test/integration/mpi_job_controller_test.go +++ b/test/integration/mpi_job_controller_test.go @@ -32,6 +32,7 @@ import ( kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/reference" + "k8s.io/client-go/util/workqueue" "k8s.io/utils/ptr" schedv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" schedclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned" @@ -909,6 +910,7 @@ func startController( ) { kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kClient, 0) mpiInformerFactory := informers.NewSharedInformerFactory(mpiClient, 0) + workqueueRateLimiter := workqueue.DefaultTypedControllerRateLimiter[any]() var ( volcanoClient volcanoclient.Interface schedClient schedclientset.Interface @@ -935,6 +937,7 @@ func startController( kubeInformerFactory.Scheduling().V1().PriorityClasses(), mpiInformerFactory.Kubeflow().V2beta1().MPIJobs(), metav1.NamespaceAll, schedulerName, + workqueueRateLimiter, ) if err != nil { panic(err)