Skip to content

Commit

Permalink
fix conflicts for sdk 2.7.0 release
Browse files Browse the repository at this point in the history
  • Loading branch information
Tomcli committed Feb 19, 2024
2 parents cbdb47e + 449c304 commit 45d66eb
Show file tree
Hide file tree
Showing 160 changed files with 13,601 additions and 11,032 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,6 @@ __pycache__
# Coverage
.coverage
.coverage*

# kfp local execution default directory
local_outputs/
4 changes: 2 additions & 2 deletions backend/src/apiserver/model/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,11 +363,11 @@ var runAPIToModelFieldMap = map[string]string{
"storage_state": "StorageState",
"status": "Conditions",
"namespace": "Namespace", // v2beta1 API
"experiment_id": "ExperimentId", // v2beta1 API
"experiment_id": "ExperimentUUID", // v2beta1 API
"state": "State", // v2beta1 API
"state_history": "StateHistory", // v2beta1 API
"runtime_details": "PipelineRuntimeManifest", // v2beta1 API
"recurring_run_id": "RecurringRunId", // v2beta1 API
"recurring_run_id": "JobUUID", // v2beta1 API
}

// APIToModelFieldMap returns a map from API names to field names for model Run.
Expand Down
6 changes: 3 additions & 3 deletions backend/src/apiserver/model/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ var taskAPIToModelFieldMap = map[string]string{
"namespace": "Namespace",
"pipeline_name": "PipelineName", // v2beta1 API
"pipelineName": "PipelineName", // v1beta1 API
"run_id": "RunId", // v2beta1 API
"runId": "RunId", // v1beta1 API
"run_id": "RunUUID", // v2beta1 API
"runId": "RunUUID", // v1beta1 API
"display_name": "Name", // v2beta1 API
"execution_id": "MLMDExecutionID", // v2beta1 API
"create_time": "CreatedTimestamp", // v2beta1 API
Expand All @@ -91,7 +91,7 @@ var taskAPIToModelFieldMap = map[string]string{
"fingerprint": "Fingerprint",
"state": "State", // v2beta1 API
"state_history": "StateHistory", // v2beta1 API
"parent_task_id": "ParentTaskId", // v2beta1 API
"parent_task_id": "ParentTaskUUID", // v2beta1 API
"mlmdExecutionID": "MLMDExecutionID", // v1beta1 API
"created_at": "CreatedTimestamp", // v1beta1 API
"finished_at": "FinishedTimestamp", // v1beta1 API
Expand Down
2 changes: 1 addition & 1 deletion backend/src/apiserver/resource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1722,7 +1722,7 @@ func (r *ResourceManager) IsAuthorized(ctx context.Context, resourceAttributes *
v1.CreateOptions{},
)
if err != nil {
if err, ok := err.(net.Error); ok && err.Timeout() {
if netError, ok := err.(net.Error); ok && netError.Timeout() {
reportErr := util.NewUnavailableServerError(
err,
"Failed to create SubjectAccessReview for user '%s' (request: %+v) - try again later",
Expand Down
2 changes: 1 addition & 1 deletion backend/src/apiserver/server/pipeline_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ func (s *PipelineServer) getPipelineByName(ctx context.Context, name string, nam
switch apiRequestVersion {
case "v1beta1":
return s.resourceManager.GetPipelineByNameAndNamespaceV1(name, namespace)
case "V2beta1":
case "v2beta1":
p, err := s.resourceManager.GetPipelineByNameAndNamespace(name, namespace)
return p, nil, err
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package storage

import (
"github.com/kubeflow/pipelines/backend/src/apiserver/model"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -93,3 +94,9 @@ func TestUnsetDefaultExperimentIdIfIdMatches(t *testing.T) {

db.Close()
}

func TestExperimentAPIFieldMap(t *testing.T) {
for _, modelField := range (&model.Experiment{}).APIToModelFieldMap() {
assert.Contains(t, experimentColumns, modelField)
}
}
6 changes: 6 additions & 0 deletions backend/src/apiserver/storage/job_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -964,3 +964,9 @@ func TestDeleteJob_InternalError(t *testing.T) {
assert.Equal(t, codes.Internal, err.(*util.UserError).ExternalStatusCode(),
"Expected delete job to return internal error")
}

func TestJobAPIFieldMap(t *testing.T) {
for _, modelField := range (&model.Job{}).APIToModelFieldMap() {
assert.Contains(t, jobColumns, modelField)
}
}
6 changes: 6 additions & 0 deletions backend/src/apiserver/storage/run_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1421,3 +1421,9 @@ func TestParseResourceReferences(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, expectedResourceReferences, actualResourceReferences)
}

func TestRunAPIFieldMap(t *testing.T) {
for _, modelField := range (&model.Run{}).APIToModelFieldMap() {
assert.Contains(t, runColumns, modelField)
}
}
6 changes: 6 additions & 0 deletions backend/src/apiserver/storage/task_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,3 +617,9 @@ func TestTaskStore_UpdateOrCreateTasks(t *testing.T) {
})
}
}

func TestTaskAPIFieldMap(t *testing.T) {
for _, modelField := range (&model.Task{}).APIToModelFieldMap() {
assert.Contains(t, taskColumns, modelField)
}
}
5 changes: 5 additions & 0 deletions backend/src/v2/compiler/argocompiler/argo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ func Test_argo_compiler(t *testing.T) {
platformSpecPath: "../testdata/create_mount_delete_dynamic_pvc_platform.json",
argoYAMLPath: "testdata/create_mount_delete_dynamic_pvc.yaml",
},
{
jobPath: "../testdata/hello_world.json",
platformSpecPath: "../testdata/create_pod_metadata.json",
argoYAMLPath: "testdata/create_pod_metadata.yaml",
},
}
for _, tt := range tests {
t.Run(fmt.Sprintf("%+v", tt), func(t *testing.T) {
Expand Down
86 changes: 69 additions & 17 deletions backend/src/v2/compiler/argocompiler/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,22 @@
package argocompiler

import (
wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"os"

wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/golang/protobuf/jsonpb"
"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
"github.com/kubeflow/pipelines/backend/src/v2/component"
"github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform"
k8score "k8s.io/api/core/v1"
)

const (
volumeNameKFPLauncher = "kfp-launcher"
DefaultLauncherImage = "gcr.io/ml-pipeline/kfp-launcher@sha256:80cf120abd125db84fa547640fd6386c4b2a26936e0c2b04a7d3634991a850a4"
DefaultLauncherImage = "gcr.io/ml-pipeline/kfp-launcher@sha256:80cf120abd125db84fa547640fd6386c4b2a26936e0c2b04a7d3634991a850a4"
LauncherImageEnvVar = "V2_LAUNCHER_IMAGE"
DefaultDriverImage = "gcr.io/ml-pipeline/kfp-driver@sha256:8e60086b04d92b657898a310ca9757631d58547e76bbbb8bfc376d654bef1707"
DriverImageEnvVar = "V2_DRIVER_IMAGE"
DefaultDriverImage = "gcr.io/ml-pipeline/kfp-driver@sha256:8e60086b04d92b657898a310ca9757631d58547e76bbbb8bfc376d654bef1707"
DriverImageEnvVar = "V2_DRIVER_IMAGE"
)

func (c *workflowCompiler) Container(name string, component *pipelinespec.ComponentSpec, container *pipelinespec.PipelineDeploymentConfig_PipelineContainerSpec) error {
Expand Down Expand Up @@ -58,19 +61,19 @@ type containerDriverInputs struct {
}

func GetLauncherImage() string {
launcherImage := os.Getenv(LauncherImageEnvVar)
if launcherImage == "" {
launcherImage = DefaultLauncherImage
}
return launcherImage
launcherImage := os.Getenv(LauncherImageEnvVar)
if launcherImage == "" {
launcherImage = DefaultLauncherImage
}
return launcherImage
}

func GetDriverImage() string {
driverImage := os.Getenv(DriverImageEnvVar)
if driverImage == "" {
driverImage = DefaultDriverImage
}
return driverImage
driverImage := os.Getenv(DriverImageEnvVar)
if driverImage == "" {
driverImage = DefaultDriverImage
}
return driverImage
}

func (c *workflowCompiler) containerDriverTask(name string, inputs containerDriverInputs) (*wfapi.DAGTask, *containerDriverOutputs) {
Expand Down Expand Up @@ -169,14 +172,14 @@ type containerExecutorInputs struct {
// name: argo workflows DAG task name
// The other arguments are argo workflows task parameters, they can be either a
// string or a placeholder.
func (c *workflowCompiler) containerExecutorTask(name string, inputs containerExecutorInputs) *wfapi.DAGTask {
func (c *workflowCompiler) containerExecutorTask(name string, inputs containerExecutorInputs, refName string) *wfapi.DAGTask {
when := ""
if inputs.condition != "" {
when = inputs.condition + " != false"
}
return &wfapi.DAGTask{
Name: name,
Template: c.addContainerExecutorTemplate(),
Template: c.addContainerExecutorTemplate(refName),
When: when,
Arguments: wfapi.Arguments{
Parameters: []wfapi.Parameter{
Expand All @@ -191,7 +194,7 @@ func (c *workflowCompiler) containerExecutorTask(name string, inputs containerEx
// any container component task.
// During runtime, it's expected that pod-spec-patch will specify command, args
// and resources etc, that are different for different tasks.
func (c *workflowCompiler) addContainerExecutorTemplate() string {
func (c *workflowCompiler) addContainerExecutorTemplate(refName string) string {
// container template is parent of container implementation template
nameContainerExecutor := "system-container-executor"
nameContainerImpl := "system-container-impl"
Expand Down Expand Up @@ -273,7 +276,56 @@ func (c *workflowCompiler) addContainerExecutorTemplate() string {
Env: commonEnvs,
},
}
// Update pod metadata if it defined in the Kubernetes Spec
if kubernetesConfigString, ok := c.wf.Annotations[annotationKubernetesSpec+refName]; ok {
k8sExecCfg := &kubernetesplatform.KubernetesExecutorConfig{}
if err := jsonpb.UnmarshalString(kubernetesConfigString, k8sExecCfg); err == nil {
extendPodMetadata(&executor.Metadata, k8sExecCfg)
}
}
c.templates[nameContainerImpl] = executor
c.wf.Spec.Templates = append(c.wf.Spec.Templates, *container, *executor)
return nameContainerExecutor
}

// Extends the PodMetadata to include Kubernetes-specific executor config.
// Although the current podMetadata object is always empty, this function
// doesn't overwrite the existing podMetadata because for security reasons
// the existing podMetadata should have higher privilege than the user definition.
func extendPodMetadata(
podMetadata *wfapi.Metadata,
kubernetesExecutorConfig *kubernetesplatform.KubernetesExecutorConfig,
) {
// Get pod metadata information
if kubernetesExecutorConfig.GetPodMetadata() != nil {
labels := kubernetesExecutorConfig.GetPodMetadata().GetLabels()
if labels != nil {
if podMetadata.Labels == nil {
podMetadata.Labels = labels
} else {
podMetadata.Labels = extendMetadataMap(podMetadata.Labels, labels)
}
}
annotations := kubernetesExecutorConfig.GetPodMetadata().GetAnnotations()
if annotations != nil {
if podMetadata.Annotations == nil {
podMetadata.Annotations = annotations
} else {
podMetadata.Annotations = extendMetadataMap(podMetadata.Annotations, annotations)
}
}
}
}

// Extends metadata map values, highPriorityMap should overwrites lowPriorityMap values
// The original Map inputs should have higher priority since its defined by admin
// TODO: Use maps.Copy after moving to go 1.21+
func extendMetadataMap(
highPriorityMap map[string]string,
lowPriorityMap map[string]string,
) map[string]string {
for k, v := range highPriorityMap {
lowPriorityMap[k] = v
}
return lowPriorityMap
}
90 changes: 90 additions & 0 deletions backend/src/v2/compiler/argocompiler/container_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2021-2024 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package argocompiler

import (
"testing"

wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform"
"github.com/stretchr/testify/assert"
)

func Test_extendPodMetadata(t *testing.T) {
tests := []struct {
name string
podMetadata *wfapi.Metadata
kubernetesExecutorConfig *kubernetesplatform.KubernetesExecutorConfig
expected *wfapi.Metadata
}{
{
"Valid - add pod labels and annotations",
&wfapi.Metadata{},
&kubernetesplatform.KubernetesExecutorConfig{
PodMetadata: &kubernetesplatform.PodMetadata{
Annotations: map[string]string{
"run_id": "123456",
},
Labels: map[string]string{
"kubeflow.com/kfp": "pipeline-node",
},
},
},
&wfapi.Metadata{
Annotations: map[string]string{
"run_id": "123456",
},
Labels: map[string]string{
"kubeflow.com/kfp": "pipeline-node",
},
},
},
{
"Valid - try overwrite default pod labels and annotations",
&wfapi.Metadata{
Annotations: map[string]string{
"run_id": "654321",
},
Labels: map[string]string{
"kubeflow.com/kfp": "default-node",
},
},
&kubernetesplatform.KubernetesExecutorConfig{
PodMetadata: &kubernetesplatform.PodMetadata{
Annotations: map[string]string{
"run_id": "123456",
},
Labels: map[string]string{
"kubeflow.com/kfp": "pipeline-node",
},
},
},
&wfapi.Metadata{
Annotations: map[string]string{
"run_id": "654321",
},
Labels: map[string]string{
"kubeflow.com/kfp": "default-node",
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
extendPodMetadata(tt.podMetadata, tt.kubernetesExecutorConfig)
assert.Equal(t, tt.expected, tt.podMetadata)
})
}
}
2 changes: 1 addition & 1 deletion backend/src/v2/compiler/argocompiler/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func (c *workflowCompiler) task(name string, task *pipelinespec.PipelineTaskSpec
podSpecPatch: driverOutputs.podSpecPatch,
cachedDecision: driverOutputs.cached,
condition: driverOutputs.condition,
})
}, task.GetComponentRef().GetName())
executor.Depends = depends([]string{driverTaskName})
return []wfapi.DAGTask{*driver, *executor}, nil
case *pipelinespec.PipelineDeploymentConfig_ExecutorSpec_Importer:
Expand Down
Loading

0 comments on commit 45d66eb

Please sign in to comment.