add RunLauncherAsWorker in spec
kuizhiqing committed Jan 5, 2024
1 parent 471b26c commit 9dfb551
Showing 6 changed files with 19 additions and 61 deletions.
2 changes: 1 addition & 1 deletion examples/v2beta1/pi/pi-intel.yaml
@@ -27,7 +27,7 @@ spec:
             resources:
               limits:
                 cpu: 1
-                memory: 200m
+                memory: 1Gi
     Worker:
       replicas: 2
       template:
2 changes: 1 addition & 1 deletion examples/v2beta1/pi/pi-mpich.yaml
@@ -27,7 +27,7 @@ spec:
             resources:
               limits:
                 cpu: 1
-                memory: 200m
+                memory: 1Gi
     Worker:
       replicas: 2
       template:
2 changes: 1 addition & 1 deletion examples/v2beta1/pi/pi.yaml
@@ -27,7 +27,7 @@ spec:
             resources:
               limits:
                 cpu: 1
-                memory: 200m
+                memory: 1Gi
     Worker:
       replicas: 2
       template:
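All three pi examples carry the same one-line fix. In Kubernetes quantity notation the m suffix denotes milli-units, so memory: 200m requested 0.2 bytes of memory; binary suffixes such as Mi and Gi are the intended units for memory limits. The corrected stanza, for reference:

    resources:
      limits:
        cpu: 1        # one full CPU core, equivalent to "1000m"
        memory: 1Gi   # one gibibyte; "200m" would mean 0.2 bytes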
6 changes: 6 additions & 0 deletions pkg/apis/kubeflow/v2beta1/types.go
@@ -154,6 +154,12 @@ type MPIJobSpec struct {
 	// +kubebuilder:default:=1
 	SlotsPerWorker *int32 `json:"slotsPerWorker,omitempty"`
 
+	// RunLauncherAsWorker indicates whether to run a worker process in the launcher.
+	// Defaults to false.
+	// +optional
+	// +kubebuilder:default:=false
+	RunLauncherAsWorker bool `json:"runLauncherAsWorker,omitempty"`
+
 	// RunPolicy encapsulates various runtime policies of the job.
 	RunPolicy RunPolicy `json:"runPolicy,omitempty"`
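With the field added to MPIJobSpec, a job can opt in directly from its manifest. A minimal sketch of how the field would be set (this manifest is not part of the commit; the job name and image are placeholders):

    apiVersion: kubeflow.org/v2beta1
    kind: MPIJob
    metadata:
      name: pi
    spec:
      runLauncherAsWorker: true   # run an MPI worker process on the launcher as well
      slotsPerWorker: 1
      mpiReplicaSpecs:
        Launcher:
          replicas: 1
          template:
            spec:
              containers:
              - name: mpi-launcher
                image: mpioperator/mpi-pi   # placeholder image
        Worker:
          replicas: 2
          template:
            spec:
              containers:
              - name: mpi-worker
                image: mpioperator/mpi-pi   # placeholder image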
29 changes: 8 additions & 21 deletions pkg/controller/mpi_job_controller.go
@@ -1243,26 +1243,6 @@ func (c *MPIJobController) doUpdateJobStatus(mpiJob *kubeflow.MPIJob) error {
 	return err
 }
 
-// enableLauncherAsWorker check whether to run worker process in launcher
-func enableLauncherAsWorker(mpiJob *kubeflow.MPIJob) bool {
-	// case 1: have no worker
-	worker := mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker]
-	if worker == nil || len(worker.Template.Spec.Containers) < 1 {
-		return true
-	}
-
-	// case -1: no resource declaration for launcher
-	launcher := mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher]
-	launcherRes := launcher.Template.Spec.Containers[0].Resources
-	if launcherRes.Limits == nil && launcherRes.Requests == nil {
-		return false
-	}
-
-	// case 2: launcher declare the same resource as worker
-	workerRes := worker.Template.Spec.Containers[0].Resources
-	return equality.Semantic.DeepEqual(workerRes, launcherRes)
-}
-
 // newConfigMap creates a new ConfigMap containing configurations for an MPIJob
 // resource. It also sets the appropriate OwnerReferences on the resource so
 // handleObject can discover the MPIJob resource that 'owns' it.
@@ -1276,7 +1256,7 @@ func newConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int32) *corev1.ConfigMap {
 	// note that pod.spec.dnsConfig also affects the svc resolution
 	// ref: https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/
 	// launcher can be reached with hostname or service name
-	if enableLauncherAsWorker(mpiJob) {
+	if mpiJob.Spec.RunLauncherAsWorker {
 		launcherService := mpiJob.Name + launcherSuffix
 		switch mpiJob.Spec.MPIImplementation {
 		case kubeflow.MPIImplementationOpenMPI:
@@ -1322,6 +1302,13 @@ func updateDiscoverHostsInConfigMap(configMap *corev1.ConfigMap, mpiJob *kubeflow.MPIJob, runningPods []*corev1.Pod) {
 
 	var buffer bytes.Buffer
 	buffer.WriteString("#!/bin/sh\n")
+
+	// We do not check whether the launcher is running here; the launcher should always be present, or the job has failed.
+	if mpiJob.Spec.RunLauncherAsWorker {
+		launcherService := mpiJob.Name + launcherSuffix
+		buffer.WriteString(fmt.Sprintf("echo %s.%s.svc\n", launcherService, mpiJob.Namespace))
+	}
+
 	workersService := mpiJob.Name + workerSuffix
 	for _, p := range runningPods {
 		buffer.WriteString(fmt.Sprintf("echo %s.%s.%s.svc\n", p.Name, workersService, p.Namespace))
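With the flag enabled, updateDiscoverHostsInConfigMap emits the launcher's service hostname ahead of the running worker pods, so the generated discover_hosts.sh would look roughly like this (assuming a job named pi in namespace tenant-a with two running workers, and the conventional -launcher/-worker service suffixes):

    #!/bin/sh
    echo pi-launcher.tenant-a.svc
    echo pi-worker-0.pi-worker.tenant-a.svc
    echo pi-worker-1.pi-worker.tenant-a.svc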
39 changes: 2 additions & 37 deletions pkg/controller/mpi_job_controller_test.go
@@ -25,7 +25,6 @@ import (
 	batchv1 "k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
 	schedulingv1 "k8s.io/api/scheduling/v1"
-	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
@@ -1532,25 +1531,6 @@ func TestNewLauncherAndWorker(t *testing.T) {
 	}
 }
 
-func newReplicaSpec(name string, cpu string) *kubeflow.ReplicaSpec {
-	return &kubeflow.ReplicaSpec{
-		Template: corev1.PodTemplateSpec{
-			Spec: corev1.PodSpec{
-				Containers: []corev1.Container{
-					{
-						Name: name,
-						Resources: corev1.ResourceRequirements{
-							Limits: corev1.ResourceList{
-								corev1.ResourceCPU: resource.MustParse(cpu),
-							},
-						},
-					},
-				},
-			},
-		},
-	}
-}
-
 func TestNewConfigMap(t *testing.T) {
 	testCases := map[string]struct {
 		mpiJob *kubeflow.MPIJob
@@ -1564,11 +1544,8 @@ func TestNewConfigMap(t *testing.T) {
 				Namespace: "tenant-a",
 			},
 			Spec: kubeflow.MPIJobSpec{
-				MPIImplementation: kubeflow.MPIImplementationOpenMPI,
-				MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{
-					kubeflow.MPIReplicaTypeLauncher: newReplicaSpec("launcher", "2"),
-					kubeflow.MPIReplicaTypeWorker:   newReplicaSpec("worker", "2"),
-				},
+				MPIImplementation:   kubeflow.MPIImplementationOpenMPI,
+				RunLauncherAsWorker: true,
 			},
 		},
 		workerReplicas: 2,
@@ -1593,10 +1570,6 @@ func TestNewConfigMap(t *testing.T) {
 			},
 			Spec: kubeflow.MPIJobSpec{
 				MPIImplementation: kubeflow.MPIImplementationOpenMPI,
-				MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{
-					kubeflow.MPIReplicaTypeLauncher: newReplicaSpec("launcher", "1"),
-					kubeflow.MPIReplicaTypeWorker:   newReplicaSpec("worker", "2"),
-				},
 			},
 		},
 		workerReplicas: 2,
@@ -1622,10 +1595,6 @@ func TestNewConfigMap(t *testing.T) {
 			Spec: kubeflow.MPIJobSpec{
 				SlotsPerWorker:    pointer.Int32(10),
 				MPIImplementation: kubeflow.MPIImplementationIntel,
-				MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{
-					kubeflow.MPIReplicaTypeLauncher: newReplicaSpec("launcher", "1"),
-					kubeflow.MPIReplicaTypeWorker:   newReplicaSpec("worker", "2"),
-				},
 			},
 		},
 		workerReplicas: 1,
@@ -1651,10 +1620,6 @@ func TestNewConfigMap(t *testing.T) {
 			Spec: kubeflow.MPIJobSpec{
 				SlotsPerWorker:    pointer.Int32(10),
 				MPIImplementation: kubeflow.MPIImplementationMPICH,
-				MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{
-					kubeflow.MPIReplicaTypeLauncher: newReplicaSpec("launcher", "1"),
-					kubeflow.MPIReplicaTypeWorker:   newReplicaSpec("worker", "2"),
-				},
 			},
 		},
 		workerReplicas: 1,
