Skip to content

Commit

Permalink
set RunLauncherAsWorker optional; fix ci
Browse files Browse the repository at this point in the history
  • Loading branch information
kuizhiqing committed Feb 17, 2024
1 parent c4bdd7a commit 81ef2f9
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 84 deletions.
2 changes: 1 addition & 1 deletion deploy/v2beta1/mpi-operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8192,7 +8192,7 @@ spec:
type: object
runLauncherAsWorker:
default: false
description: RunLauncherAsWorker indicate wether to run worker process
description: RunLauncherAsWorker indicates wether to run worker process
in launcher Defaults to false.
type: boolean
runPolicy:
Expand Down
49 changes: 0 additions & 49 deletions examples/v2beta1/pi/pi2.yaml

This file was deleted.

2 changes: 1 addition & 1 deletion manifests/base/kubeflow.org_mpijobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8169,7 +8169,7 @@ spec:
type: object
runLauncherAsWorker:
default: false
description: RunLauncherAsWorker indicate wether to run worker process
description: RunLauncherAsWorker indicates wether to run worker process
in launcher Defaults to false.
type: boolean
runPolicy:
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/kubeflow/v2beta1/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@
}
},
"runLauncherAsWorker": {
"description": "RunLauncherAsWorker indicate wether to run worker process in launcher Defaults to false.",
"description": "RunLauncherAsWorker indicates wether to run worker process in launcher Defaults to false.",
"type": "boolean"
},
"runPolicy": {
Expand Down
4 changes: 2 additions & 2 deletions pkg/apis/kubeflow/v2beta1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,11 @@ type MPIJobSpec struct {
// +kubebuilder:default:=1
SlotsPerWorker *int32 `json:"slotsPerWorker,omitempty"`

// RunLauncherAsWorker indicate wether to run worker process in launcher
// RunLauncherAsWorker indicates wether to run worker process in launcher
// Defaults to false.
// +optional
// +kubebuilder:default:=false
RunLauncherAsWorker bool `json:"runLauncherAsWorker,omitempty"`
RunLauncherAsWorker *bool `json:"runLauncherAsWorker,omitempty"`

// RunPolicy encapsulates various runtime policies of the job.
RunPolicy RunPolicy `json:"runPolicy,omitempty"`
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 9 additions & 6 deletions pkg/controller/mpi_job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,12 +656,15 @@ func (c *MPIJobController) syncHandler(key string) error {
return err
}
}
// NEW: always create service for launcher for different implementations and compatible with running launcher as worker
// The Intel and MPICH implementations require workers to communicate with the
// launcher through its hostname. For that, we create a Service which
// has the same name as the launcher's hostname.
if _, err = c.getOrCreateService(mpiJob, newLauncherService(mpiJob)); err != nil {
return fmt.Errorf("getting or creating Service to front launcher: %w", err)
if (mpiJob.Spec.RunLauncherAsWorker != nil && *mpiJob.Spec.RunLauncherAsWorker) ||
mpiJob.Spec.MPIImplementation == kubeflow.MPIImplementationIntel ||
mpiJob.Spec.MPIImplementation == kubeflow.MPIImplementationMPICH {
if _, err = c.getOrCreateService(mpiJob, newLauncherService(mpiJob)); err != nil {
return fmt.Errorf("getting or creating Service to front launcher: %w", err)
}
}
if launcher == nil {
if mpiJob.Spec.LauncherCreationPolicy == kubeflow.LauncherCreationPolicyAtStartup || c.countReadyWorkerPods(worker) == len(worker) {
Expand Down Expand Up @@ -1284,7 +1287,7 @@ func newConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int32) *corev1.ConfigM
// note that pod.spec.dnsConfig also affect the svc resolution
// ref: https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/
// launcher can be reach with hostname or service name
if mpiJob.Spec.RunLauncherAsWorker {
if mpiJob.Spec.RunLauncherAsWorker != nil && *mpiJob.Spec.RunLauncherAsWorker {
launcherService := mpiJob.Name + launcherSuffix
switch mpiJob.Spec.MPIImplementation {
case kubeflow.MPIImplementationOpenMPI:
Expand Down Expand Up @@ -1331,8 +1334,8 @@ func updateDiscoverHostsInConfigMap(configMap *corev1.ConfigMap, mpiJob *kubeflo
var buffer bytes.Buffer
buffer.WriteString("#!/bin/sh\n")

// We donnot check if launcher is running here, launcher should always be there or the job failed
if mpiJob.Spec.RunLauncherAsWorker {
// We don't check if launcher is running here, launcher should always be there or the job failed
if mpiJob.Spec.RunLauncherAsWorker != nil && *mpiJob.Spec.RunLauncherAsWorker {
launcherService := mpiJob.Name + launcherSuffix
buffer.WriteString(fmt.Sprintf("echo %s.%s.svc\n", launcherService, mpiJob.Namespace))
}
Expand Down
73 changes: 52 additions & 21 deletions pkg/controller/mpi_job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,18 +110,6 @@ func newMPIJobCommon(name string, startTime, completionTime *metav1.Time) *kubef
CleanPodPolicy: &cleanPodPolicyAll,
},
MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{
kubeflow.MPIReplicaTypeWorker: {
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "foo",
Image: "bar",
},
},
},
},
},
kubeflow.MPIReplicaTypeLauncher: {
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Expand Down Expand Up @@ -151,7 +139,22 @@ func newMPIJobCommon(name string, startTime, completionTime *metav1.Time) *kubef

func newMPIJob(name string, replicas *int32, startTime, completionTime *metav1.Time) *kubeflow.MPIJob {
mpiJob := newMPIJobCommon(name, startTime, completionTime)
mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker].Replicas = replicas
if *replicas > 0 {
mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker] =
&kubeflow.ReplicaSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "foo",
Image: "bar",
},
},
},
},
Replicas: replicas,
}
}
return mpiJob
}

