From 81ef2f99308a38cba71b7cdecb8039eccf16cb1c Mon Sep 17 00:00:00 2001 From: kuizhiqing Date: Sat, 17 Feb 2024 16:17:03 +0000 Subject: [PATCH] set RunLauncherAsWorker optional; fix ci --- deploy/v2beta1/mpi-operator.yaml | 2 +- examples/v2beta1/pi/pi2.yaml | 49 ------------- manifests/base/kubeflow.org_mpijobs.yaml | 2 +- pkg/apis/kubeflow/v2beta1/swagger.json | 2 +- pkg/apis/kubeflow/v2beta1/types.go | 4 +- .../kubeflow/v2beta1/zz_generated.deepcopy.go | 5 ++ pkg/controller/mpi_job_controller.go | 15 ++-- pkg/controller/mpi_job_controller_test.go | 73 +++++++++++++------ sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md | 2 +- .../mpijob/models/v2beta1_mpi_job_spec.py | 4 +- 10 files changed, 74 insertions(+), 84 deletions(-) delete mode 100644 examples/v2beta1/pi/pi2.yaml diff --git a/deploy/v2beta1/mpi-operator.yaml b/deploy/v2beta1/mpi-operator.yaml index 445bf49c6..a8c0ed09f 100644 --- a/deploy/v2beta1/mpi-operator.yaml +++ b/deploy/v2beta1/mpi-operator.yaml @@ -8192,7 +8192,7 @@ spec: type: object runLauncherAsWorker: default: false - description: RunLauncherAsWorker indicate wether to run worker process + description: RunLauncherAsWorker indicates wether to run worker process in launcher Defaults to false. type: boolean runPolicy: diff --git a/examples/v2beta1/pi/pi2.yaml b/examples/v2beta1/pi/pi2.yaml deleted file mode 100644 index 9429c1e02..000000000 --- a/examples/v2beta1/pi/pi2.yaml +++ /dev/null @@ -1,49 +0,0 @@ -apiVersion: kubeflow.org/v2beta1 -kind: MPIJob -metadata: - name: pi2 -spec: - slotsPerWorker: 1 - runLauncherAsWorker: true - runPolicy: - cleanPodPolicy: Running - ttlSecondsAfterFinished: 60 - sshAuthMountPath: /home/mpiuser/.ssh - mpiReplicaSpecs: - Launcher: - replicas: 1 - template: - spec: - containers: - - image: mpioperator/mpi-pi:openmpi - name: mpi-launcher - securityContext: - runAsUser: 1000 - command: - - bash - args: - - -c - - "/usr/sbin/sshd -f /home/mpiuser/.sshd_config && mpirun /home/mpiuser/pi" - resources: - limits: - cpu: 1 - memory: 1Gi - Worker: - replicas: 2 - template: - spec: - containers: - - image: mpioperator/mpi-pi:openmpi - name: mpi-worker - securityContext: - runAsUser: 1000 - command: - - /usr/sbin/sshd - args: - - -De - - -f - - /home/mpiuser/.sshd_config - resources: - limits: - cpu: 1 - memory: 1Gi diff --git a/manifests/base/kubeflow.org_mpijobs.yaml b/manifests/base/kubeflow.org_mpijobs.yaml index fbf626fe0..10518a654 100644 --- a/manifests/base/kubeflow.org_mpijobs.yaml +++ b/manifests/base/kubeflow.org_mpijobs.yaml @@ -8169,7 +8169,7 @@ spec: type: object runLauncherAsWorker: default: false - description: RunLauncherAsWorker indicate wether to run worker process + description: RunLauncherAsWorker indicates wether to run worker process in launcher Defaults to false. type: boolean runPolicy: diff --git a/pkg/apis/kubeflow/v2beta1/swagger.json b/pkg/apis/kubeflow/v2beta1/swagger.json index 2ecee503d..004b34ecf 100644 --- a/pkg/apis/kubeflow/v2beta1/swagger.json +++ b/pkg/apis/kubeflow/v2beta1/swagger.json @@ -157,7 +157,7 @@ } }, "runLauncherAsWorker": { - "description": "RunLauncherAsWorker indicate wether to run worker process in launcher Defaults to false.", + "description": "RunLauncherAsWorker indicates wether to run worker process in launcher Defaults to false.", "type": "boolean" }, "runPolicy": { diff --git a/pkg/apis/kubeflow/v2beta1/types.go b/pkg/apis/kubeflow/v2beta1/types.go index 041d68463..65a098582 100644 --- a/pkg/apis/kubeflow/v2beta1/types.go +++ b/pkg/apis/kubeflow/v2beta1/types.go @@ -154,11 +154,11 @@ type MPIJobSpec struct { // +kubebuilder:default:=1 SlotsPerWorker *int32 `json:"slotsPerWorker,omitempty"` - // RunLauncherAsWorker indicate wether to run worker process in launcher + // RunLauncherAsWorker indicates wether to run worker process in launcher // Defaults to false. // +optional // +kubebuilder:default:=false - RunLauncherAsWorker bool `json:"runLauncherAsWorker,omitempty"` + RunLauncherAsWorker *bool `json:"runLauncherAsWorker,omitempty"` // RunPolicy encapsulates various runtime policies of the job. RunPolicy RunPolicy `json:"runPolicy,omitempty"` diff --git a/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go b/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go index 1036b571b..75ef51aa3 100644 --- a/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go +++ b/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go @@ -163,6 +163,11 @@ func (in *MPIJobSpec) DeepCopyInto(out *MPIJobSpec) { *out = new(int32) **out = **in } + if in.RunLauncherAsWorker != nil { + in, out := &in.RunLauncherAsWorker, &out.RunLauncherAsWorker + *out = new(bool) + **out = **in + } in.RunPolicy.DeepCopyInto(&out.RunPolicy) if in.MPIReplicaSpecs != nil { in, out := &in.MPIReplicaSpecs, &out.MPIReplicaSpecs diff --git a/pkg/controller/mpi_job_controller.go b/pkg/controller/mpi_job_controller.go index 1c79d1dff..ecc98fa26 100644 --- a/pkg/controller/mpi_job_controller.go +++ b/pkg/controller/mpi_job_controller.go @@ -656,12 +656,15 @@ func (c *MPIJobController) syncHandler(key string) error { return err } } - // NEW: always create service for launcher for different implementations and compatible with running launcher as worker // The Intel and MPICH implementations require workers to communicate with the // launcher through its hostname. For that, we create a Service which // has the same name as the launcher's hostname. - if _, err = c.getOrCreateService(mpiJob, newLauncherService(mpiJob)); err != nil { - return fmt.Errorf("getting or creating Service to front launcher: %w", err) + if (mpiJob.Spec.RunLauncherAsWorker != nil && *mpiJob.Spec.RunLauncherAsWorker) || + mpiJob.Spec.MPIImplementation == kubeflow.MPIImplementationIntel || + mpiJob.Spec.MPIImplementation == kubeflow.MPIImplementationMPICH { + if _, err = c.getOrCreateService(mpiJob, newLauncherService(mpiJob)); err != nil { + return fmt.Errorf("getting or creating Service to front launcher: %w", err) + } } if launcher == nil { if mpiJob.Spec.LauncherCreationPolicy == kubeflow.LauncherCreationPolicyAtStartup || c.countReadyWorkerPods(worker) == len(worker) { @@ -1284,7 +1287,7 @@ func newConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int32) *corev1.ConfigM // note that pod.spec.dnsConfig also affect the svc resolution // ref: https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/ // launcher can be reach with hostname or service name - if mpiJob.Spec.RunLauncherAsWorker { + if mpiJob.Spec.RunLauncherAsWorker != nil && *mpiJob.Spec.RunLauncherAsWorker { launcherService := mpiJob.Name + launcherSuffix switch mpiJob.Spec.MPIImplementation { case kubeflow.MPIImplementationOpenMPI: @@ -1331,8 +1334,8 @@ func updateDiscoverHostsInConfigMap(configMap *corev1.ConfigMap, mpiJob *kubeflo var buffer bytes.Buffer buffer.WriteString("#!/bin/sh\n") - // We donnot check if launcher is running here, launcher should always be there or the job failed - if mpiJob.Spec.RunLauncherAsWorker { + // We don't check if launcher is running here, launcher should always be there or the job failed + if mpiJob.Spec.RunLauncherAsWorker != nil && *mpiJob.Spec.RunLauncherAsWorker { launcherService := mpiJob.Name + launcherSuffix buffer.WriteString(fmt.Sprintf("echo %s.%s.svc\n", launcherService, mpiJob.Namespace)) } diff --git a/pkg/controller/mpi_job_controller_test.go b/pkg/controller/mpi_job_controller_test.go index 3faf6f611..2186fac46 100644 --- a/pkg/controller/mpi_job_controller_test.go +++ b/pkg/controller/mpi_job_controller_test.go @@ -110,18 +110,6 @@ func newMPIJobCommon(name string, startTime, completionTime *metav1.Time) *kubef CleanPodPolicy: &cleanPodPolicyAll, }, MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{ - kubeflow.MPIReplicaTypeWorker: { - Template: corev1.PodTemplateSpec{ - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "foo", - Image: "bar", - }, - }, - }, - }, - }, kubeflow.MPIReplicaTypeLauncher: { Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ @@ -151,7 +139,22 @@ func newMPIJobCommon(name string, startTime, completionTime *metav1.Time) *kubef func newMPIJob(name string, replicas *int32, startTime, completionTime *metav1.Time) *kubeflow.MPIJob { mpiJob := newMPIJobCommon(name, startTime, completionTime) - mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker].Replicas = replicas + if *replicas > 0 { + mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker] = + &kubeflow.ReplicaSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "foo", + Image: "bar", + }, + }, + }, + }, + Replicas: replicas, + } + } return mpiJob } @@ -526,7 +529,11 @@ func TestAllResourcesCreated(t *testing.T) { for i := 0; i < 5; i++ { f.expectCreatePodAction(fmjc.newWorker(mpiJobCopy, i)) } - f.expectCreateServiceAction(newLauncherService(mpiJobCopy)) + if (mpiJob.Spec.RunLauncherAsWorker != nil && *mpiJob.Spec.RunLauncherAsWorker) || + implementation == kubeflow.MPIImplementationIntel || + implementation == kubeflow.MPIImplementationMPICH { + f.expectCreateServiceAction(newLauncherService(mpiJobCopy)) + } f.expectCreateJobAction(fmjc.newLauncherJob(mpiJobCopy)) mpiJobCopy.Status.Conditions = []kubeflow.JobCondition{newCondition(kubeflow.JobCreated, corev1.ConditionTrue, mpiJobCreatedReason, "MPIJob default/foo is created.")} @@ -819,7 +826,11 @@ func TestCreateSuspendedMPIJob(t *testing.T) { t.Fatalf("Failed creating secret") } f.expectCreateSecretAction(secret) - f.expectCreateServiceAction(newLauncherService(mpiJob)) + if (mpiJob.Spec.RunLauncherAsWorker != nil && *mpiJob.Spec.RunLauncherAsWorker) || + implementation == kubeflow.MPIImplementationIntel || + implementation == kubeflow.MPIImplementationMPICH { + f.expectCreateServiceAction(newLauncherService(mpiJob)) + } // expect creating of the launcher fmjc := f.newFakeMPIJobController() @@ -881,7 +892,6 @@ func TestSuspendedRunningMPIJob(t *testing.T) { // setup objects scheme.Scheme.Default(mpiJob) - f.setUpService(newLauncherService(mpiJob)) f.setUpService(newWorkersService(mpiJob)) cfgMap := newConfigMap(mpiJob, replicas) @@ -986,7 +996,6 @@ func TestResumeMPIJob(t *testing.T) { // expect the launcher update to resume it launcherCopy := launcher.DeepCopy() launcherCopy.Spec.Suspend = pointer.Bool(false) - f.expectCreateServiceAction(newLauncherService(mpiJob)) f.expectUpdateJobAction(launcherCopy) // expect an update to add the conditions @@ -1043,7 +1052,6 @@ func TestLauncherActiveWorkerNotReady(t *testing.T) { configMap := newConfigMap(mpiJobCopy, replicas) updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, nil) f.setUpConfigMap(configMap) - f.setUpService(newLauncherService(mpiJob)) f.setUpService(newWorkersService(mpiJobCopy)) secret, err := newSSHAuthSecret(mpiJobCopy) if err != nil { @@ -1094,7 +1102,6 @@ func TestLauncherActiveWorkerReady(t *testing.T) { mpiJobCopy := mpiJob.DeepCopy() scheme.Scheme.Default(mpiJobCopy) - f.setUpService(newLauncherService(mpiJob)) f.setUpService(newWorkersService(mpiJobCopy)) secret, err := newSSHAuthSecret(mpiJobCopy) if err != nil { @@ -1154,7 +1161,6 @@ func TestWorkerReady(t *testing.T) { mpiJobCopy := mpiJob.DeepCopy() scheme.Scheme.Default(mpiJobCopy) - f.setUpService(newLauncherService(mpiJob)) f.setUpService(newWorkersService(mpiJobCopy)) secret, err := newSSHAuthSecret(mpiJobCopy) if err != nil { @@ -1545,7 +1551,7 @@ func TestNewConfigMap(t *testing.T) { }, Spec: kubeflow.MPIJobSpec{ MPIImplementation: kubeflow.MPIImplementationOpenMPI, - RunLauncherAsWorker: true, + RunLauncherAsWorker: pointer.Bool(true), }, }, workerReplicas: 2, @@ -1562,6 +1568,31 @@ func TestNewConfigMap(t *testing.T) { }, }, }, + "OpenMPI without slots, zero explicit workers": { + mpiJob: &kubeflow.MPIJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "openmpi-without-slots", + Namespace: "tenant-a", + }, + Spec: kubeflow.MPIJobSpec{ + MPIImplementation: kubeflow.MPIImplementationOpenMPI, + RunLauncherAsWorker: pointer.Bool(true), + }, + }, + workerReplicas: 0, + wantCM: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "openmpi-without-slots-config", + Namespace: "tenant-a", + Labels: map[string]string{ + "app": "openmpi-without-slots", + }, + }, + Data: map[string]string{ + "hostfile": "openmpi-without-slots-launcher.tenant-a.svc slots=1\n", + }, + }, + }, "OpenMPI without slots, disable launcher as worker": { mpiJob: &kubeflow.MPIJob{ ObjectMeta: metav1.ObjectMeta{ diff --git a/sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md b/sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md index 88b9ae18c..295b374b7 100644 --- a/sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md +++ b/sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md @@ -7,7 +7,7 @@ Name | Type | Description | Notes **launcher_creation_policy** | **str** | launcherCreationPolicy if WaitForWorkersReady, the launcher is created only after all workers are in Ready state. Defaults to AtStartup. | [optional] **mpi_implementation** | **str** | MPIImplementation is the MPI implementation. Options are \"OpenMPI\" (default), \"Intel\" and \"MPICH\". | [optional] **mpi_replica_specs** | [**dict(str, V2beta1ReplicaSpec)**](V2beta1ReplicaSpec.md) | MPIReplicaSpecs contains maps from `MPIReplicaType` to `ReplicaSpec` that specify the MPI replicas to run. | -**run_launcher_as_worker** | **bool** | RunLauncherAsWorker indicate wether to run worker process in launcher Defaults to false. | [optional] +**run_launcher_as_worker** | **bool** | RunLauncherAsWorker indicates wether to run worker process in launcher Defaults to false. | [optional] **run_policy** | [**V2beta1RunPolicy**](V2beta1RunPolicy.md) | | [optional] **slots_per_worker** | **int** | Specifies the number of slots per worker used in hostfile. Defaults to 1. | [optional] **ssh_auth_mount_path** | **str** | SSHAuthMountPath is the directory where SSH keys are mounted. Defaults to \"/root/.ssh\". | [optional] diff --git a/sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job_spec.py b/sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job_spec.py index c9cb4d314..81accf8a3 100644 --- a/sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job_spec.py +++ b/sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job_spec.py @@ -156,7 +156,7 @@ def mpi_replica_specs(self, mpi_replica_specs): def run_launcher_as_worker(self): """Gets the run_launcher_as_worker of this V2beta1MPIJobSpec. # noqa: E501 - RunLauncherAsWorker indicate wether to run worker process in launcher Defaults to false. # noqa: E501 + RunLauncherAsWorker indicates wether to run worker process in launcher Defaults to false. # noqa: E501 :return: The run_launcher_as_worker of this V2beta1MPIJobSpec. # noqa: E501 :rtype: bool @@ -167,7 +167,7 @@ def run_launcher_as_worker(self): def run_launcher_as_worker(self, run_launcher_as_worker): """Sets the run_launcher_as_worker of this V2beta1MPIJobSpec. - RunLauncherAsWorker indicate wether to run worker process in launcher Defaults to false. # noqa: E501 + RunLauncherAsWorker indicates wether to run worker process in launcher Defaults to false. # noqa: E501 :param run_launcher_as_worker: The run_launcher_as_worker of this V2beta1MPIJobSpec. # noqa: E501 :type run_launcher_as_worker: bool