Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

run worker process in launcher pod #612

Merged
merged 5 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions deploy/v2beta1/mpi-operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8190,6 +8190,11 @@ spec:
description: MPIReplicaSpecs contains maps from `MPIReplicaType` to
`ReplicaSpec` that specify the MPI replicas to run.
type: object
runLauncherAsWorker:
default: false
description: RunLauncherAsWorker indicates whether to run the worker process
in the launcher pod. Defaults to false.
type: boolean
runPolicy:
description: RunPolicy encapsulates various runtime policies of the
job.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
k8s.io/code-generator v0.27.4
k8s.io/klog v1.0.0
k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f
k8s.io/utils v0.0.0-20230209194617-a36077c30491
k8s.io/utils v0.0.0-20240102154912-e7106e64919e
sigs.k8s.io/controller-runtime v0.15.1
sigs.k8s.io/scheduler-plugins v0.26.7
sigs.k8s.io/structured-merge-diff/v4 v4.2.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,8 @@ k8s.io/klog/v2 v2.90.1 h1:m4bYOKall2MmOiRaR1J+We67Do7vm9KiQVlT96lnHUw=
k8s.io/klog/v2 v2.90.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f h1:2kWPakN3i/k81b0gvD5C5FJ2kxm1WrQFanWchyKuqGg=
k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f/go.mod h1:byini6yhqGC14c3ebc/QwanvYwhuMWF6yz2F8uwW8eg=
k8s.io/utils v0.0.0-20230209194617-a36077c30491 h1:r0BAOLElQnnFhE/ApUsg3iHdVYYPBjNSSOMowRZxxsY=
k8s.io/utils v0.0.0-20230209194617-a36077c30491/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
k8s.io/utils v0.0.0-20240102154912-e7106e64919e h1:eQ/4ljkx21sObifjzXwlPKpdGLrCfRziVtos3ofG/sQ=
k8s.io/utils v0.0.0-20240102154912-e7106e64919e/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.1.2 h1:trsWhjU5jZrx6UvFu4WzQDrN7Pga4a7Qg+zcfcj64PA=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.1.2/go.mod h1:+qG7ISXqCDVVcyO8hLn12AKVYYUjM7ftlqsqmrhMZE0=
sigs.k8s.io/controller-runtime v0.15.1 h1:9UvgKD4ZJGcj24vefUFgZFP3xej/3igL9BsOUTb/+4c=
Expand Down
5 changes: 5 additions & 0 deletions manifests/base/kubeflow.org_mpijobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8167,6 +8167,11 @@ spec:
description: MPIReplicaSpecs contains maps from `MPIReplicaType` to
`ReplicaSpec` that specify the MPI replicas to run.
type: object
runLauncherAsWorker:
default: false
description: RunLauncherAsWorker indicates whether to run the worker process
in the launcher pod. Defaults to false.
type: boolean
runPolicy:
description: RunPolicy encapsulates various runtime policies of the
job.
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/kubeflow/v2beta1/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@
"$ref": "#/definitions/v2beta1.ReplicaSpec"
}
},
"runLauncherAsWorker": {
"description": "RunLauncherAsWorker indicates whether to run the worker process in the launcher pod. Defaults to false.",
"type": "boolean"
},
"runPolicy": {
"description": "RunPolicy encapsulates various runtime policies of the job.",
"default": {},
Expand Down
6 changes: 6 additions & 0 deletions pkg/apis/kubeflow/v2beta1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,12 @@ type MPIJobSpec struct {
// +kubebuilder:default:=1
SlotsPerWorker *int32 `json:"slotsPerWorker,omitempty"`

// RunLauncherAsWorker indicates whether to run the worker process in the launcher pod.
// Defaults to false.
// +optional
// +kubebuilder:default:=false
RunLauncherAsWorker *bool `json:"runLauncherAsWorker,omitempty"`

// RunPolicy encapsulates various runtime policies of the job.
RunPolicy RunPolicy `json:"runPolicy,omitempty"`

Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions pkg/client/applyconfiguration/kubeflow/v2beta1/mpijobspec.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

68 changes: 42 additions & 26 deletions pkg/controller/mpi_job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"k8s.io/utils/clock"
"k8s.io/utils/pointer"

Check failure on line 56 in pkg/controller/mpi_job_controller.go

View workflow job for this annotation

GitHub Actions / CodeQL-Build

SA1019: "k8s.io/utils/pointer" is deprecated: Use functions in k8s.io/utils/ptr instead: ptr.To to obtain a pointer, ptr.Deref to dereference a pointer, ptr.Equal to compare dereferenced pointers. (staticcheck)
"k8s.io/utils/ptr"
schedclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned"
volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned"

Expand Down Expand Up @@ -630,7 +631,7 @@
// We're done if the launcher either succeeded or failed.
done := launcher != nil && isJobFinished(launcher)
if !done {
_, err := c.getOrCreateService(mpiJob, newWorkersService(mpiJob))
_, err := c.getOrCreateService(mpiJob, newJobService(mpiJob))
if err != nil {
return fmt.Errorf("getting or creating Service to front workers: %w", err)
}
Expand All @@ -656,16 +657,6 @@
return err
}
}
if mpiJob.Spec.MPIImplementation == kubeflow.MPIImplementationIntel ||
mpiJob.Spec.MPIImplementation == kubeflow.MPIImplementationMPICH {
// The Intel and MPICH implementations require workers to communicate with the
// launcher through its hostname. For that, we create a Service which
// has the same name as the launcher's hostname.
_, err := c.getOrCreateService(mpiJob, newLauncherService(mpiJob))
if err != nil {
return fmt.Errorf("getting or creating Service to front launcher: %w", err)
}
}
if launcher == nil {
if mpiJob.Spec.LauncherCreationPolicy == kubeflow.LauncherCreationPolicyAtStartup || c.countReadyWorkerPods(worker) == len(worker) {
launcher, err = c.kubeClient.BatchV1().Jobs(namespace).Create(context.TODO(), c.newLauncherJob(mpiJob), metav1.CreateOptions{})
Expand Down Expand Up @@ -1279,17 +1270,30 @@
// handleObject can discover the MPIJob resource that 'owns' it.
func newConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int32) *corev1.ConfigMap {
var buffer bytes.Buffer
workersService := mpiJob.Name + workerSuffix
slots := 1
if mpiJob.Spec.SlotsPerWorker != nil {
slots = int(*mpiJob.Spec.SlotsPerWorker)
}
// note that pod.spec.dnsConfig also affect the svc resolution
// ref: https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/
// the launcher can be reached via its hostname or the Service name
if ptr.Deref(mpiJob.Spec.RunLauncherAsWorker, false) {
name := mpiJob.Name + launcherSuffix
switch mpiJob.Spec.MPIImplementation {
case kubeflow.MPIImplementationOpenMPI:
buffer.WriteString(fmt.Sprintf("%s.%s.%s.svc slots=%d\n", name, mpiJob.Name, mpiJob.Namespace, slots))
case kubeflow.MPIImplementationIntel, kubeflow.MPIImplementationMPICH:
buffer.WriteString(fmt.Sprintf("%s.%s.%s.svc:%d\n", name, mpiJob.Name, mpiJob.Namespace, slots))
}
}

for i := 0; i < int(workerReplicas); i++ {
name := workerName(mpiJob, i)
switch mpiJob.Spec.MPIImplementation {
case kubeflow.MPIImplementationOpenMPI:
buffer.WriteString(fmt.Sprintf("%s%s-%d.%s.%s.svc slots=%d\n", mpiJob.Name, workerSuffix, i, workersService, mpiJob.Namespace, slots))
buffer.WriteString(fmt.Sprintf("%s.%s.%s.svc slots=%d\n", name, mpiJob.Name, mpiJob.Namespace, slots))
case kubeflow.MPIImplementationIntel, kubeflow.MPIImplementationMPICH:
buffer.WriteString(fmt.Sprintf("%s%s-%d.%s.%s.svc:%d\n", mpiJob.Name, workerSuffix, i, workersService, mpiJob.Namespace, slots))
buffer.WriteString(fmt.Sprintf("%s.%s.%s.svc:%d\n", name, mpiJob.Name, mpiJob.Namespace, slots))
}
}

Expand Down Expand Up @@ -1319,22 +1323,27 @@

var buffer bytes.Buffer
buffer.WriteString("#!/bin/sh\n")
workersService := mpiJob.Name + workerSuffix

// We don't check whether the launcher is running here; the launcher should always exist, or the job has failed.
if ptr.Deref(mpiJob.Spec.RunLauncherAsWorker, false) {
name := mpiJob.Name + launcherSuffix
buffer.WriteString(fmt.Sprintf("echo %s.%s.%s.svc\n", name, mpiJob.Name, mpiJob.Namespace))
}

for _, p := range runningPods {
buffer.WriteString(fmt.Sprintf("echo %s.%s.%s.svc\n", p.Name, workersService, p.Namespace))
buffer.WriteString(fmt.Sprintf("echo %s.%s.%s.svc\n", p.Name, mpiJob.Name, p.Namespace))
}

configMap.Data[discoverHostsScriptName] = buffer.String()
}

// newWorkersService builds the headless Service that fronts the worker
// pods of the given MPIJob.
func newWorkersService(job *kubeflow.MPIJob) *corev1.Service {
	selector := defaultLabels(job.Name, worker)
	return newService(job, job.Name+workerSuffix, selector)
}

// newLauncherService creates a new launcher's Service for an MPIJob resource.
func newLauncherService(job *kubeflow.MPIJob) *corev1.Service {
return newService(job, job.Name+launcherSuffix, defaultLabels(job.Name, launcher))
// newJobService builds a single Service, named after the MPIJob itself,
// that fronts both the launcher and the worker pods.
func newJobService(job *kubeflow.MPIJob) *corev1.Service {
	selector := map[string]string{
		kubeflow.OperatorNameLabel: kubeflow.OperatorName,
		kubeflow.JobNameLabel:      job.Name,
	}
	return newService(job, job.Name, selector)
}

func newService(job *kubeflow.MPIJob, name string, selector map[string]string) *corev1.Service {
Expand Down Expand Up @@ -1416,12 +1425,19 @@
}
podTemplate.Labels[kubeflow.ReplicaIndexLabel] = strconv.Itoa(index)
podTemplate.Spec.Hostname = name
podTemplate.Spec.Subdomain = mpiJob.Name + workerSuffix // Matches workers' Service name.
podTemplate.Spec.Subdomain = mpiJob.Name // Matches the Job's Service name.
if podTemplate.Spec.HostNetwork {
// Allows resolution of worker hostnames without needing to include the
// namespace or cluster domain.
podTemplate.Spec.DNSPolicy = corev1.DNSClusterFirstWithHostNet
}
// The Intel and MPICH implementations require workers to communicate with the launcher through its hostname.
searche := fmt.Sprintf("%s.%s.svc.cluster.local", mpiJob.Name, mpiJob.Namespace)
if podTemplate.Spec.DNSConfig == nil {
podTemplate.Spec.DNSConfig = &corev1.PodDNSConfig{Searches: []string{searche}}
} else {
podTemplate.Spec.DNSConfig.Searches = append(podTemplate.Spec.DNSConfig.Searches, searche)
}
setRestartPolicy(podTemplate, mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker])

container := &podTemplate.Spec.Containers[0]
Expand Down Expand Up @@ -1494,7 +1510,7 @@
c.PodGroupCtrl.decoratePodTemplateSpec(podTemplate, mpiJob.Name)
}
podTemplate.Spec.Hostname = launcherName
podTemplate.Spec.Subdomain = mpiJob.Name + workerSuffix // Matches workers' Service name.
podTemplate.Spec.Subdomain = mpiJob.Name // Matches the Job's Service name.
if podTemplate.Spec.HostNetwork {
// Allows resolution of worker hostnames without needing to include the
// namespace or cluster domain.
Expand Down
Loading
Loading