From c5117e54c52e324f29182c07fcbe3a613768e09a Mon Sep 17 00:00:00 2001 From: Revital Sur Date: Mon, 27 Jan 2025 06:11:40 +0200 Subject: [PATCH] Fix super pipeline kfp v2. Signed-off-by: Revital Sur --- examples/kfp-pipelines/superworkflows/ray/kfp_v2/README.md | 1 + .../superworkflows/ray/kfp_v2/superpipeline_noop_docId_v2_wf.py | 2 ++ transforms/universal/doc_id/kfp_ray/doc_id_wf.py | 2 +- 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/examples/kfp-pipelines/superworkflows/ray/kfp_v2/README.md b/examples/kfp-pipelines/superworkflows/ray/kfp_v2/README.md index f68c1aaf7..2a16be57f 100644 --- a/examples/kfp-pipelines/superworkflows/ray/kfp_v2/README.md +++ b/examples/kfp-pipelines/superworkflows/ray/kfp_v2/README.md @@ -21,6 +21,7 @@ Another useful feature of the KFP v2 is the `Json` editor for the `dict` type in - It creates just one run that includes all the nested transfroms and their sub-tasks. - No need for additional component as `executeSubWorkflowComponent.yaml`. All the implementation in the same pipeline file. - In superpipelines of KFP v1 there exists an option to override the common parameters with specific values for each one of the transforms. This option is missing in the KFP v2 superpipelines. +- In KFP v2 pipelines the user is required to provide a unique string for the Ray cluster at run creation time (the `ray_run_id_KFPv2` parameter). This is because in KFP v2 `dsl.RUN_ID_PLACEHOLDER` is deprecated and cannot be used since SDK 2.5.0. 
### How to compile the superpipeline ``` diff --git a/examples/kfp-pipelines/superworkflows/ray/kfp_v2/superpipeline_noop_docId_v2_wf.py b/examples/kfp-pipelines/superworkflows/ray/kfp_v2/superpipeline_noop_docId_v2_wf.py index 434d84ab0..7c82ab79a 100644 --- a/examples/kfp-pipelines/superworkflows/ray/kfp_v2/superpipeline_noop_docId_v2_wf.py +++ b/examples/kfp-pipelines/superworkflows/ray/kfp_v2/superpipeline_noop_docId_v2_wf.py @@ -62,6 +62,7 @@ def super_pipeline( p2_skip: bool = False, p2_noop_sleep_sec: int = 10, p2_ray_name: str = "noop-kfp-ray", + p2_ray_run_id_KFPv2: str = "", p2_ray_head_options: dict = {"cpu": 1, "memory": 4, "image_pull_secret": "", "image": noop_image}, p2_ray_worker_options: dict = { "replicas": 2, @@ -75,6 +76,7 @@ def super_pipeline( # Document ID step parameters p3_name: str = "doc_id", p3_ray_name: str = "docid-kfp-ray", + p3_ray_run_id_KFPv2: str = "", p3_ray_head_options: dict = {"cpu": 1, "memory": 4, "image_pull_secret": "", "image": doc_id_image}, p3_ray_worker_options: dict = { "replicas": 2, diff --git a/transforms/universal/doc_id/kfp_ray/doc_id_wf.py b/transforms/universal/doc_id/kfp_ray/doc_id_wf.py index 0b9ccd42d..2542a876c 100644 --- a/transforms/universal/doc_id/kfp_ray/doc_id_wf.py +++ b/transforms/universal/doc_id/kfp_ray/doc_id_wf.py @@ -27,7 +27,7 @@ # the name of the job script EXEC_SCRIPT_NAME: str = "-m dpk_doc_id.ray.transform" # components -base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.2.3" +base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest" # path to kfp component specifications files component_spec_path = os.getenv(