From 2014d2f909b8477dd4748f7ce534d3f0c70a1419 Mon Sep 17 00:00:00 2001
From: kuizhiqing
Date: Fri, 5 Jan 2024 11:14:08 +0000
Subject: [PATCH] add RunLauncherAsWorker in spec

---
 examples/v2beta1/pi/pi-intel.yaml         |  2 +-
 examples/v2beta1/pi/pi-mpich.yaml         |  2 +-
 examples/v2beta1/pi/pi.yaml               |  2 +-
 examples/v2beta1/pi/pi2.yaml              |  1 +
 pkg/apis/kubeflow/v2beta1/types.go        |  6 ++++
 pkg/controller/mpi_job_controller.go      | 29 +++++------------
 pkg/controller/mpi_job_controller_test.go | 39 ++---------------------
 7 files changed, 20 insertions(+), 61 deletions(-)

diff --git a/examples/v2beta1/pi/pi-intel.yaml b/examples/v2beta1/pi/pi-intel.yaml
index 4cedd3c28..3bbde70f6 100644
--- a/examples/v2beta1/pi/pi-intel.yaml
+++ b/examples/v2beta1/pi/pi-intel.yaml
@@ -27,7 +27,7 @@ spec:
             resources:
               limits:
                 cpu: 1
-                memory: 200m
+                memory: 1Gi
     Worker:
       replicas: 2
       template:
diff --git a/examples/v2beta1/pi/pi-mpich.yaml b/examples/v2beta1/pi/pi-mpich.yaml
index ad3414003..7c4a70cb7 100644
--- a/examples/v2beta1/pi/pi-mpich.yaml
+++ b/examples/v2beta1/pi/pi-mpich.yaml
@@ -27,7 +27,7 @@ spec:
             resources:
               limits:
                 cpu: 1
-                memory: 200m
+                memory: 1Gi
     Worker:
       replicas: 2
       template:
diff --git a/examples/v2beta1/pi/pi.yaml b/examples/v2beta1/pi/pi.yaml
index e1d75cd93..109b62071 100644
--- a/examples/v2beta1/pi/pi.yaml
+++ b/examples/v2beta1/pi/pi.yaml
@@ -27,7 +27,7 @@ spec:
             resources:
               limits:
                 cpu: 1
-                memory: 200m
+                memory: 1Gi
     Worker:
       replicas: 2
       template:
diff --git a/examples/v2beta1/pi/pi2.yaml b/examples/v2beta1/pi/pi2.yaml
index 56eaa50c4..9429c1e02 100644
--- a/examples/v2beta1/pi/pi2.yaml
+++ b/examples/v2beta1/pi/pi2.yaml
@@ -4,6 +4,7 @@ metadata:
   name: pi2
 spec:
   slotsPerWorker: 1
+  runLauncherAsWorker: true
   runPolicy:
     cleanPodPolicy: Running
     ttlSecondsAfterFinished: 60
diff --git a/pkg/apis/kubeflow/v2beta1/types.go b/pkg/apis/kubeflow/v2beta1/types.go
index 7525a053e..041d68463 100644
--- a/pkg/apis/kubeflow/v2beta1/types.go
+++ b/pkg/apis/kubeflow/v2beta1/types.go
@@ -154,6 +154,12 @@ type MPIJobSpec struct {
     // +kubebuilder:default:=1
     SlotsPerWorker *int32 `json:"slotsPerWorker,omitempty"`
 
+    // RunLauncherAsWorker indicates whether to run the worker process in the launcher.
+    // Defaults to false.
+    // +optional
+    // +kubebuilder:default:=false
+    RunLauncherAsWorker bool `json:"runLauncherAsWorker,omitempty"`
+
     // RunPolicy encapsulates various runtime policies of the job.
     RunPolicy RunPolicy `json:"runPolicy,omitempty"`
 
diff --git a/pkg/controller/mpi_job_controller.go b/pkg/controller/mpi_job_controller.go
index 21dcc27ef..3edfc4768 100644
--- a/pkg/controller/mpi_job_controller.go
+++ b/pkg/controller/mpi_job_controller.go
@@ -1243,26 +1243,6 @@ func (c *MPIJobController) doUpdateJobStatus(mpiJob *kubeflow.MPIJob) error {
     return err
 }
 
-// enableLauncherAsWorker check whether to run worker process in launcher
-func enableLauncherAsWorker(mpiJob *kubeflow.MPIJob) bool {
-    // case 1: have no worker
-    worker := mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker]
-    if worker == nil || len(worker.Template.Spec.Containers) < 1 {
-        return true
-    }
-
-    // case -1: no resource declaration for launcher
-    launcher := mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher]
-    launcherRes := launcher.Template.Spec.Containers[0].Resources
-    if launcherRes.Limits == nil && launcherRes.Requests == nil {
-        return false
-    }
-
-    // case 2: launcher declare the same resource as worker
-    workerRes := worker.Template.Spec.Containers[0].Resources
-    return equality.Semantic.DeepEqual(workerRes, launcherRes)
-}
-
 // newConfigMap creates a new ConfigMap containing configurations for an MPIJob
 // resource. It also sets the appropriate OwnerReferences on the resource so
 // handleObject can discover the MPIJob resource that 'owns' it.
@@ -1276,7 +1256,7 @@ func newConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int32) *corev1.ConfigM
     // note that pod.spec.dnsConfig also affect the svc resolution
     // ref: https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/
     // launcher can be reach with hostname or service name
-    if enableLauncherAsWorker(mpiJob) {
+    if mpiJob.Spec.RunLauncherAsWorker {
         launcherService := mpiJob.Name + launcherSuffix
         switch mpiJob.Spec.MPIImplementation {
         case kubeflow.MPIImplementationOpenMPI:
@@ -1322,6 +1302,13 @@ func updateDiscoverHostsInConfigMap(configMap *corev1.ConfigMap, mpiJob *kubeflo
 
     var buffer bytes.Buffer
     buffer.WriteString("#!/bin/sh\n")
+
+    // We do not check whether the launcher is running here; the launcher should always be present, otherwise the job has failed.
+    if mpiJob.Spec.RunLauncherAsWorker {
+        launcherService := mpiJob.Name + launcherSuffix
+        buffer.WriteString(fmt.Sprintf("echo %s.%s.svc\n", launcherService, mpiJob.Namespace))
+    }
+
     workersService := mpiJob.Name + workerSuffix
     for _, p := range runningPods {
         buffer.WriteString(fmt.Sprintf("echo %s.%s.%s.svc\n", p.Name, workersService, p.Namespace))
diff --git a/pkg/controller/mpi_job_controller_test.go b/pkg/controller/mpi_job_controller_test.go
index af46b5682..3faf6f611 100644
--- a/pkg/controller/mpi_job_controller_test.go
+++ b/pkg/controller/mpi_job_controller_test.go
@@ -25,7 +25,6 @@ import (
     batchv1 "k8s.io/api/batch/v1"
     corev1 "k8s.io/api/core/v1"
     schedulingv1 "k8s.io/api/scheduling/v1"
-    "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/apimachinery/pkg/runtime/schema"
@@ -1532,25 +1531,6 @@ func TestNewLauncherAndWorker(t *testing.T) {
     }
 }
 
-func newReplicaSpec(name string, cpu string) *kubeflow.ReplicaSpec {
-    return &kubeflow.ReplicaSpec{
-        Template: corev1.PodTemplateSpec{
-            Spec: corev1.PodSpec{
-                Containers: []corev1.Container{
-                    {
-                        Name: name,
-                        Resources: corev1.ResourceRequirements{
-                            Limits: corev1.ResourceList{
-                                corev1.ResourceCPU: resource.MustParse(cpu),
-                            },
-                        },
-                    },
-                },
-            },
-        },
-    }
-}
-
 func TestNewConfigMap(t *testing.T) {
     testCases := map[string]struct {
         mpiJob *kubeflow.MPIJob
@@ -1564,11 +1544,8 @@ func TestNewConfigMap(t *testing.T) {
                     Namespace: "tenant-a",
                 },
                 Spec: kubeflow.MPIJobSpec{
-                    MPIImplementation: kubeflow.MPIImplementationOpenMPI,
-                    MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{
-                        kubeflow.MPIReplicaTypeLauncher: newReplicaSpec("launcher", "2"),
-                        kubeflow.MPIReplicaTypeWorker:   newReplicaSpec("worker", "2"),
-                    },
+                    MPIImplementation:   kubeflow.MPIImplementationOpenMPI,
+                    RunLauncherAsWorker: true,
                 },
             },
             workerReplicas: 2,
@@ -1593,10 +1570,6 @@ func TestNewConfigMap(t *testing.T) {
                 },
                 Spec: kubeflow.MPIJobSpec{
                     MPIImplementation: kubeflow.MPIImplementationOpenMPI,
-                    MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{
-                        kubeflow.MPIReplicaTypeLauncher: newReplicaSpec("launcher", "1"),
-                        kubeflow.MPIReplicaTypeWorker:   newReplicaSpec("worker", "2"),
-                    },
                 },
             },
             workerReplicas: 2,
@@ -1622,10 +1595,6 @@ func TestNewConfigMap(t *testing.T) {
                 Spec: kubeflow.MPIJobSpec{
                     SlotsPerWorker:    pointer.Int32(10),
                     MPIImplementation: kubeflow.MPIImplementationIntel,
-                    MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{
-                        kubeflow.MPIReplicaTypeLauncher: newReplicaSpec("launcher", "1"),
-                        kubeflow.MPIReplicaTypeWorker:   newReplicaSpec("worker", "2"),
-                    },
                 },
             },
             workerReplicas: 1,
@@ -1651,10 +1620,6 @@ func TestNewConfigMap(t *testing.T) {
                 Spec: kubeflow.MPIJobSpec{
                     SlotsPerWorker:    pointer.Int32(10),
                     MPIImplementation: kubeflow.MPIImplementationMPICH,
-                    MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{
-                        kubeflow.MPIReplicaTypeLauncher: newReplicaSpec("launcher", "1"),
-                        kubeflow.MPIReplicaTypeWorker:   newReplicaSpec("worker", "2"),
-                    },
                 },
             },
             workerReplicas: 1,
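With runLauncherAsWorker set, updateDiscoverHostsInConfigMap above emits the launcher service hostname ahead of the running worker pod hostnames. As a rough sketch of the effect only (assuming launcherSuffix and workerSuffix expand to "-launcher" and "-worker", a job named pi2 in the default namespace, and two running worker pods named pi2-worker-0 and pi2-worker-1; none of these names are fixed by this patch), the generated discover_hosts.sh would read approximately:

    #!/bin/sh
    echo pi2-launcher.default.svc
    echo pi2-worker-0.pi2-worker.default.svc
    echo pi2-worker-1.pi2-worker.default.svc

The first entry lets MPI implementations that consume the dynamic host list schedule ranks onto the launcher pod as well, which is the behavior the new spec field opts into.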