run worker process in launcher pod #612

Merged
5 commits merged on Feb 26, 2024
Changes from 1 commit
5 changes: 5 additions & 0 deletions deploy/v2beta1/mpi-operator.yaml
@@ -8190,6 +8190,11 @@ spec:
description: MPIReplicaSpecs contains maps from `MPIReplicaType` to
`ReplicaSpec` that specify the MPI replicas to run.
type: object
runLauncherAsWorker:
default: false
description: RunLauncherAsWorker indicates whether to run the worker process
in the launcher. Defaults to false.
type: boolean
runPolicy:
description: RunPolicy encapsulates various runtime policies of the
job.
5 changes: 5 additions & 0 deletions manifests/base/kubeflow.org_mpijobs.yaml
@@ -8167,6 +8167,11 @@ spec:
description: MPIReplicaSpecs contains maps from `MPIReplicaType` to
`ReplicaSpec` that specify the MPI replicas to run.
type: object
runLauncherAsWorker:
default: false
description: RunLauncherAsWorker indicates whether to run the worker process
in the launcher. Defaults to false.
type: boolean
runPolicy:
description: RunPolicy encapsulates various runtime policies of the
job.
4 changes: 4 additions & 0 deletions pkg/apis/kubeflow/v2beta1/swagger.json
@@ -156,6 +156,10 @@
"$ref": "#/definitions/v2beta1.ReplicaSpec"
}
},
"runLauncherAsWorker": {
"description": "RunLauncherAsWorker indicates wether to run worker process in launcher Defaults to false.",
"type": "boolean"
},
"runPolicy": {
"description": "RunPolicy encapsulates various runtime policies of the job.",
"default": {},
6 changes: 6 additions & 0 deletions pkg/apis/kubeflow/v2beta1/types.go
@@ -154,6 +154,12 @@ type MPIJobSpec struct {
// +kubebuilder:default:=1
SlotsPerWorker *int32 `json:"slotsPerWorker,omitempty"`

// RunLauncherAsWorker indicates whether to run the worker process in the launcher.
// Defaults to false.
// +optional
// +kubebuilder:default:=false
RunLauncherAsWorker *bool `json:"runLauncherAsWorker,omitempty"`

// RunPolicy encapsulates various runtime policies of the job.
RunPolicy RunPolicy `json:"runPolicy,omitempty"`

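For context, here is a minimal usage sketch, not part of this diff, showing how the new field could be set when building an MPIJob spec in Go, mirroring the `pointer.Bool(true)` pattern the updated tests use. The kubeflow import path is an assumption and may differ from the repository's actual module layout.

```go
package main

import (
	"fmt"

	"k8s.io/utils/pointer"

	// Assumed import path; adjust to the repository's module layout.
	kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1"
)

func main() {
	spec := kubeflow.MPIJobSpec{
		MPIImplementation: kubeflow.MPIImplementationOpenMPI,
		// The launcher pod also runs a worker process when this is true.
		RunLauncherAsWorker: pointer.Bool(true),
	}
	fmt.Printf("runLauncherAsWorker=%v\n", *spec.RunLauncherAsWorker)
}
```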
5 changes: 5 additions & 0 deletions pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go

Generated file; diff not rendered by default.

9 changes: 9 additions & 0 deletions pkg/client/applyconfiguration/kubeflow/v2beta1/mpijobspec.go

Generated file; diff not rendered by default.

37 changes: 29 additions & 8 deletions pkg/controller/mpi_job_controller.go
@@ -656,13 +656,13 @@ func (c *MPIJobController) syncHandler(key string) error {
return err
}
}
if mpiJob.Spec.MPIImplementation == kubeflow.MPIImplementationIntel ||
// The Intel and MPICH implementations require workers to communicate with the
// launcher through its hostname. For that, we create a Service which
// has the same name as the launcher's hostname.
if (mpiJob.Spec.RunLauncherAsWorker != nil && *mpiJob.Spec.RunLauncherAsWorker) ||
mpiJob.Spec.MPIImplementation == kubeflow.MPIImplementationIntel ||
mpiJob.Spec.MPIImplementation == kubeflow.MPIImplementationMPICH {
Member:
Uhm, I'm wondering if we can make this condition another function like workersCanHaveDedicatedService, since we're using the same condition in some places.

Member (Author):
Thanks for introducing me to ptr.Deref. The condition appears once in the controller and twice in the tests now, so it's OK for me to keep it in this PR without adding another function. WDYT?

Member:
I'm OK with either way.
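For reference, a minimal sketch of what such a helper might look like, assuming ptr.Deref from k8s.io/utils/ptr. The function name launcherNeedsService is illustrative (the reviewer suggested workersCanHaveDedicatedService), neither name is part of this PR, and the kubeflow import path is likewise an assumption.

```go
package controller

import (
	// Assumed import path; adjust to the repository's module layout.
	kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1"
	"k8s.io/utils/ptr"
)

// launcherNeedsService reports whether a Service fronting the launcher should
// be created: either the launcher also runs a worker process, or the MPI
// implementation (Intel MPI, MPICH) requires workers to reach the launcher by
// hostname. ptr.Deref collapses the nil-check on the optional bool field.
func launcherNeedsService(mpiJob *kubeflow.MPIJob) bool {
	return ptr.Deref(mpiJob.Spec.RunLauncherAsWorker, false) ||
		mpiJob.Spec.MPIImplementation == kubeflow.MPIImplementationIntel ||
		mpiJob.Spec.MPIImplementation == kubeflow.MPIImplementationMPICH
}
```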

// The Intel and MPICH implementations require workers to communicate with the
// launcher through its hostname. For that, we create a Service which
// has the same name as the launcher's hostname.
_, err := c.getOrCreateService(mpiJob, newLauncherService(mpiJob))
if err != nil {
if _, err = c.getOrCreateService(mpiJob, newLauncherService(mpiJob)); err != nil {
return fmt.Errorf("getting or creating Service to front launcher: %w", err)
}
}
@@ -1284,12 +1284,26 @@ func newConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int32) *corev1.ConfigM
if mpiJob.Spec.SlotsPerWorker != nil {
slots = int(*mpiJob.Spec.SlotsPerWorker)
}
// note that pod.spec.dnsConfig also affects the svc resolution
// ref: https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/
// the launcher can be reached via its hostname or the service name
if mpiJob.Spec.RunLauncherAsWorker != nil && *mpiJob.Spec.RunLauncherAsWorker {
launcherService := mpiJob.Name + launcherSuffix
switch mpiJob.Spec.MPIImplementation {
case kubeflow.MPIImplementationOpenMPI:
buffer.WriteString(fmt.Sprintf("%s.%s.svc slots=%d\n", launcherService, mpiJob.Namespace, slots))
case kubeflow.MPIImplementationIntel, kubeflow.MPIImplementationMPICH:
buffer.WriteString(fmt.Sprintf("%s.%s.svc:%d\n", launcherService, mpiJob.Namespace, slots))
}
}

for i := 0; i < int(workerReplicas); i++ {
name := workerName(mpiJob, i)
switch mpiJob.Spec.MPIImplementation {
case kubeflow.MPIImplementationOpenMPI:
buffer.WriteString(fmt.Sprintf("%s%s-%d.%s.%s.svc slots=%d\n", mpiJob.Name, workerSuffix, i, workersService, mpiJob.Namespace, slots))
buffer.WriteString(fmt.Sprintf("%s.%s.%s.svc slots=%d\n", name, workersService, mpiJob.Namespace, slots))
Member:
This is a nice refactoring :)

case kubeflow.MPIImplementationIntel, kubeflow.MPIImplementationMPICH:
buffer.WriteString(fmt.Sprintf("%s%s-%d.%s.%s.svc:%d\n", mpiJob.Name, workerSuffix, i, workersService, mpiJob.Namespace, slots))
buffer.WriteString(fmt.Sprintf("%s.%s.%s.svc:%d\n", name, workersService, mpiJob.Namespace, slots))
}
}
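To make the generated output concrete, here is a small, self-contained sketch, not part of the diff, that reproduces the hostfile shape for an OpenMPI job with runLauncherAsWorker enabled. The job name, namespace, worker count, and slot count are illustrative values only.

```go
package main

import (
	"bytes"
	"fmt"
)

// Illustrative values; the real hostfile is built by newConfigMap from the MPIJob.
const (
	jobName   = "pi"
	namespace = "default"
	workers   = 2
	slots     = 1
)

func main() {
	var buf bytes.Buffer
	// With runLauncherAsWorker enabled, the launcher Service name is listed
	// first so mpirun can also place processes on the launcher pod.
	buf.WriteString(fmt.Sprintf("%s-launcher.%s.svc slots=%d\n", jobName, namespace, slots))
	for i := 0; i < workers; i++ {
		buf.WriteString(fmt.Sprintf("%s-worker-%d.%s-worker.%s.svc slots=%d\n", jobName, i, jobName, namespace, slots))
	}
	fmt.Print(buf.String())
	// Output:
	// pi-launcher.default.svc slots=1
	// pi-worker-0.pi-worker.default.svc slots=1
	// pi-worker-1.pi-worker.default.svc slots=1
}
```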

@@ -1319,6 +1333,13 @@ func updateDiscoverHostsInConfigMap(configMap *corev1.ConfigMap, mpiJob *kubeflo

var buffer bytes.Buffer
buffer.WriteString("#!/bin/sh\n")

// We don't check whether the launcher is running here; the launcher should always be there, or the job has failed
if mpiJob.Spec.RunLauncherAsWorker != nil && *mpiJob.Spec.RunLauncherAsWorker {
launcherService := mpiJob.Name + launcherSuffix
buffer.WriteString(fmt.Sprintf("echo %s.%s.svc\n", launcherService, mpiJob.Namespace))
}

workersService := mpiJob.Name + workerSuffix
for _, p := range runningPods {
buffer.WriteString(fmt.Sprintf("echo %s.%s.%s.svc\n", p.Name, workersService, p.Namespace))
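Similarly, a hedged, self-contained sketch, not part of the diff, of the discover_hosts.sh content when runLauncherAsWorker is enabled; the names and counts are made up for illustration.

```go
package main

import (
	"bytes"
	"fmt"
)

func main() {
	const (
		jobName   = "pi"      // illustrative
		namespace = "default" // illustrative
		workers   = 2         // illustrative
	)
	var buf bytes.Buffer
	buf.WriteString("#!/bin/sh\n")
	// The launcher hostname is echoed unconditionally: the launcher should
	// always exist while the job runs, so it is not re-checked here.
	buf.WriteString(fmt.Sprintf("echo %s-launcher.%s.svc\n", jobName, namespace))
	// One entry per running worker pod follows.
	for i := 0; i < workers; i++ {
		buf.WriteString(fmt.Sprintf("echo %s-worker-%d.%s-worker.%s.svc\n", jobName, i, jobName, namespace))
	}
	fmt.Print(buf.String())
}
```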
87 changes: 71 additions & 16 deletions pkg/controller/mpi_job_controller_test.go
@@ -110,18 +110,6 @@ func newMPIJobCommon(name string, startTime, completionTime *metav1.Time) *kubef
CleanPodPolicy: &cleanPodPolicyAll,
},
MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{
kubeflow.MPIReplicaTypeWorker: {
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "foo",
Image: "bar",
},
},
},
},
},
kubeflow.MPIReplicaTypeLauncher: {
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
@@ -151,7 +139,22 @@

func newMPIJob(name string, replicas *int32, startTime, completionTime *metav1.Time) *kubeflow.MPIJob {
mpiJob := newMPIJobCommon(name, startTime, completionTime)
mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker].Replicas = replicas
if *replicas > 0 {
mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker] =
&kubeflow.ReplicaSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "foo",
Image: "bar",
},
},
},
},
Replicas: replicas,
}
}
return mpiJob
}

@@ -526,7 +529,8 @@ func TestAllResourcesCreated(t *testing.T) {
for i := 0; i < 5; i++ {
f.expectCreatePodAction(fmjc.newWorker(mpiJobCopy, i))
}
if implementation == kubeflow.MPIImplementationIntel ||
if (mpiJob.Spec.RunLauncherAsWorker != nil && *mpiJob.Spec.RunLauncherAsWorker) ||
implementation == kubeflow.MPIImplementationIntel ||
implementation == kubeflow.MPIImplementationMPICH {
f.expectCreateServiceAction(newLauncherService(mpiJobCopy))
}
@@ -822,7 +826,8 @@ func TestCreateSuspendedMPIJob(t *testing.T) {
t.Fatalf("Failed creating secret")
}
f.expectCreateSecretAction(secret)
if implementation == kubeflow.MPIImplementationIntel ||
if (mpiJob.Spec.RunLauncherAsWorker != nil && *mpiJob.Spec.RunLauncherAsWorker) ||
implementation == kubeflow.MPIImplementationIntel ||
implementation == kubeflow.MPIImplementationMPICH {
f.expectCreateServiceAction(newLauncherService(mpiJob))
}
@@ -1538,7 +1543,57 @@ func TestNewConfigMap(t *testing.T) {
workerReplicas int32
wantCM *corev1.ConfigMap
}{
"OpenMPI without slots": {
"OpenMPI without slots, enable launcher as worker": {
mpiJob: &kubeflow.MPIJob{
ObjectMeta: metav1.ObjectMeta{
Name: "openmpi-without-slots",
Namespace: "tenant-a",
},
Spec: kubeflow.MPIJobSpec{
MPIImplementation: kubeflow.MPIImplementationOpenMPI,
RunLauncherAsWorker: pointer.Bool(true),
},
},
workerReplicas: 2,
wantCM: &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "openmpi-without-slots-config",
Namespace: "tenant-a",
Labels: map[string]string{
"app": "openmpi-without-slots",
},
},
Data: map[string]string{
"hostfile": "openmpi-without-slots-launcher.tenant-a.svc slots=1\nopenmpi-without-slots-worker-0.openmpi-without-slots-worker.tenant-a.svc slots=1\nopenmpi-without-slots-worker-1.openmpi-without-slots-worker.tenant-a.svc slots=1\n",
},
},
},
"OpenMPI without slots, zero explicit workers": {
mpiJob: &kubeflow.MPIJob{
ObjectMeta: metav1.ObjectMeta{
Name: "openmpi-without-slots",
Namespace: "tenant-a",
},
Spec: kubeflow.MPIJobSpec{
MPIImplementation: kubeflow.MPIImplementationOpenMPI,
RunLauncherAsWorker: pointer.Bool(true),
},
},
workerReplicas: 0,
wantCM: &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "openmpi-without-slots-config",
Namespace: "tenant-a",
Labels: map[string]string{
"app": "openmpi-without-slots",
},
},
Data: map[string]string{
"hostfile": "openmpi-without-slots-launcher.tenant-a.svc slots=1\n",
},
},
},
"OpenMPI without slots, disable launcher as worker": {
mpiJob: &kubeflow.MPIJob{
ObjectMeta: metav1.ObjectMeta{
Name: "openmpi-without-slots",
1 change: 1 addition & 0 deletions sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md

Generated file; diff not rendered by default.

30 changes: 29 additions & 1 deletion sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job_spec.py

Generated file; diff not rendered by default.

11 changes: 7 additions & 4 deletions test/integration/mpi_job_controller_test.go
@@ -17,6 +17,7 @@ package integration
import (
"context"
"fmt"
"strings"
"testing"
"time"

@@ -895,7 +896,7 @@ func validateMPIJobDependencies(
if err := wait.PollUntilContextTimeout(ctx, waitInterval, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
problems = nil
var err error
svc, err = getServiceForJob(ctx, kubeClient, job)
svc, err = getServiceForJob(ctx, kubeClient, job, "worker")
if err != nil {
return false, err
}
@@ -1026,14 +1027,16 @@ func updatePodsCondition(ctx context.Context, client kubernetes.Interface, pods
return nil
}

func getServiceForJob(ctx context.Context, client kubernetes.Interface, job *kubeflow.MPIJob) (*corev1.Service, error) {
func getServiceForJob(ctx context.Context, client kubernetes.Interface, job *kubeflow.MPIJob, role string) (*corev1.Service, error) {
result, err := client.CoreV1().Services(job.Namespace).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, err
}
for _, obj := range result.Items {
if metav1.IsControlledBy(&obj, job) {
return &obj, nil
if strings.HasSuffix(obj.Name, role) {
if metav1.IsControlledBy(&obj, job) {
return &obj, nil
}
}
}
return nil, nil