Expand Down Expand Up @@ -526,7 +529,11 @@ func TestAllResourcesCreated(t *testing.T) {
for i := 0; i < 5; i++ {
f.expectCreatePodAction(fmjc.newWorker(mpiJobCopy, i))
}
f.expectCreateServiceAction(newLauncherService(mpiJobCopy))
if (mpiJob.Spec.RunLauncherAsWorker != nil && *mpiJob.Spec.RunLauncherAsWorker) ||
implementation == kubeflow.MPIImplementationIntel ||
implementation == kubeflow.MPIImplementationMPICH {
f.expectCreateServiceAction(newLauncherService(mpiJobCopy))
}
f.expectCreateJobAction(fmjc.newLauncherJob(mpiJobCopy))

mpiJobCopy.Status.Conditions = []kubeflow.JobCondition{newCondition(kubeflow.JobCreated, corev1.ConditionTrue, mpiJobCreatedReason, "MPIJob default/foo is created.")}
Expand Down Expand Up @@ -819,7 +826,11 @@ func TestCreateSuspendedMPIJob(t *testing.T) {
t.Fatalf("Failed creating secret")
}
f.expectCreateSecretAction(secret)
f.expectCreateServiceAction(newLauncherService(mpiJob))
if (mpiJob.Spec.RunLauncherAsWorker != nil && *mpiJob.Spec.RunLauncherAsWorker) ||
implementation == kubeflow.MPIImplementationIntel ||
implementation == kubeflow.MPIImplementationMPICH {
f.expectCreateServiceAction(newLauncherService(mpiJob))
}

// expect creating of the launcher
fmjc := f.newFakeMPIJobController()
Expand Down Expand Up @@ -881,7 +892,6 @@ func TestSuspendedRunningMPIJob(t *testing.T) {

// setup objects
scheme.Scheme.Default(mpiJob)
f.setUpService(newLauncherService(mpiJob))
f.setUpService(newWorkersService(mpiJob))

cfgMap := newConfigMap(mpiJob, replicas)
Expand Down Expand Up @@ -986,7 +996,6 @@ func TestResumeMPIJob(t *testing.T) {
// expect the launcher update to resume it
launcherCopy := launcher.DeepCopy()
launcherCopy.Spec.Suspend = pointer.Bool(false)
f.expectCreateServiceAction(newLauncherService(mpiJob))
f.expectUpdateJobAction(launcherCopy)

// expect an update to add the conditions
Expand Down Expand Up @@ -1043,7 +1052,6 @@ func TestLauncherActiveWorkerNotReady(t *testing.T) {
configMap := newConfigMap(mpiJobCopy, replicas)
updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, nil)
f.setUpConfigMap(configMap)
f.setUpService(newLauncherService(mpiJob))
f.setUpService(newWorkersService(mpiJobCopy))
secret, err := newSSHAuthSecret(mpiJobCopy)
if err != nil {
Expand Down Expand Up @@ -1094,7 +1102,6 @@ func TestLauncherActiveWorkerReady(t *testing.T) {

mpiJobCopy := mpiJob.DeepCopy()
scheme.Scheme.Default(mpiJobCopy)
f.setUpService(newLauncherService(mpiJob))
f.setUpService(newWorkersService(mpiJobCopy))
secret, err := newSSHAuthSecret(mpiJobCopy)
if err != nil {
Expand Down Expand Up @@ -1154,7 +1161,6 @@ func TestWorkerReady(t *testing.T) {

mpiJobCopy := mpiJob.DeepCopy()
scheme.Scheme.Default(mpiJobCopy)
f.setUpService(newLauncherService(mpiJob))
f.setUpService(newWorkersService(mpiJobCopy))
secret, err := newSSHAuthSecret(mpiJobCopy)
if err != nil {
Expand Down Expand Up @@ -1545,7 +1551,7 @@ func TestNewConfigMap(t *testing.T) {
},
Spec: kubeflow.MPIJobSpec{
MPIImplementation: kubeflow.MPIImplementationOpenMPI,
RunLauncherAsWorker: true,
RunLauncherAsWorker: pointer.Bool(true),
},
},
workerReplicas: 2,
Expand All @@ -1562,6 +1568,31 @@ func TestNewConfigMap(t *testing.T) {
},
},
},
"OpenMPI without slots, zero explicit workers": {
mpiJob: &kubeflow.MPIJob{
ObjectMeta: metav1.ObjectMeta{
Name: "openmpi-without-slots",
Namespace: "tenant-a",
},
Spec: kubeflow.MPIJobSpec{
MPIImplementation: kubeflow.MPIImplementationOpenMPI,
RunLauncherAsWorker: pointer.Bool(true),
},
},
workerReplicas: 0,
wantCM: &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "openmpi-without-slots-config",
Namespace: "tenant-a",
Labels: map[string]string{
"app": "openmpi-without-slots",
},
},
Data: map[string]string{
"hostfile": "openmpi-without-slots-launcher.tenant-a.svc slots=1\n",
},
},
},
"OpenMPI without slots, disable launcher as worker": {
mpiJob: &kubeflow.MPIJob{
ObjectMeta: metav1.ObjectMeta{
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job_spec.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 81ef2f9

Please sign in to comment.