From 323e1ee09562ac17bde423aa185aa34b94c529bc Mon Sep 17 00:00:00 2001 From: kuizhiqing Date: Sat, 17 Feb 2024 16:57:38 +0000 Subject: [PATCH 1/5] run worker in launcher pod; fix DCO issue Signed-off-by: kuizhiqing --- deploy/v2beta1/mpi-operator.yaml | 5 ++ manifests/base/kubeflow.org_mpijobs.yaml | 5 ++ pkg/apis/kubeflow/v2beta1/swagger.json | 4 + pkg/apis/kubeflow/v2beta1/types.go | 6 ++ .../kubeflow/v2beta1/zz_generated.deepcopy.go | 5 ++ .../kubeflow/v2beta1/mpijobspec.go | 9 ++ pkg/controller/mpi_job_controller.go | 37 ++++++-- pkg/controller/mpi_job_controller_test.go | 87 +++++++++++++++---- sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md | 1 + .../mpijob/models/v2beta1_mpi_job_spec.py | 30 ++++++- test/integration/mpi_job_controller_test.go | 11 ++- 11 files changed, 171 insertions(+), 29 deletions(-) diff --git a/deploy/v2beta1/mpi-operator.yaml b/deploy/v2beta1/mpi-operator.yaml index 1a6cbf8b..a8c0ed09 100644 --- a/deploy/v2beta1/mpi-operator.yaml +++ b/deploy/v2beta1/mpi-operator.yaml @@ -8190,6 +8190,11 @@ spec: description: MPIReplicaSpecs contains maps from `MPIReplicaType` to `ReplicaSpec` that specify the MPI replicas to run. type: object + runLauncherAsWorker: + default: false + description: RunLauncherAsWorker indicates wether to run worker process + in launcher Defaults to false. + type: boolean runPolicy: description: RunPolicy encapsulates various runtime policies of the job. diff --git a/manifests/base/kubeflow.org_mpijobs.yaml b/manifests/base/kubeflow.org_mpijobs.yaml index e5257f5e..10518a65 100644 --- a/manifests/base/kubeflow.org_mpijobs.yaml +++ b/manifests/base/kubeflow.org_mpijobs.yaml @@ -8167,6 +8167,11 @@ spec: description: MPIReplicaSpecs contains maps from `MPIReplicaType` to `ReplicaSpec` that specify the MPI replicas to run. type: object + runLauncherAsWorker: + default: false + description: RunLauncherAsWorker indicates wether to run worker process + in launcher Defaults to false. + type: boolean runPolicy: description: RunPolicy encapsulates various runtime policies of the job. diff --git a/pkg/apis/kubeflow/v2beta1/swagger.json b/pkg/apis/kubeflow/v2beta1/swagger.json index 283f2125..004b34ec 100644 --- a/pkg/apis/kubeflow/v2beta1/swagger.json +++ b/pkg/apis/kubeflow/v2beta1/swagger.json @@ -156,6 +156,10 @@ "$ref": "#/definitions/v2beta1.ReplicaSpec" } }, + "runLauncherAsWorker": { + "description": "RunLauncherAsWorker indicates wether to run worker process in launcher Defaults to false.", + "type": "boolean" + }, "runPolicy": { "description": "RunPolicy encapsulates various runtime policies of the job.", "default": {}, diff --git a/pkg/apis/kubeflow/v2beta1/types.go b/pkg/apis/kubeflow/v2beta1/types.go index 7525a053..65a09858 100644 --- a/pkg/apis/kubeflow/v2beta1/types.go +++ b/pkg/apis/kubeflow/v2beta1/types.go @@ -154,6 +154,12 @@ type MPIJobSpec struct { // +kubebuilder:default:=1 SlotsPerWorker *int32 `json:"slotsPerWorker,omitempty"` + // RunLauncherAsWorker indicates wether to run worker process in launcher + // Defaults to false. + // +optional + // +kubebuilder:default:=false + RunLauncherAsWorker *bool `json:"runLauncherAsWorker,omitempty"` + // RunPolicy encapsulates various runtime policies of the job. 
RunPolicy RunPolicy `json:"runPolicy,omitempty"` diff --git a/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go b/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go index 1036b571..75ef51aa 100644 --- a/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go +++ b/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go @@ -163,6 +163,11 @@ func (in *MPIJobSpec) DeepCopyInto(out *MPIJobSpec) { *out = new(int32) **out = **in } + if in.RunLauncherAsWorker != nil { + in, out := &in.RunLauncherAsWorker, &out.RunLauncherAsWorker + *out = new(bool) + **out = **in + } in.RunPolicy.DeepCopyInto(&out.RunPolicy) if in.MPIReplicaSpecs != nil { in, out := &in.MPIReplicaSpecs, &out.MPIReplicaSpecs diff --git a/pkg/client/applyconfiguration/kubeflow/v2beta1/mpijobspec.go b/pkg/client/applyconfiguration/kubeflow/v2beta1/mpijobspec.go index 372c332c..c569d533 100644 --- a/pkg/client/applyconfiguration/kubeflow/v2beta1/mpijobspec.go +++ b/pkg/client/applyconfiguration/kubeflow/v2beta1/mpijobspec.go @@ -24,6 +24,7 @@ import ( // with apply. type MPIJobSpecApplyConfiguration struct { SlotsPerWorker *int32 `json:"slotsPerWorker,omitempty"` + RunLauncherAsWorker *bool `json:"runLauncherAsWorker,omitempty"` RunPolicy *RunPolicyApplyConfiguration `json:"runPolicy,omitempty"` MPIReplicaSpecs map[kubeflowv2beta1.MPIReplicaType]*kubeflowv2beta1.ReplicaSpec `json:"mpiReplicaSpecs,omitempty"` SSHAuthMountPath *string `json:"sshAuthMountPath,omitempty"` @@ -45,6 +46,14 @@ func (b *MPIJobSpecApplyConfiguration) WithSlotsPerWorker(value int32) *MPIJobSp return b } +// WithRunLauncherAsWorker sets the RunLauncherAsWorker field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the RunLauncherAsWorker field is set to the value of the last call. +func (b *MPIJobSpecApplyConfiguration) WithRunLauncherAsWorker(value bool) *MPIJobSpecApplyConfiguration { + b.RunLauncherAsWorker = &value + return b +} + // WithRunPolicy sets the RunPolicy field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. // If called multiple times, the RunPolicy field is set to the value of the last call. diff --git a/pkg/controller/mpi_job_controller.go b/pkg/controller/mpi_job_controller.go index d6b2ffc1..ecc98fa2 100644 --- a/pkg/controller/mpi_job_controller.go +++ b/pkg/controller/mpi_job_controller.go @@ -656,13 +656,13 @@ func (c *MPIJobController) syncHandler(key string) error { return err } } - if mpiJob.Spec.MPIImplementation == kubeflow.MPIImplementationIntel || + // The Intel and MPICH implementations require workers to communicate with the + // launcher through its hostname. For that, we create a Service which + // has the same name as the launcher's hostname. + if (mpiJob.Spec.RunLauncherAsWorker != nil && *mpiJob.Spec.RunLauncherAsWorker) || + mpiJob.Spec.MPIImplementation == kubeflow.MPIImplementationIntel || mpiJob.Spec.MPIImplementation == kubeflow.MPIImplementationMPICH { - // The Intel and MPICH implementations require workers to communicate with the - // launcher through its hostname. For that, we create a Service which - // has the same name as the launcher's hostname. 
- _, err := c.getOrCreateService(mpiJob, newLauncherService(mpiJob)) - if err != nil { + if _, err = c.getOrCreateService(mpiJob, newLauncherService(mpiJob)); err != nil { return fmt.Errorf("getting or creating Service to front launcher: %w", err) } } @@ -1284,12 +1284,26 @@ func newConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int32) *corev1.ConfigM if mpiJob.Spec.SlotsPerWorker != nil { slots = int(*mpiJob.Spec.SlotsPerWorker) } + // note that pod.spec.dnsConfig also affect the svc resolution + // ref: https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/ + // launcher can be reach with hostname or service name + if mpiJob.Spec.RunLauncherAsWorker != nil && *mpiJob.Spec.RunLauncherAsWorker { + launcherService := mpiJob.Name + launcherSuffix + switch mpiJob.Spec.MPIImplementation { + case kubeflow.MPIImplementationOpenMPI: + buffer.WriteString(fmt.Sprintf("%s.%s.svc slots=%d\n", launcherService, mpiJob.Namespace, slots)) + case kubeflow.MPIImplementationIntel, kubeflow.MPIImplementationMPICH: + buffer.WriteString(fmt.Sprintf("%s.%s.svc:%d\n", launcherService, mpiJob.Namespace, slots)) + } + } + for i := 0; i < int(workerReplicas); i++ { + name := workerName(mpiJob, i) switch mpiJob.Spec.MPIImplementation { case kubeflow.MPIImplementationOpenMPI: - buffer.WriteString(fmt.Sprintf("%s%s-%d.%s.%s.svc slots=%d\n", mpiJob.Name, workerSuffix, i, workersService, mpiJob.Namespace, slots)) + buffer.WriteString(fmt.Sprintf("%s.%s.%s.svc slots=%d\n", name, workersService, mpiJob.Namespace, slots)) case kubeflow.MPIImplementationIntel, kubeflow.MPIImplementationMPICH: - buffer.WriteString(fmt.Sprintf("%s%s-%d.%s.%s.svc:%d\n", mpiJob.Name, workerSuffix, i, workersService, mpiJob.Namespace, slots)) + buffer.WriteString(fmt.Sprintf("%s.%s.%s.svc:%d\n", name, workersService, mpiJob.Namespace, slots)) } } @@ -1319,6 +1333,13 @@ func updateDiscoverHostsInConfigMap(configMap *corev1.ConfigMap, mpiJob *kubeflo var buffer bytes.Buffer buffer.WriteString("#!/bin/sh\n") + + // We don't check if launcher is running here, launcher should always be there or the job failed + if mpiJob.Spec.RunLauncherAsWorker != nil && *mpiJob.Spec.RunLauncherAsWorker { + launcherService := mpiJob.Name + launcherSuffix + buffer.WriteString(fmt.Sprintf("echo %s.%s.svc\n", launcherService, mpiJob.Namespace)) + } + workersService := mpiJob.Name + workerSuffix for _, p := range runningPods { buffer.WriteString(fmt.Sprintf("echo %s.%s.%s.svc\n", p.Name, workersService, p.Namespace)) diff --git a/pkg/controller/mpi_job_controller_test.go b/pkg/controller/mpi_job_controller_test.go index 51c6a8e7..2186fac4 100644 --- a/pkg/controller/mpi_job_controller_test.go +++ b/pkg/controller/mpi_job_controller_test.go @@ -110,18 +110,6 @@ func newMPIJobCommon(name string, startTime, completionTime *metav1.Time) *kubef CleanPodPolicy: &cleanPodPolicyAll, }, MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{ - kubeflow.MPIReplicaTypeWorker: { - Template: corev1.PodTemplateSpec{ - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "foo", - Image: "bar", - }, - }, - }, - }, - }, kubeflow.MPIReplicaTypeLauncher: { Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ @@ -151,7 +139,22 @@ func newMPIJobCommon(name string, startTime, completionTime *metav1.Time) *kubef func newMPIJob(name string, replicas *int32, startTime, completionTime *metav1.Time) *kubeflow.MPIJob { mpiJob := newMPIJobCommon(name, startTime, completionTime) - mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker].Replicas = 
replicas + if *replicas > 0 { + mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker] = + &kubeflow.ReplicaSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "foo", + Image: "bar", + }, + }, + }, + }, + Replicas: replicas, + } + } return mpiJob } @@ -526,7 +529,8 @@ func TestAllResourcesCreated(t *testing.T) { for i := 0; i < 5; i++ { f.expectCreatePodAction(fmjc.newWorker(mpiJobCopy, i)) } - if implementation == kubeflow.MPIImplementationIntel || + if (mpiJob.Spec.RunLauncherAsWorker != nil && *mpiJob.Spec.RunLauncherAsWorker) || + implementation == kubeflow.MPIImplementationIntel || implementation == kubeflow.MPIImplementationMPICH { f.expectCreateServiceAction(newLauncherService(mpiJobCopy)) } @@ -822,7 +826,8 @@ func TestCreateSuspendedMPIJob(t *testing.T) { t.Fatalf("Failed creating secret") } f.expectCreateSecretAction(secret) - if implementation == kubeflow.MPIImplementationIntel || + if (mpiJob.Spec.RunLauncherAsWorker != nil && *mpiJob.Spec.RunLauncherAsWorker) || + implementation == kubeflow.MPIImplementationIntel || implementation == kubeflow.MPIImplementationMPICH { f.expectCreateServiceAction(newLauncherService(mpiJob)) } @@ -1538,7 +1543,57 @@ func TestNewConfigMap(t *testing.T) { workerReplicas int32 wantCM *corev1.ConfigMap }{ - "OpenMPI without slots": { + "OpenMPI without slots, enable launcher as worker": { + mpiJob: &kubeflow.MPIJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "openmpi-without-slots", + Namespace: "tenant-a", + }, + Spec: kubeflow.MPIJobSpec{ + MPIImplementation: kubeflow.MPIImplementationOpenMPI, + RunLauncherAsWorker: pointer.Bool(true), + }, + }, + workerReplicas: 2, + wantCM: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "openmpi-without-slots-config", + Namespace: "tenant-a", + Labels: map[string]string{ + "app": "openmpi-without-slots", + }, + }, + Data: map[string]string{ + "hostfile": "openmpi-without-slots-launcher.tenant-a.svc slots=1\nopenmpi-without-slots-worker-0.openmpi-without-slots-worker.tenant-a.svc slots=1\nopenmpi-without-slots-worker-1.openmpi-without-slots-worker.tenant-a.svc slots=1\n", + }, + }, + }, + "OpenMPI without slots, zero explicit workers": { + mpiJob: &kubeflow.MPIJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "openmpi-without-slots", + Namespace: "tenant-a", + }, + Spec: kubeflow.MPIJobSpec{ + MPIImplementation: kubeflow.MPIImplementationOpenMPI, + RunLauncherAsWorker: pointer.Bool(true), + }, + }, + workerReplicas: 0, + wantCM: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "openmpi-without-slots-config", + Namespace: "tenant-a", + Labels: map[string]string{ + "app": "openmpi-without-slots", + }, + }, + Data: map[string]string{ + "hostfile": "openmpi-without-slots-launcher.tenant-a.svc slots=1\n", + }, + }, + }, + "OpenMPI without slots, disable launcher as worker": { mpiJob: &kubeflow.MPIJob{ ObjectMeta: metav1.ObjectMeta{ Name: "openmpi-without-slots", diff --git a/sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md b/sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md index 44d2ef04..295b374b 100644 --- a/sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md +++ b/sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md @@ -7,6 +7,7 @@ Name | Type | Description | Notes **launcher_creation_policy** | **str** | launcherCreationPolicy if WaitForWorkersReady, the launcher is created only after all workers are in Ready state. Defaults to AtStartup. | [optional] **mpi_implementation** | **str** | MPIImplementation is the MPI implementation. 
Options are \"OpenMPI\" (default), \"Intel\" and \"MPICH\". | [optional] **mpi_replica_specs** | [**dict(str, V2beta1ReplicaSpec)**](V2beta1ReplicaSpec.md) | MPIReplicaSpecs contains maps from `MPIReplicaType` to `ReplicaSpec` that specify the MPI replicas to run. | +**run_launcher_as_worker** | **bool** | RunLauncherAsWorker indicates wether to run worker process in launcher Defaults to false. | [optional] **run_policy** | [**V2beta1RunPolicy**](V2beta1RunPolicy.md) | | [optional] **slots_per_worker** | **int** | Specifies the number of slots per worker used in hostfile. Defaults to 1. | [optional] **ssh_auth_mount_path** | **str** | SSHAuthMountPath is the directory where SSH keys are mounted. Defaults to \"/root/.ssh\". | [optional] diff --git a/sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job_spec.py b/sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job_spec.py index 68656d76..81accf8a 100644 --- a/sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job_spec.py +++ b/sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job_spec.py @@ -36,6 +36,7 @@ class V2beta1MPIJobSpec(object): 'launcher_creation_policy': 'str', 'mpi_implementation': 'str', 'mpi_replica_specs': 'dict(str, V2beta1ReplicaSpec)', + 'run_launcher_as_worker': 'bool', 'run_policy': 'V2beta1RunPolicy', 'slots_per_worker': 'int', 'ssh_auth_mount_path': 'str' @@ -45,12 +46,13 @@ class V2beta1MPIJobSpec(object): 'launcher_creation_policy': 'launcherCreationPolicy', 'mpi_implementation': 'mpiImplementation', 'mpi_replica_specs': 'mpiReplicaSpecs', + 'run_launcher_as_worker': 'runLauncherAsWorker', 'run_policy': 'runPolicy', 'slots_per_worker': 'slotsPerWorker', 'ssh_auth_mount_path': 'sshAuthMountPath' } - def __init__(self, launcher_creation_policy=None, mpi_implementation=None, mpi_replica_specs=None, run_policy=None, slots_per_worker=None, ssh_auth_mount_path=None, local_vars_configuration=None): # noqa: E501 + def __init__(self, launcher_creation_policy=None, mpi_implementation=None, mpi_replica_specs=None, run_launcher_as_worker=None, run_policy=None, slots_per_worker=None, ssh_auth_mount_path=None, local_vars_configuration=None): # noqa: E501 """V2beta1MPIJobSpec - a model defined in OpenAPI""" # noqa: E501 if local_vars_configuration is None: local_vars_configuration = Configuration.get_default_copy() @@ -59,6 +61,7 @@ def __init__(self, launcher_creation_policy=None, mpi_implementation=None, mpi_r self._launcher_creation_policy = None self._mpi_implementation = None self._mpi_replica_specs = None + self._run_launcher_as_worker = None self._run_policy = None self._slots_per_worker = None self._ssh_auth_mount_path = None @@ -69,6 +72,8 @@ def __init__(self, launcher_creation_policy=None, mpi_implementation=None, mpi_r if mpi_implementation is not None: self.mpi_implementation = mpi_implementation self.mpi_replica_specs = mpi_replica_specs + if run_launcher_as_worker is not None: + self.run_launcher_as_worker = run_launcher_as_worker if run_policy is not None: self.run_policy = run_policy if slots_per_worker is not None: @@ -147,6 +152,29 @@ def mpi_replica_specs(self, mpi_replica_specs): self._mpi_replica_specs = mpi_replica_specs + @property + def run_launcher_as_worker(self): + """Gets the run_launcher_as_worker of this V2beta1MPIJobSpec. # noqa: E501 + + RunLauncherAsWorker indicates wether to run worker process in launcher Defaults to false. # noqa: E501 + + :return: The run_launcher_as_worker of this V2beta1MPIJobSpec. 
# noqa: E501 + :rtype: bool + """ + return self._run_launcher_as_worker + + @run_launcher_as_worker.setter + def run_launcher_as_worker(self, run_launcher_as_worker): + """Sets the run_launcher_as_worker of this V2beta1MPIJobSpec. + + RunLauncherAsWorker indicates wether to run worker process in launcher Defaults to false. # noqa: E501 + + :param run_launcher_as_worker: The run_launcher_as_worker of this V2beta1MPIJobSpec. # noqa: E501 + :type run_launcher_as_worker: bool + """ + + self._run_launcher_as_worker = run_launcher_as_worker + @property def run_policy(self): """Gets the run_policy of this V2beta1MPIJobSpec. # noqa: E501 diff --git a/test/integration/mpi_job_controller_test.go b/test/integration/mpi_job_controller_test.go index 87c7eaf6..883c609e 100644 --- a/test/integration/mpi_job_controller_test.go +++ b/test/integration/mpi_job_controller_test.go @@ -17,6 +17,7 @@ package integration import ( "context" "fmt" + "strings" "testing" "time" @@ -895,7 +896,7 @@ func validateMPIJobDependencies( if err := wait.PollUntilContextTimeout(ctx, waitInterval, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { problems = nil var err error - svc, err = getServiceForJob(ctx, kubeClient, job) + svc, err = getServiceForJob(ctx, kubeClient, job, "worker") if err != nil { return false, err } @@ -1026,14 +1027,16 @@ func updatePodsCondition(ctx context.Context, client kubernetes.Interface, pods return nil } -func getServiceForJob(ctx context.Context, client kubernetes.Interface, job *kubeflow.MPIJob) (*corev1.Service, error) { +func getServiceForJob(ctx context.Context, client kubernetes.Interface, job *kubeflow.MPIJob, role string) (*corev1.Service, error) { result, err := client.CoreV1().Services(job.Namespace).List(ctx, metav1.ListOptions{}) if err != nil { return nil, err } for _, obj := range result.Items { - if metav1.IsControlledBy(&obj, job) { - return &obj, nil + if strings.HasSuffix(obj.Name, role) { + if metav1.IsControlledBy(&obj, job) { + return &obj, nil + } } } return nil, nil From 31add861f98fe4c384f3f8d43b125dfe5ef8487f Mon Sep 17 00:00:00 2001 From: kuizhiqing Date: Mon, 19 Feb 2024 12:41:32 +0000 Subject: [PATCH 2/5] use ptr.Deref Signed-off-by: kuizhiqing --- go.mod | 2 +- go.sum | 4 ++-- pkg/apis/kubeflow/v2beta1/types.go | 2 +- pkg/controller/mpi_job_controller.go | 6 ++++-- pkg/controller/mpi_job_controller_test.go | 7 ++++--- 5 files changed, 12 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index 4fb934f2..55361ccd 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( k8s.io/code-generator v0.27.4 k8s.io/klog v1.0.0 k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f - k8s.io/utils v0.0.0-20230209194617-a36077c30491 + k8s.io/utils v0.0.0-20240102154912-e7106e64919e sigs.k8s.io/controller-runtime v0.15.1 sigs.k8s.io/scheduler-plugins v0.26.7 sigs.k8s.io/structured-merge-diff/v4 v4.2.3 diff --git a/go.sum b/go.sum index d0bb6f57..01dede9e 100644 --- a/go.sum +++ b/go.sum @@ -384,8 +384,8 @@ k8s.io/klog/v2 v2.90.1 h1:m4bYOKall2MmOiRaR1J+We67Do7vm9KiQVlT96lnHUw= k8s.io/klog/v2 v2.90.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f h1:2kWPakN3i/k81b0gvD5C5FJ2kxm1WrQFanWchyKuqGg= k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f/go.mod h1:byini6yhqGC14c3ebc/QwanvYwhuMWF6yz2F8uwW8eg= -k8s.io/utils v0.0.0-20230209194617-a36077c30491 h1:r0BAOLElQnnFhE/ApUsg3iHdVYYPBjNSSOMowRZxxsY= -k8s.io/utils v0.0.0-20230209194617-a36077c30491/go.mod 
h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= +k8s.io/utils v0.0.0-20240102154912-e7106e64919e h1:eQ/4ljkx21sObifjzXwlPKpdGLrCfRziVtos3ofG/sQ= +k8s.io/utils v0.0.0-20240102154912-e7106e64919e/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.1.2 h1:trsWhjU5jZrx6UvFu4WzQDrN7Pga4a7Qg+zcfcj64PA= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.1.2/go.mod h1:+qG7ISXqCDVVcyO8hLn12AKVYYUjM7ftlqsqmrhMZE0= sigs.k8s.io/controller-runtime v0.15.1 h1:9UvgKD4ZJGcj24vefUFgZFP3xej/3igL9BsOUTb/+4c= diff --git a/pkg/apis/kubeflow/v2beta1/types.go b/pkg/apis/kubeflow/v2beta1/types.go index 65a09858..5480d842 100644 --- a/pkg/apis/kubeflow/v2beta1/types.go +++ b/pkg/apis/kubeflow/v2beta1/types.go @@ -154,7 +154,7 @@ type MPIJobSpec struct { // +kubebuilder:default:=1 SlotsPerWorker *int32 `json:"slotsPerWorker,omitempty"` - // RunLauncherAsWorker indicates wether to run worker process in launcher + // RunLauncherAsWorker indicates whether to run worker process in launcher // Defaults to false. // +optional // +kubebuilder:default:=false diff --git a/pkg/controller/mpi_job_controller.go b/pkg/controller/mpi_job_controller.go index ecc98fa2..d0dcca90 100644 --- a/pkg/controller/mpi_job_controller.go +++ b/pkg/controller/mpi_job_controller.go @@ -54,6 +54,7 @@ import ( "k8s.io/klog" "k8s.io/utils/clock" "k8s.io/utils/pointer" + "k8s.io/utils/ptr" schedclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned" volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned" @@ -656,10 +657,11 @@ func (c *MPIJobController) syncHandler(key string) error { return err } } + // If we want to run process in launcher, we should create a service for launcher. // The Intel and MPICH implementations require workers to communicate with the // launcher through its hostname. For that, we create a Service which // has the same name as the launcher's hostname. 
- if (mpiJob.Spec.RunLauncherAsWorker != nil && *mpiJob.Spec.RunLauncherAsWorker) || + if ptr.Deref(mpiJob.Spec.RunLauncherAsWorker, false) || mpiJob.Spec.MPIImplementation == kubeflow.MPIImplementationIntel || mpiJob.Spec.MPIImplementation == kubeflow.MPIImplementationMPICH { if _, err = c.getOrCreateService(mpiJob, newLauncherService(mpiJob)); err != nil { @@ -1335,7 +1337,7 @@ func updateDiscoverHostsInConfigMap(configMap *corev1.ConfigMap, mpiJob *kubeflo buffer.WriteString("#!/bin/sh\n") // We don't check if launcher is running here, launcher should always be there or the job failed - if mpiJob.Spec.RunLauncherAsWorker != nil && *mpiJob.Spec.RunLauncherAsWorker { + if ptr.Deref(mpiJob.Spec.RunLauncherAsWorker, false) { launcherService := mpiJob.Name + launcherSuffix buffer.WriteString(fmt.Sprintf("echo %s.%s.svc\n", launcherService, mpiJob.Namespace)) } diff --git a/pkg/controller/mpi_job_controller_test.go b/pkg/controller/mpi_job_controller_test.go index 2186fac4..cc157aa8 100644 --- a/pkg/controller/mpi_job_controller_test.go +++ b/pkg/controller/mpi_job_controller_test.go @@ -38,6 +38,7 @@ import ( "k8s.io/utils/clock" clocktesting "k8s.io/utils/clock/testing" "k8s.io/utils/pointer" + "k8s.io/utils/ptr" schedv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" schedclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned" volcanov1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" @@ -139,7 +140,7 @@ func newMPIJobCommon(name string, startTime, completionTime *metav1.Time) *kubef func newMPIJob(name string, replicas *int32, startTime, completionTime *metav1.Time) *kubeflow.MPIJob { mpiJob := newMPIJobCommon(name, startTime, completionTime) - if *replicas > 0 { + if ptr.Deref(replicas, 0) > 0 { mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker] = &kubeflow.ReplicaSpec{ Template: corev1.PodTemplateSpec{ @@ -529,7 +530,7 @@ func TestAllResourcesCreated(t *testing.T) { for i := 0; i < 5; i++ { f.expectCreatePodAction(fmjc.newWorker(mpiJobCopy, i)) } - if (mpiJob.Spec.RunLauncherAsWorker != nil && *mpiJob.Spec.RunLauncherAsWorker) || + if ptr.Deref(mpiJob.Spec.RunLauncherAsWorker, false) || implementation == kubeflow.MPIImplementationIntel || implementation == kubeflow.MPIImplementationMPICH { f.expectCreateServiceAction(newLauncherService(mpiJobCopy)) @@ -826,7 +827,7 @@ func TestCreateSuspendedMPIJob(t *testing.T) { t.Fatalf("Failed creating secret") } f.expectCreateSecretAction(secret) - if (mpiJob.Spec.RunLauncherAsWorker != nil && *mpiJob.Spec.RunLauncherAsWorker) || + if ptr.Deref(mpiJob.Spec.RunLauncherAsWorker, false) || implementation == kubeflow.MPIImplementationIntel || implementation == kubeflow.MPIImplementationMPICH { f.expectCreateServiceAction(newLauncherService(mpiJob)) From 224e5e2e88a68b8ee83a22058466cc9b51e02ae4 Mon Sep 17 00:00:00 2001 From: kuizhiqing Date: Mon, 19 Feb 2024 13:08:31 +0000 Subject: [PATCH 3/5] update manifest Signed-off-by: kuizhiqing --- deploy/v2beta1/mpi-operator.yaml | 2 +- manifests/base/kubeflow.org_mpijobs.yaml | 2 +- pkg/apis/kubeflow/v2beta1/swagger.json | 2 +- sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md | 2 +- sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job_spec.py | 4 ++-- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/deploy/v2beta1/mpi-operator.yaml b/deploy/v2beta1/mpi-operator.yaml index a8c0ed09..3b45024f 100644 --- a/deploy/v2beta1/mpi-operator.yaml +++ b/deploy/v2beta1/mpi-operator.yaml @@ -8192,7 +8192,7 @@ spec: type: object runLauncherAsWorker: default: false - 
description: RunLauncherAsWorker indicates wether to run worker process + description: RunLauncherAsWorker indicates whether to run worker process in launcher Defaults to false. type: boolean runPolicy: diff --git a/manifests/base/kubeflow.org_mpijobs.yaml b/manifests/base/kubeflow.org_mpijobs.yaml index 10518a65..0864dd8d 100644 --- a/manifests/base/kubeflow.org_mpijobs.yaml +++ b/manifests/base/kubeflow.org_mpijobs.yaml @@ -8169,7 +8169,7 @@ spec: type: object runLauncherAsWorker: default: false - description: RunLauncherAsWorker indicates wether to run worker process + description: RunLauncherAsWorker indicates whether to run worker process in launcher Defaults to false. type: boolean runPolicy: diff --git a/pkg/apis/kubeflow/v2beta1/swagger.json b/pkg/apis/kubeflow/v2beta1/swagger.json index 004b34ec..8b56d364 100644 --- a/pkg/apis/kubeflow/v2beta1/swagger.json +++ b/pkg/apis/kubeflow/v2beta1/swagger.json @@ -157,7 +157,7 @@ } }, "runLauncherAsWorker": { - "description": "RunLauncherAsWorker indicates wether to run worker process in launcher Defaults to false.", + "description": "RunLauncherAsWorker indicates whether to run worker process in launcher Defaults to false.", "type": "boolean" }, "runPolicy": { diff --git a/sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md b/sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md index 295b374b..0691e4a4 100644 --- a/sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md +++ b/sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md @@ -7,7 +7,7 @@ Name | Type | Description | Notes **launcher_creation_policy** | **str** | launcherCreationPolicy if WaitForWorkersReady, the launcher is created only after all workers are in Ready state. Defaults to AtStartup. | [optional] **mpi_implementation** | **str** | MPIImplementation is the MPI implementation. Options are \"OpenMPI\" (default), \"Intel\" and \"MPICH\". | [optional] **mpi_replica_specs** | [**dict(str, V2beta1ReplicaSpec)**](V2beta1ReplicaSpec.md) | MPIReplicaSpecs contains maps from `MPIReplicaType` to `ReplicaSpec` that specify the MPI replicas to run. | -**run_launcher_as_worker** | **bool** | RunLauncherAsWorker indicates wether to run worker process in launcher Defaults to false. | [optional] +**run_launcher_as_worker** | **bool** | RunLauncherAsWorker indicates whether to run worker process in launcher Defaults to false. | [optional] **run_policy** | [**V2beta1RunPolicy**](V2beta1RunPolicy.md) | | [optional] **slots_per_worker** | **int** | Specifies the number of slots per worker used in hostfile. Defaults to 1. | [optional] **ssh_auth_mount_path** | **str** | SSHAuthMountPath is the directory where SSH keys are mounted. Defaults to \"/root/.ssh\". | [optional] diff --git a/sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job_spec.py b/sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job_spec.py index 81accf8a..2d79da36 100644 --- a/sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job_spec.py +++ b/sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job_spec.py @@ -156,7 +156,7 @@ def mpi_replica_specs(self, mpi_replica_specs): def run_launcher_as_worker(self): """Gets the run_launcher_as_worker of this V2beta1MPIJobSpec. # noqa: E501 - RunLauncherAsWorker indicates wether to run worker process in launcher Defaults to false. # noqa: E501 + RunLauncherAsWorker indicates whether to run worker process in launcher Defaults to false. # noqa: E501 :return: The run_launcher_as_worker of this V2beta1MPIJobSpec. 
# noqa: E501 :rtype: bool @@ -167,7 +167,7 @@ def run_launcher_as_worker(self): def run_launcher_as_worker(self, run_launcher_as_worker): """Sets the run_launcher_as_worker of this V2beta1MPIJobSpec. - RunLauncherAsWorker indicates wether to run worker process in launcher Defaults to false. # noqa: E501 + RunLauncherAsWorker indicates whether to run worker process in launcher Defaults to false. # noqa: E501 :param run_launcher_as_worker: The run_launcher_as_worker of this V2beta1MPIJobSpec. # noqa: E501 :type run_launcher_as_worker: bool From 934ee87fb2a960aa7fb0e823e74eb770de8cf81a Mon Sep 17 00:00:00 2001 From: kuizhiqing Date: Wed, 21 Feb 2024 07:30:09 +0000 Subject: [PATCH 4/5] more Deref Signed-off-by: kuizhiqing --- pkg/controller/mpi_job_controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller/mpi_job_controller.go b/pkg/controller/mpi_job_controller.go index d0dcca90..fdfcaf53 100644 --- a/pkg/controller/mpi_job_controller.go +++ b/pkg/controller/mpi_job_controller.go @@ -1289,7 +1289,7 @@ func newConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int32) *corev1.ConfigM // note that pod.spec.dnsConfig also affect the svc resolution // ref: https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/ // launcher can be reach with hostname or service name - if mpiJob.Spec.RunLauncherAsWorker != nil && *mpiJob.Spec.RunLauncherAsWorker { + if ptr.Deref(mpiJob.Spec.RunLauncherAsWorker, false) { launcherService := mpiJob.Name + launcherSuffix switch mpiJob.Spec.MPIImplementation { case kubeflow.MPIImplementationOpenMPI: From af85ab07bd3d45b7ac16286832c3b4a7c5209ad2 Mon Sep 17 00:00:00 2001 From: kuizhiqing Date: Thu, 22 Feb 2024 16:15:16 +0000 Subject: [PATCH 5/5] create one service for both launcher and worker Signed-off-by: kuizhiqing --- pkg/controller/mpi_job_controller.go | 57 ++++++++--------- pkg/controller/mpi_job_controller_test.go | 71 ++++++++++----------- test/integration/mpi_job_controller_test.go | 11 ++-- 3 files changed, 61 insertions(+), 78 deletions(-) diff --git a/pkg/controller/mpi_job_controller.go b/pkg/controller/mpi_job_controller.go index fdfcaf53..8f19a89a 100644 --- a/pkg/controller/mpi_job_controller.go +++ b/pkg/controller/mpi_job_controller.go @@ -631,7 +631,7 @@ func (c *MPIJobController) syncHandler(key string) error { // We're done if the launcher either succeeded or failed. done := launcher != nil && isJobFinished(launcher) if !done { - _, err := c.getOrCreateService(mpiJob, newWorkersService(mpiJob)) + _, err := c.getOrCreateService(mpiJob, newJobService(mpiJob)) if err != nil { return fmt.Errorf("getting or creating Service to front workers: %w", err) } @@ -657,17 +657,6 @@ func (c *MPIJobController) syncHandler(key string) error { return err } } - // If we want to run process in launcher, we should create a service for launcher. - // The Intel and MPICH implementations require workers to communicate with the - // launcher through its hostname. For that, we create a Service which - // has the same name as the launcher's hostname. 
- if ptr.Deref(mpiJob.Spec.RunLauncherAsWorker, false) || - mpiJob.Spec.MPIImplementation == kubeflow.MPIImplementationIntel || - mpiJob.Spec.MPIImplementation == kubeflow.MPIImplementationMPICH { - if _, err = c.getOrCreateService(mpiJob, newLauncherService(mpiJob)); err != nil { - return fmt.Errorf("getting or creating Service to front launcher: %w", err) - } - } if launcher == nil { if mpiJob.Spec.LauncherCreationPolicy == kubeflow.LauncherCreationPolicyAtStartup || c.countReadyWorkerPods(worker) == len(worker) { launcher, err = c.kubeClient.BatchV1().Jobs(namespace).Create(context.TODO(), c.newLauncherJob(mpiJob), metav1.CreateOptions{}) @@ -1281,7 +1270,6 @@ func (c *MPIJobController) doUpdateJobStatus(mpiJob *kubeflow.MPIJob) error { // handleObject can discover the MPIJob resource that 'owns' it. func newConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int32) *corev1.ConfigMap { var buffer bytes.Buffer - workersService := mpiJob.Name + workerSuffix slots := 1 if mpiJob.Spec.SlotsPerWorker != nil { slots = int(*mpiJob.Spec.SlotsPerWorker) @@ -1290,12 +1278,12 @@ func newConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int32) *corev1.ConfigM // ref: https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/ // launcher can be reach with hostname or service name if ptr.Deref(mpiJob.Spec.RunLauncherAsWorker, false) { - launcherService := mpiJob.Name + launcherSuffix + name := mpiJob.Name + launcherSuffix switch mpiJob.Spec.MPIImplementation { case kubeflow.MPIImplementationOpenMPI: - buffer.WriteString(fmt.Sprintf("%s.%s.svc slots=%d\n", launcherService, mpiJob.Namespace, slots)) + buffer.WriteString(fmt.Sprintf("%s.%s.%s.svc slots=%d\n", name, mpiJob.Name, mpiJob.Namespace, slots)) case kubeflow.MPIImplementationIntel, kubeflow.MPIImplementationMPICH: - buffer.WriteString(fmt.Sprintf("%s.%s.svc:%d\n", launcherService, mpiJob.Namespace, slots)) + buffer.WriteString(fmt.Sprintf("%s.%s.%s.svc:%d\n", name, mpiJob.Name, mpiJob.Namespace, slots)) } } @@ -1303,9 +1291,9 @@ func newConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int32) *corev1.ConfigM name := workerName(mpiJob, i) switch mpiJob.Spec.MPIImplementation { case kubeflow.MPIImplementationOpenMPI: - buffer.WriteString(fmt.Sprintf("%s.%s.%s.svc slots=%d\n", name, workersService, mpiJob.Namespace, slots)) + buffer.WriteString(fmt.Sprintf("%s.%s.%s.svc slots=%d\n", name, mpiJob.Name, mpiJob.Namespace, slots)) case kubeflow.MPIImplementationIntel, kubeflow.MPIImplementationMPICH: - buffer.WriteString(fmt.Sprintf("%s.%s.%s.svc:%d\n", name, workersService, mpiJob.Namespace, slots)) + buffer.WriteString(fmt.Sprintf("%s.%s.%s.svc:%d\n", name, mpiJob.Name, mpiJob.Namespace, slots)) } } @@ -1338,26 +1326,24 @@ func updateDiscoverHostsInConfigMap(configMap *corev1.ConfigMap, mpiJob *kubeflo // We don't check if launcher is running here, launcher should always be there or the job failed if ptr.Deref(mpiJob.Spec.RunLauncherAsWorker, false) { - launcherService := mpiJob.Name + launcherSuffix - buffer.WriteString(fmt.Sprintf("echo %s.%s.svc\n", launcherService, mpiJob.Namespace)) + name := mpiJob.Name + launcherSuffix + buffer.WriteString(fmt.Sprintf("echo %s.%s.%s.svc\n", name, mpiJob.Name, mpiJob.Namespace)) } - workersService := mpiJob.Name + workerSuffix for _, p := range runningPods { - buffer.WriteString(fmt.Sprintf("echo %s.%s.%s.svc\n", p.Name, workersService, p.Namespace)) + buffer.WriteString(fmt.Sprintf("echo %s.%s.%s.svc\n", p.Name, mpiJob.Name, p.Namespace)) } configMap.Data[discoverHostsScriptName] = buffer.String() 
} -// newWorkersService creates a new workers' Service for an MPIJob resource. -func newWorkersService(job *kubeflow.MPIJob) *corev1.Service { - return newService(job, job.Name+workerSuffix, defaultLabels(job.Name, worker)) -} - -// newLauncherService creates a new launcher's Service for an MPIJob resource. -func newLauncherService(job *kubeflow.MPIJob) *corev1.Service { - return newService(job, job.Name+launcherSuffix, defaultLabels(job.Name, launcher)) +// newJobService creates a Service with the same name of Job for both launcher and worker pods +func newJobService(job *kubeflow.MPIJob) *corev1.Service { + labels := map[string]string{ + kubeflow.OperatorNameLabel: kubeflow.OperatorName, + kubeflow.JobNameLabel: job.Name, + } + return newService(job, job.Name, labels) } func newService(job *kubeflow.MPIJob, name string, selector map[string]string) *corev1.Service { @@ -1439,12 +1425,19 @@ func (c *MPIJobController) newWorker(mpiJob *kubeflow.MPIJob, index int) *corev1 } podTemplate.Labels[kubeflow.ReplicaIndexLabel] = strconv.Itoa(index) podTemplate.Spec.Hostname = name - podTemplate.Spec.Subdomain = mpiJob.Name + workerSuffix // Matches workers' Service name. + podTemplate.Spec.Subdomain = mpiJob.Name // Matches job' Service name. if podTemplate.Spec.HostNetwork { // Allows resolution of worker hostnames without needing to include the // namespace or cluster domain. podTemplate.Spec.DNSPolicy = corev1.DNSClusterFirstWithHostNet } + // The Intel and MPICH implementations require workers to communicate with the launcher through its hostname. + searche := fmt.Sprintf("%s.%s.svc.cluster.local", mpiJob.Name, mpiJob.Namespace) + if podTemplate.Spec.DNSConfig == nil { + podTemplate.Spec.DNSConfig = &corev1.PodDNSConfig{Searches: []string{searche}} + } else { + podTemplate.Spec.DNSConfig.Searches = append(podTemplate.Spec.DNSConfig.Searches, searche) + } setRestartPolicy(podTemplate, mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker]) container := &podTemplate.Spec.Containers[0] @@ -1517,7 +1510,7 @@ func (c *MPIJobController) newLauncherPodTemplate(mpiJob *kubeflow.MPIJob) corev c.PodGroupCtrl.decoratePodTemplateSpec(podTemplate, mpiJob.Name) } podTemplate.Spec.Hostname = launcherName - podTemplate.Spec.Subdomain = mpiJob.Name + workerSuffix // Matches workers' Service name. + podTemplate.Spec.Subdomain = mpiJob.Name // Matches job' Service name. if podTemplate.Spec.HostNetwork { // Allows resolution of worker hostnames without needing to include the // namespace or cluster domain. 
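For illustration, assuming a hypothetical MPIJob named "pi" in namespace "default" with runLauncherAsWorker: true, two workers, and slotsPerWorker left at its default of 1, the OpenMPI hostfile produced by newConfigMap above would read:

    # hostfile for a hypothetical job "pi" in namespace "default" (names are placeholders)
    pi-launcher.pi.default.svc slots=1
    pi-worker-0.pi.default.svc slots=1
    pi-worker-1.pi.default.svc slots=1

Because the worker pods now also carry the search domain pi.default.svc.cluster.local in their dnsConfig, the same hosts additionally resolve by their short hostnames (pi-launcher, pi-worker-0, ...).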
diff --git a/pkg/controller/mpi_job_controller_test.go b/pkg/controller/mpi_job_controller_test.go index cc157aa8..dbc164d3 100644 --- a/pkg/controller/mpi_job_controller_test.go +++ b/pkg/controller/mpi_job_controller_test.go @@ -518,7 +518,7 @@ func TestAllResourcesCreated(t *testing.T) { fmjc := f.newFakeMPIJobController() mpiJobCopy := mpiJob.DeepCopy() scheme.Scheme.Default(mpiJobCopy) - f.expectCreateServiceAction(newWorkersService(mpiJobCopy)) + f.expectCreateServiceAction(newJobService(mpiJobCopy)) cfgMap := newConfigMap(mpiJobCopy, 5) updateDiscoverHostsInConfigMap(cfgMap, mpiJob, nil) f.expectCreateConfigMapAction(cfgMap) @@ -530,11 +530,6 @@ func TestAllResourcesCreated(t *testing.T) { for i := 0; i < 5; i++ { f.expectCreatePodAction(fmjc.newWorker(mpiJobCopy, i)) } - if ptr.Deref(mpiJob.Spec.RunLauncherAsWorker, false) || - implementation == kubeflow.MPIImplementationIntel || - implementation == kubeflow.MPIImplementationMPICH { - f.expectCreateServiceAction(newLauncherService(mpiJobCopy)) - } f.expectCreateJobAction(fmjc.newLauncherJob(mpiJobCopy)) mpiJobCopy.Status.Conditions = []kubeflow.JobCondition{newCondition(kubeflow.JobCreated, corev1.ConditionTrue, mpiJobCreatedReason, "MPIJob default/foo is created.")} @@ -669,7 +664,7 @@ func TestConfigMapNotControlledByUs(t *testing.T) { var replicas int32 = 64 mpiJob := newMPIJob("test", &replicas, &startTime, &completionTime) f.setUpMPIJob(mpiJob) - f.setUpService(newWorkersService(mpiJob)) + f.setUpService(newJobService(mpiJob)) configMap := newConfigMap(mpiJob, replicas) updateDiscoverHostsInConfigMap(configMap, mpiJob, nil) @@ -690,7 +685,7 @@ func TestWorkerServiceNotControlledByUs(t *testing.T) { mpiJobCopy := mpiJob.DeepCopy() scheme.Scheme.Default(mpiJobCopy) - service := newWorkersService(mpiJobCopy) + service := newJobService(mpiJobCopy) service.OwnerReferences = nil f.setUpService(service) @@ -709,7 +704,8 @@ func TestLauncherServiceNotControlledByUs(t *testing.T) { mpiJobCopy := mpiJob.DeepCopy() scheme.Scheme.Default(mpiJobCopy) - service := newWorkersService(mpiJobCopy) + service := newJobService(mpiJobCopy) + service.OwnerReferences = nil f.setUpService(service) configMap := newConfigMap(mpiJobCopy, replicas) secret, err := newSSHAuthSecret(mpiJobCopy) @@ -725,10 +721,6 @@ func TestLauncherServiceNotControlledByUs(t *testing.T) { f.setUpPod(worker) } - service = newLauncherService(mpiJobCopy) - service.OwnerReferences = nil - f.setUpService(service) - f.runExpectError(getKey(mpiJob, t)) } @@ -746,7 +738,7 @@ func TestSecretNotControlledByUs(t *testing.T) { configMap := newConfigMap(mpiJobCopy, replicas) updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, nil) f.setUpConfigMap(configMap) - f.setUpService(newWorkersService(mpiJobCopy)) + f.setUpService(newJobService(mpiJobCopy)) secret, err := newSSHAuthSecret(mpiJobCopy) if err != nil { @@ -818,7 +810,7 @@ func TestCreateSuspendedMPIJob(t *testing.T) { // expect creation of objects scheme.Scheme.Default(mpiJob) - f.expectCreateServiceAction(newWorkersService(mpiJob)) + f.expectCreateServiceAction(newJobService(mpiJob)) cfgMap := newConfigMap(mpiJob, replicas) updateDiscoverHostsInConfigMap(cfgMap, mpiJob, nil) f.expectCreateConfigMapAction(cfgMap) @@ -827,11 +819,6 @@ func TestCreateSuspendedMPIJob(t *testing.T) { t.Fatalf("Failed creating secret") } f.expectCreateSecretAction(secret) - if ptr.Deref(mpiJob.Spec.RunLauncherAsWorker, false) || - implementation == kubeflow.MPIImplementationIntel || - implementation == kubeflow.MPIImplementationMPICH { - 
f.expectCreateServiceAction(newLauncherService(mpiJob)) - } // expect creating of the launcher fmjc := f.newFakeMPIJobController() @@ -893,7 +880,7 @@ func TestSuspendedRunningMPIJob(t *testing.T) { // setup objects scheme.Scheme.Default(mpiJob) - f.setUpService(newWorkersService(mpiJob)) + f.setUpService(newJobService(mpiJob)) cfgMap := newConfigMap(mpiJob, replicas) updateDiscoverHostsInConfigMap(cfgMap, mpiJob, runningPodList) @@ -966,7 +953,7 @@ func TestResumeMPIJob(t *testing.T) { // expect creation of objects scheme.Scheme.Default(mpiJob) - f.expectCreateServiceAction(newWorkersService(mpiJob)) + f.expectCreateServiceAction(newJobService(mpiJob)) cfgMap := newConfigMap(mpiJob, replicas) updateDiscoverHostsInConfigMap(cfgMap, mpiJob, nil) f.setUpConfigMap(cfgMap) @@ -1022,7 +1009,7 @@ func TestWorkerNotControlledByUs(t *testing.T) { configMap := newConfigMap(mpiJobCopy, replicas) updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, nil) f.setUpConfigMap(configMap) - f.setUpService(newWorkersService(mpiJobCopy)) + f.setUpService(newJobService(mpiJobCopy)) secret, err := newSSHAuthSecret(mpiJobCopy) if err != nil { t.Fatalf("Creating SSH auth secret: %v", err) @@ -1053,7 +1040,7 @@ func TestLauncherActiveWorkerNotReady(t *testing.T) { configMap := newConfigMap(mpiJobCopy, replicas) updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, nil) f.setUpConfigMap(configMap) - f.setUpService(newWorkersService(mpiJobCopy)) + f.setUpService(newJobService(mpiJobCopy)) secret, err := newSSHAuthSecret(mpiJobCopy) if err != nil { t.Fatalf("Creating SSH auth secret: %v", err) @@ -1103,7 +1090,7 @@ func TestLauncherActiveWorkerReady(t *testing.T) { mpiJobCopy := mpiJob.DeepCopy() scheme.Scheme.Default(mpiJobCopy) - f.setUpService(newWorkersService(mpiJobCopy)) + f.setUpService(newJobService(mpiJobCopy)) secret, err := newSSHAuthSecret(mpiJobCopy) if err != nil { t.Fatalf("Creating SSH auth secret: %v", err) @@ -1162,7 +1149,7 @@ func TestWorkerReady(t *testing.T) { mpiJobCopy := mpiJob.DeepCopy() scheme.Scheme.Default(mpiJobCopy) - f.setUpService(newWorkersService(mpiJobCopy)) + f.setUpService(newJobService(mpiJobCopy)) secret, err := newSSHAuthSecret(mpiJobCopy) if err != nil { t.Fatalf("Creating SSH auth secret: %v", err) @@ -1257,7 +1244,7 @@ func TestNewLauncherAndWorker(t *testing.T) { }, Spec: corev1.PodSpec{ Hostname: "foo-launcher", - Subdomain: "foo-worker", + Subdomain: "foo", RestartPolicy: corev1.RestartPolicyOnFailure, Containers: []corev1.Container{ { @@ -1311,8 +1298,11 @@ func TestNewLauncherAndWorker(t *testing.T) { }, }, Spec: corev1.PodSpec{ - Hostname: "foo-worker-0", - Subdomain: "foo-worker", + Hostname: "foo-worker-0", + Subdomain: "foo", + DNSConfig: &corev1.PodDNSConfig{ + Searches: []string{"foo.bar.svc.cluster.local"}, + }, RestartPolicy: corev1.RestartPolicyNever, Containers: []corev1.Container{ { @@ -1426,7 +1416,7 @@ func TestNewLauncherAndWorker(t *testing.T) { HostNetwork: true, DNSPolicy: corev1.DNSClusterFirstWithHostNet, Hostname: "bar-launcher", - Subdomain: "bar-worker", + Subdomain: "bar", RestartPolicy: corev1.RestartPolicyOnFailure, Containers: []corev1.Container{ { @@ -1486,10 +1476,13 @@ func TestNewLauncherAndWorker(t *testing.T) { }, }, Spec: corev1.PodSpec{ - HostNetwork: true, - DNSPolicy: corev1.DNSClusterFirstWithHostNet, - Hostname: "bar-worker-12", - Subdomain: "bar-worker", + HostNetwork: true, + DNSPolicy: corev1.DNSClusterFirstWithHostNet, + Hostname: "bar-worker-12", + Subdomain: "bar", + DNSConfig: &corev1.PodDNSConfig{ + Searches: 
[]string{"bar.foo.svc.cluster.local"}, + }, RestartPolicy: corev1.RestartPolicyNever, Containers: []corev1.Container{ { @@ -1565,7 +1558,7 @@ func TestNewConfigMap(t *testing.T) { }, }, Data: map[string]string{ - "hostfile": "openmpi-without-slots-launcher.tenant-a.svc slots=1\nopenmpi-without-slots-worker-0.openmpi-without-slots-worker.tenant-a.svc slots=1\nopenmpi-without-slots-worker-1.openmpi-without-slots-worker.tenant-a.svc slots=1\n", + "hostfile": "openmpi-without-slots-launcher.openmpi-without-slots.tenant-a.svc slots=1\nopenmpi-without-slots-worker-0.openmpi-without-slots.tenant-a.svc slots=1\nopenmpi-without-slots-worker-1.openmpi-without-slots.tenant-a.svc slots=1\n", }, }, }, @@ -1590,7 +1583,7 @@ func TestNewConfigMap(t *testing.T) { }, }, Data: map[string]string{ - "hostfile": "openmpi-without-slots-launcher.tenant-a.svc slots=1\n", + "hostfile": "openmpi-without-slots-launcher.openmpi-without-slots.tenant-a.svc slots=1\n", }, }, }, @@ -1614,7 +1607,7 @@ func TestNewConfigMap(t *testing.T) { }, }, Data: map[string]string{ - "hostfile": "openmpi-without-slots-worker-0.openmpi-without-slots-worker.tenant-a.svc slots=1\nopenmpi-without-slots-worker-1.openmpi-without-slots-worker.tenant-a.svc slots=1\n", + "hostfile": "openmpi-without-slots-worker-0.openmpi-without-slots.tenant-a.svc slots=1\nopenmpi-without-slots-worker-1.openmpi-without-slots.tenant-a.svc slots=1\n", }, }, }, @@ -1639,7 +1632,7 @@ func TestNewConfigMap(t *testing.T) { }, }, Data: map[string]string{ - "hostfile": "intelmpi-with-slots-worker-0.intelmpi-with-slots-worker.project-x.svc:10\n", + "hostfile": "intelmpi-with-slots-worker-0.intelmpi-with-slots.project-x.svc:10\n", }, }, }, @@ -1664,7 +1657,7 @@ func TestNewConfigMap(t *testing.T) { }, }, Data: map[string]string{ - "hostfile": "mpich-with-slots-worker-0.mpich-with-slots-worker.project-x.svc:10\n", + "hostfile": "mpich-with-slots-worker-0.mpich-with-slots.project-x.svc:10\n", }, }, }, diff --git a/test/integration/mpi_job_controller_test.go b/test/integration/mpi_job_controller_test.go index 883c609e..87c7eaf6 100644 --- a/test/integration/mpi_job_controller_test.go +++ b/test/integration/mpi_job_controller_test.go @@ -17,7 +17,6 @@ package integration import ( "context" "fmt" - "strings" "testing" "time" @@ -896,7 +895,7 @@ func validateMPIJobDependencies( if err := wait.PollUntilContextTimeout(ctx, waitInterval, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { problems = nil var err error - svc, err = getServiceForJob(ctx, kubeClient, job, "worker") + svc, err = getServiceForJob(ctx, kubeClient, job) if err != nil { return false, err } @@ -1027,16 +1026,14 @@ func updatePodsCondition(ctx context.Context, client kubernetes.Interface, pods return nil } -func getServiceForJob(ctx context.Context, client kubernetes.Interface, job *kubeflow.MPIJob, role string) (*corev1.Service, error) { +func getServiceForJob(ctx context.Context, client kubernetes.Interface, job *kubeflow.MPIJob) (*corev1.Service, error) { result, err := client.CoreV1().Services(job.Namespace).List(ctx, metav1.ListOptions{}) if err != nil { return nil, err } for _, obj := range result.Items { - if strings.HasSuffix(obj.Name, role) { - if metav1.IsControlledBy(&obj, job) { - return &obj, nil - } + if metav1.IsControlledBy(&obj, job) { + return &obj, nil } } return nil, nil
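For context, a minimal manifest exercising the new field might look like the sketch below; runLauncherAsWorker, slotsPerWorker, and the Launcher/Worker replica-spec layout follow the v2beta1 API changed in this series, while the job name, image, and command are hypothetical placeholders.

    apiVersion: kubeflow.org/v2beta1
    kind: MPIJob
    metadata:
      name: pi                          # hypothetical job name
    spec:
      slotsPerWorker: 1
      runLauncherAsWorker: true         # new field introduced by this series; defaults to false
      mpiReplicaSpecs:
        Launcher:
          replicas: 1
          template:
            spec:
              containers:
              - name: launcher
                image: example/mpi-pi:latest                         # placeholder image
                command: ["mpirun", "-n", "3", "/home/mpiuser/pi"]   # placeholder command
        Worker:
          replicas: 2
          template:
            spec:
              containers:
              - name: worker
                image: example/mpi-pi:latest                         # placeholder image

With the launcher acting as an additional worker, the two worker replicas plus the launcher give the three slots assumed by the placeholder mpirun -n 3 invocation.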