diff --git a/pkg/controller/podgroup.go b/pkg/controller/podgroup.go
index ed3f2bd4..37cfa9bf 100644
--- a/pkg/controller/podgroup.go
+++ b/pkg/controller/podgroup.go
@@ -356,7 +356,12 @@ func calPGMinResource(minMember *int32, mpiJob *kubeflow.MPIJob, pcLister schedu
 	sort.Sort(sort.Reverse(order))
 
 	// Launcher + Worker > minMember
-	if minMember != nil && *order[0].Replicas+*order[1].Replicas > *minMember {
+	replicas := *order[0].Replicas
+	if len(order) > 1 {
+		// When using runLauncherAsWorker, there may be no worker.
+		replicas += *order[1].Replicas
+	}
+	if minMember != nil && replicas > *minMember {
 		// If the launcher and workers have the same priority, it treats workers as a lower priority.
 		if order[0].priority == order[1].priority {
 			wIndex := order.getWorkerIndex()
diff --git a/pkg/controller/podgroup_test.go b/pkg/controller/podgroup_test.go
index c1d7e040..1a7574ba 100644
--- a/pkg/controller/podgroup_test.go
+++ b/pkg/controller/podgroup_test.go
@@ -38,6 +38,11 @@ var (
 		corev1.ResourceMemory: resource.MustParse("512Gi"),
 		"example.com/gpu":     resource.MustParse("40"),
 	}
+
+	minResourcesNoMinMember = &corev1.ResourceList{
+		corev1.ResourceCPU:    resource.MustParse("1"),
+		corev1.ResourceMemory: resource.MustParse("2Gi"),
+	}
 )
 
 func TestNewPodGroup(t *testing.T) {
@@ -208,6 +213,73 @@ func TestNewPodGroup(t *testing.T) {
 				},
 			},
 		},
+		"no worker no MinResources": {
+			mpiJob: &kubeflow.MPIJob{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "test",
+					Annotations: map[string]string{
+						volcanov1beta1.QueueNameAnnotationKey: "project-x",
+					},
+				},
+				Spec: kubeflow.MPIJobSpec{
+					RunLauncherAsWorker: ptr.To[bool](true),
+					RunPolicy: kubeflow.RunPolicy{
+						SchedulingPolicy: &kubeflow.SchedulingPolicy{
+							MinAvailable:           ptr.To[int32](1),
+							Queue:                  "project-y",
+							PriorityClass:          "high",
+							ScheduleTimeoutSeconds: ptr.To[int32](100),
+						},
+					},
+					MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{
+						kubeflow.MPIReplicaTypeLauncher: {
+							Replicas: ptr.To[int32](1),
+							Template: corev1.PodTemplateSpec{
+								Spec: corev1.PodSpec{
+									Containers: []corev1.Container{{
+										Resources: corev1.ResourceRequirements{
+											Requests: corev1.ResourceList{
+												corev1.ResourceCPU:    resource.MustParse("1"),
+												corev1.ResourceMemory: resource.MustParse("2Gi"),
+											},
+										},
+									}},
+								},
+							},
+						},
+					},
+				},
+			},
+			wantVolcanoPG: &volcanov1beta1.PodGroup{
+				TypeMeta: metav1.TypeMeta{
+					APIVersion: volcanov1beta1.SchemeGroupVersion.String(),
+					Kind:       "PodGroup",
+				},
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "test",
+				},
+				Spec: volcanov1beta1.PodGroupSpec{
+					MinMember:         1,
+					Queue:             "project-y",
+					PriorityClassName: "high",
+					MinResources:      minResourcesNoMinMember,
+				},
+			},
+			wantSchedPG: &schedv1alpha1.PodGroup{
+				TypeMeta: metav1.TypeMeta{
+					APIVersion: schedv1alpha1.SchemeGroupVersion.String(),
+					Kind:       "PodGroup",
+				},
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "test",
+				},
+				Spec: schedv1alpha1.PodGroupSpec{
+					MinMember:              1,
+					MinResources:           *minResourcesNoMinMember,
+					ScheduleTimeoutSeconds: ptr.To[int32](100),
+				},
+			},
+		},
 	}
 	for name, tc := range testCases {
 		t.Run(name, func(t *testing.T) {
@@ -447,6 +519,39 @@ func TestCalculatePGMinResources(t *testing.T) {
 				corev1.ResourceMemory: resource.MustParse("65Gi"),
 			},
 		},
+		"without worker without priorityClass": {
+			minMember: 3,
+			job: &kubeflow.MPIJob{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "test",
+				},
+				Spec: kubeflow.MPIJobSpec{
+					MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{
+						kubeflow.MPIReplicaTypeLauncher: {
+							Replicas: ptr.To[int32](1),
+							Template: corev1.PodTemplateSpec{
+								Spec: corev1.PodSpec{
+									Containers: []corev1.Container{
+										{
+											Resources: corev1.ResourceRequirements{
+												Requests: corev1.ResourceList{
+													corev1.ResourceCPU:    resource.MustParse("2"),
+													corev1.ResourceMemory: resource.MustParse("1Gi"),
+												},
+											},
+										},
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+			want: &corev1.ResourceList{
+				corev1.ResourceCPU:    resource.MustParse("2"),
+				corev1.ResourceMemory: resource.MustParse("1Gi"),
+			},
+		},
 	}
 	for name, tc := range volcanoTests {
 		t.Run(name, func(t *testing.T) {