Skip to content

Commit

Permalink
per conversation with @alculquicondor, removing the option
Browse files Browse the repository at this point in the history
  • Loading branch information
emsixteeen committed Feb 5, 2024
1 parent df31981 commit 38299ef
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 39 deletions.
5 changes: 0 additions & 5 deletions cmd/mpi-operator/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ type ServerOption struct {
LockNamespace string
QPS int
Burst int
FailInformersOnErr bool
}

// NewServerOption creates a new CMServer with a default config.
Expand Down Expand Up @@ -76,8 +75,4 @@ func (s *ServerOption) AddFlags(fs *flag.FlagSet) {

fs.IntVar(&s.QPS, "kube-api-qps", 5, "QPS indicates the maximum QPS to the master from this client.")
fs.IntVar(&s.Burst, "kube-api-burst", 10, "Maximum burst for throttle.")

fs.BoolVar(&s.FailInformersOnErr, "fail-informers-on-error", false,
`Exit the mpi-operator if it fails to list or watch objects from the API server due to lack of permissions (instead of retrying indefinitely).
Note: This only applies if a list/watch fails with Unauthorized or Forbidden errors.`)
}
2 changes: 1 addition & 1 deletion cmd/mpi-operator/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func Run(opt *options.ServerOption) error {
kubeInformerFactory.Core().V1().Pods(),
kubeInformerFactory.Scheduling().V1().PriorityClasses(),
kubeflowInformerFactory.Kubeflow().V2beta1().MPIJobs(),
namespace, opt.GangSchedulingName, opt.FailInformersOnErr)
namespace, opt.GangSchedulingName)
if err != nil {
klog.Fatalf("Failed to setup the controller")
}
Expand Down
56 changes: 27 additions & 29 deletions pkg/controller/mpi_job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,10 @@ func NewMPIJobController(
podInformer coreinformers.PodInformer,
priorityClassInformer schedulinginformers.PriorityClassInformer,
mpiJobInformer informers.MPIJobInformer,
namespace, gangSchedulingName string, failInformersOnErr bool) (*MPIJobController, error) {
namespace, gangSchedulingName string) (*MPIJobController, error) {
return NewMPIJobControllerWithClock(kubeClient, kubeflowClient, volcanoClient, schedClient,
configMapInformer, secretInformer, serviceInformer, jobInformer, podInformer,
priorityClassInformer, mpiJobInformer, &clock.RealClock{}, namespace, gangSchedulingName, failInformersOnErr)
priorityClassInformer, mpiJobInformer, &clock.RealClock{}, namespace, gangSchedulingName)
}

// NewMPIJobControllerWithClock returns a new MPIJob controller.
Expand All @@ -292,7 +292,7 @@ func NewMPIJobControllerWithClock(
priorityClassInformer schedulinginformers.PriorityClassInformer,
mpiJobInformer informers.MPIJobInformer,
clock clock.WithTicker,
namespace, gangSchedulingName string, failInformersOnErr bool) (*MPIJobController, error) {
namespace, gangSchedulingName string) (*MPIJobController, error) {

// Create event broadcaster.
klog.V(4).Info("Creating event broadcaster")
Expand Down Expand Up @@ -346,33 +346,31 @@ func NewMPIJobControllerWithClock(

controller.updateStatusHandler = controller.doUpdateJobStatus

// Set up error handlers for informers if asked to do so
if failInformersOnErr {
klog.Info("Setting up informer error handlers")
informers := map[string]cache.SharedInformer{
"configMapInformer": configMapInformer.Informer(),
"secretInformer": secretInformer.Informer(),
"serviceInformer": serviceInformer.Informer(),
"jobInformer": jobInformer.Informer(),
"podInformer": podInformer.Informer(),
"priorityClassInformer": priorityClassInformer.Informer(),
"mpiJobInformer": mpiJobInformer.Informer(),
}

for name, informer := range informers {
err := informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
// Pipe to default handler first, which just logs the error
cache.DefaultWatchErrorHandler(r, err)

if errors.IsUnauthorized(err) || errors.IsForbidden(err) {
klog.Fatalf("Unable to sync cache for informer %s: %s. Exiting.", name, err)
}
})

if err != nil {
// return NewMPIJobControllerWithClock(...) (nil, error)
return nil, fmt.Errorf("unable to set error handler for informer %s: %s", name, err)
// Set up error handlers for informers
klog.Info("Setting up informer error handlers")
informers := map[string]cache.SharedInformer{
"configMapInformer": configMapInformer.Informer(),
"secretInformer": secretInformer.Informer(),
"serviceInformer": serviceInformer.Informer(),
"jobInformer": jobInformer.Informer(),
"podInformer": podInformer.Informer(),
"priorityClassInformer": priorityClassInformer.Informer(),
"mpiJobInformer": mpiJobInformer.Informer(),
}

for name, informer := range informers {
err := informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
// Pipe to default handler first, which just logs the error
cache.DefaultWatchErrorHandler(r, err)

if errors.IsUnauthorized(err) || errors.IsForbidden(err) {
klog.Fatalf("Unable to sync cache for informer %s: %s. Requesting controller to exit.", name, err)
}
})

if err != nil {
// return NewMPIJobControllerWithClock(...) (nil, error)
return nil, fmt.Errorf("unable to set error handler for informer %s: %s", name, err)
}
}

Expand Down
3 changes: 0 additions & 3 deletions pkg/controller/mpi_job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ type fixture struct {
objects []runtime.Object

gangSchedulingName string
failInformersOnErr bool
}

func newFixture(t *testing.T, gangSchedulingName string) *fixture {
Expand All @@ -95,7 +94,6 @@ func newFixture(t *testing.T, gangSchedulingName string) *fixture {
f.objects = []runtime.Object{}
f.kubeObjects = []runtime.Object{}
f.gangSchedulingName = gangSchedulingName
f.failInformersOnErr = false
return f
}

Expand Down Expand Up @@ -178,7 +176,6 @@ func (f *fixture) newController(clock clock.WithTicker) (*MPIJobController, info
clock,
metav1.NamespaceAll,
f.gangSchedulingName,
f.failInformersOnErr,
)
if err != nil {
fmt.Println("Failed to setup the controller")
Expand Down
2 changes: 1 addition & 1 deletion test/integration/mpi_job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -855,7 +855,7 @@ func startController(
kubeInformerFactory.Core().V1().Pods(),
kubeInformerFactory.Scheduling().V1().PriorityClasses(),
mpiInformerFactory.Kubeflow().V2beta1().MPIJobs(),
metav1.NamespaceAll, schedulerName, false,
metav1.NamespaceAll, schedulerName,
)
if err != nil {
panic(err)
Expand Down

0 comments on commit 38299ef

Please sign in to comment.