Skip to content

Commit

Permalink
Merge pull request #956 from revit13/kfpv2tunid
Browse files Browse the repository at this point in the history
[KFP] Obtain the Ray cluster run ID from the user for KFP v2.
  • Loading branch information
touma-I authored Jan 27, 2025
2 parents d56687b + a306cbf commit 8e03629
Show file tree
Hide file tree
Showing 64 changed files with 498 additions and 400 deletions.
1 change: 1 addition & 0 deletions examples/kfp-pipelines/superworkflows/ray/kfp_v2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Another useful feature of the KFP v2 is the `Json` editor for the `dict` type in
- It creates just one run that includes all the nested transfroms and their sub-tasks.
- No need for additional component as `executeSubWorkflowComponent.yaml`. All the implementation in the same pipeline file.
- In superpipelines of KFP v1 there exists an option to override the common parameters with specific values for each one of the transforms. This option is missing in the KFP v2 superpipelines.
- In kfp V2 pipelines the user is requested to insert a unique string for the ray cluster created at run creation time (called `ray_run_id_KFPv2`). This is because in KFPv2 `dsl.RUN_ID_PLACEHOLDER` is deprecated and cannot be used since SDK 2.5.0 and we cannot generate a unique string at run-time, see https://github.com/kubeflow/pipelines/issues/10187.

### How to compile the superpipeline
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def super_pipeline(
p1_pipeline_data_max_files: int = -1,
p1_pipeline_data_num_samples: int = -1,
p1_pipeline_data_checkpointing: bool = False,
p1_pipeline_ray_run_id_KFPv2: str = "",
# noop step parameters
p2_name: str = "noop",
p2_skip: bool = False,
Expand Down
59 changes: 44 additions & 15 deletions kfp/doc/simple_transform_pipeline.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ Note: the project and the explanation below are based on [KFPv1](https://www.kub
import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
import os
from kfp_support.workflow_support.runtime_utils import (
DEFAULT_KFP_COMPONENT_SPEC_PATH,
ONE_HOUR_SEC,
ONE_WEEK_SEC,
ComponentUtils,
Expand All @@ -56,18 +58,24 @@ Ray cluster. For each step we have to define a component that will execute them:

```python
# components
base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.0.2"
# compute execution parameters. Here different transforms might need different implementations. As
# a result, instead of creating a component we are creating it in place here.
compute_exec_params_op = comp.func_to_container_op(
func=ComponentUtils.default_compute_execution_params, base_image=base_kfp_image
)
base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest"
component_spec_path = os.getenv("KFP_COMPONENT_SPEC_PATH", DEFAULT_KFP_COMPONENT_SPEC_PATH)
# KFPv1 and KFP2 uses different methods to create a component from a function. KFPv1 uses the
# `create_component_from_func` function, but it is deprecated by KFPv2 and so has a different import path.
# KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use
# this if/else statement and explicitly call the decorator.
if os.getenv("KFPv2", "0") == "1":
compute_exec_params_op = dsl.component_decorator.component(
func=compute_exec_params_func, base_image=base_kfp_image
)
else:
compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image)
# create Ray cluster
create_ray_op = comp.load_component_from_file("../../../kfp_ray_components/createRayComponent.yaml")
create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml")
# execute job
execute_ray_jobs_op = comp.load_component_from_file("../../../kfp_ray_components/executeRayJobComponent.yaml")
execute_ray_jobs_op = comp.load_component_from_file(component_spec_path + "executeRayJobComponent.yaml")
# clean up Ray
cleanup_ray_op = comp.load_component_from_file("../../../kfp_ray_components/cleanupRayComponent.yaml")
cleanup_ray_op = comp.load_component_from_file(component_spec_path + "deleteRayClusterComponent.yaml")
# Task name is part of the pipeline name, the ray cluster name and the job name in DMF.
TASK_NAME: str = "noop"
```
Expand All @@ -84,6 +92,7 @@ The input parameters section defines all the parameters required for the pipelin
```python
# Ray cluster
ray_name: str = "noop-kfp-ray", # name of Ray cluster
ray_run_id_KFPv2: str = "",
ray_head_options: str = '{"cpu": 1, "memory": 4, \
"image": "' + task_image + '" }',
ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, \
Expand All @@ -94,6 +103,7 @@ The input parameters section defines all the parameters required for the pipelin
data_s3_access_secret: str = "s3-secret",
data_max_files: int = -1,
data_num_samples: int = -1,
data_checkpointing: bool = False,
# orchestrator
actor_options: str = "{'num_cpus': 0.8}",
pipeline_id: str = "pipeline_id",
Expand All @@ -107,6 +117,7 @@ The input parameters section defines all the parameters required for the pipelin
The parameters used here are as follows:

* ray_name: name of the Ray cluster
* ray_run_id_KFPv2: Ray cluster unique ID used only in KFP v2
* ray_head_options: head node options, containing the following:
* cpu - number of cpus
* memory - memory
Expand Down Expand Up @@ -148,21 +159,39 @@ Now, when all components and input parameters are defined, we can implement pipe
component execution and parameters submitted to every component.

```python
# In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create
# a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to
# https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime the user is requested to insert
# a unique string created at run creation time.
if os.getenv("KFPv2", "0") == "1":
print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the "
"same version of the same pipeline !!!")
run_id = ray_run_id_KFPv2
else:
run_id = dsl.RUN_ID_PLACEHOLDER
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=dsl.RUN_ID_PLACEHOLDER, server_url=server_url, additional_params=additional_params)
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params)
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2)
# pipeline definition
with dsl.ExitHandler(clean_up_task):
# compute execution params
compute_exec_params = compute_exec_params_op(
worker_options=ray_worker_options,
actor_options=actor_options,
worker_options=ray_worker_options,
actor_options=runtime_actor_options,
data_s3_config=data_s3_config,
data_max_files=data_max_files,
data_num_samples=data_num_samples,
data_checkpointing=data_checkpointing,
runtime_pipeline_id=runtime_pipeline_id,
runtime_job_id=run_id,
runtime_code_location=runtime_code_location,
noop_sleep_sec=noop_sleep_sec,
)
ComponentUtils.add_settings_to_component(compute_exec_params, ONE_HOUR_SEC * 2)
# start Ray cluster
ray_cluster = create_ray_op(
ray_name=ray_name,
run_id=dsl.RUN_ID_PLACEHOLDER,
run_id=run_id,
ray_head_options=ray_head_options,
ray_worker_options=ray_worker_options,
server_url=server_url,
Expand All @@ -173,7 +202,7 @@ component execution and parameters submitted to every component.
# Execute job
execute_job = execute_ray_jobs_op(
ray_name=ray_name,
run_id=dsl.RUN_ID_PLACEHOLDER,
run_id=run_id,
additional_params=additional_params,
# note that the parameters below are specific for NOOP transform
exec_params={
Expand All @@ -183,7 +212,7 @@ component execution and parameters submitted to every component.
"num_workers": compute_exec_params.output,
"worker_options": actor_options,
"pipeline_id": pipeline_id,
"job_id": dsl.RUN_ID_PLACEHOLDER,
"job_id": run_id,
"code_location": code_location,
"noop_sleep_sec": noop_sleep_sec,
},
Expand Down
4 changes: 2 additions & 2 deletions kfp/kfp_ray_components/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ FROM ${BASE_IMAGE}

# see https://docs.openshift.com/container-platform/4.17/openshift_images/create-images.html#use-uid_create-images
USER root
RUN chown ray:root /home/ray && chmod 775 /home/ray
RUN chown ray:root /home/ray && chmod g=u /home/ray
USER ray

# install libraries
Expand All @@ -30,7 +30,7 @@ RUN pip install --no-cache-dir pydantic==2.6.3
# remove credentials-containing file
RUN rm requirements.txt
# components
COPY ./src /pipelines/component/src
COPY --chmod=775 --chown=ray:root ./src /pipelines/component/src

# Set environment
ENV KFP_v2=$KFP_v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,27 @@ def run_test(pipeline_package_path: str, endpoint: str = "http://localhost:8080/
logger.info(f"Pipeline {pipeline_name} successfully completed")
return pipeline_name

def _set_run_id(pipeline_package_path: str):
"""
Assign a dummy run ID value for testing purposes. By default, this value
is empty and is set by the user during runtime.
:param pipeline_package_path: Local path to the pipeline package.
"""
import yaml
import uuid

try:
stream = open(pipeline_package_path, "r")
docs = list(yaml.load_all(stream, yaml.FullLoader))
for doc in docs:
if "root" in doc:
doc["root"]["inputDefinitions"]["parameters"]["ray_run_id_KFPv2"]["defaultValue"] = uuid.uuid4().hex
with open(pipeline_package_path, "w") as outfile:
yaml.dump_all(docs, outfile)
except Exception as e:
logger.error(f"Failed to update run id value, exception {e}")
sys.exit(1)

if __name__ == "__main__":
import argparse
Expand All @@ -74,6 +95,7 @@ def run_test(pipeline_package_path: str, endpoint: str = "http://localhost:8080/
if pipeline is None:
sys.exit(1)
case "sanity-test":
_set_run_id(args.pipeline_package_path)
run = run_test(
endpoint=args.endpoint,
pipeline_package_path=args.pipeline_package_path,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,10 @@ def runtime_name(ray_name: str = "", run_id: str = "") -> str:
# the return value plus namespace name will be the name of the Ray Route,
# which length is restricted to 64 characters,
# therefore we restrict the return name by 15 character.
if run_id != "":
return f"{ray_name[:9]}-{run_id[:5]}"
return ray_name[:15]
if run_id == "":
logger.error("Run ID must not be provided")
sys.exit(1)
return f"{ray_name[:9]}-{run_id[:5]}"

@staticmethod
def dict_to_req(d: dict[str, Any], executor: str = "transformer_launcher.py") -> str:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
pipeline_parameters:
name: "noop"
description: "Pipeline for noop task"
script_name: "noop_transform.py"
script_name: "-m dpk_noop.ray.runtime"
prefix: ""
multi_s3: False
compute_func_name: ""
Expand Down
24 changes: 12 additions & 12 deletions kfp/pipeline_generator/single-pipeline/templates/simple_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,23 +73,11 @@ def compute_exec_params_func(
# KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use
# this if/else statement and explicitly call the decorator.
if os.getenv("KFPv2", "0") == "1":
# In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create
# a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to
# https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at
# compilation time.
import uuid

compute_exec_params_op = dsl.component_decorator.component(
func=compute_exec_params_func, base_image=base_kfp_image
)
print(
"WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the "
+ "same version of the same pipeline !!!"
)
run_id = uuid.uuid4().hex
else:
compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image)
run_id = dsl.RUN_ID_PLACEHOLDER

# create Ray cluster
create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml")
Expand All @@ -109,6 +97,7 @@ def compute_exec_params_func(
def {{ pipeline_name }}(
# Ray cluster
ray_name: str = "{{ pipeline_name }}-kfp-ray", # name of Ray cluster
ray_run_id_KFPv2: str = "", # Ray cluster unique ID used only in KFP v2
# Add image_pull_secret and image_pull_policy to ray workers if needed
{%- if image_pull_secret != "" %}
ray_head_options: dict = {"cpu": 1, "memory": 4, "image_pull_secret": "{{ image_pull_secret }}", "image": task_image},
Expand Down Expand Up @@ -142,6 +131,7 @@ def {{ pipeline_name }}(
"""
Pipeline to execute {{ pipeline_name }} transform
:param ray_name: name of the Ray cluster
:param ray_run_id_KFPv2: a unique string id used for the Ray cluster, applicable only in KFP v2.
:param ray_head_options: head node options, containing the following:
cpu - number of cpus
memory - memory
Expand Down Expand Up @@ -177,6 +167,16 @@ def {{ pipeline_name }}(
{%- endfor %}
:return: None
"""
# In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create
# a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to
# https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime the user is requested to insert
# a unique string created at run creation time.
if os.getenv("KFPv2", "0") == "1":
print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the "
"same version of the same pipeline !!!")
run_id = ray_run_id_KFPv2
else:
run_id = dsl.RUN_ID_PLACEHOLDER
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params)
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2)
Expand Down
2 changes: 1 addition & 1 deletion tools/ingest2parquet/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ FROM ${BASE_IMAGE}

# see https://docs.openshift.com/container-platform/4.17/openshift_images/create-images.html#use-uid_create-images
USER root
RUN chown ray:root /home/ray && chmod 775 /home/ray
RUN chown ray:root /home/ray && chmod g=u /home/ray
USER ray

# install pytest
Expand Down
2 changes: 1 addition & 1 deletion transforms/Dockerfile.ray.template
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ FROM ${BASE_IMAGE}

# see https://docs.openshift.com/container-platform/4.17/openshift_images/create-images.html#use-uid_create-images
USER root
RUN chown ray:root /home/ray && chmod 775 /home/ray
RUN chown ray:root /home/ray && chmod g=u /home/ray
USER ray

RUN pip install --upgrade --no-cache-dir pip
Expand Down
24 changes: 12 additions & 12 deletions transforms/code/code2parquet/kfp_ray/code2parquet_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,23 +77,11 @@ def compute_exec_params_func(
# KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use
# this if/else statement and explicitly call the decorator.
if os.getenv("KFPv2", "0") == "1":
# In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create
# a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to
# https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at
# compilation time.
import uuid

compute_exec_params_op = dsl.component_decorator.component(
func=compute_exec_params_func, base_image=base_kfp_image
)
print(
"WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the "
+ "same version of the same pipeline !!!"
)
run_id = uuid.uuid4().hex
else:
compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image)
run_id = dsl.RUN_ID_PLACEHOLDER


# create Ray cluster
Expand All @@ -113,6 +101,7 @@ def compute_exec_params_func(
)
def code2parquet(
ray_name: str = "code2parquet-kfp-ray", # name of Ray cluster
ray_run_id_KFPv2: str = "", # Ray cluster unique ID used only in KFP v2
# Add image_pull_secret and image_pull_policy to ray workers if needed
ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image},
ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image},
Expand All @@ -139,6 +128,7 @@ def code2parquet(
"""
Pipeline to execute NOOP transform
:param ray_name: name of the Ray cluster
:param ray_run_id_KFPv2: a unique string id used for the Ray cluster, applicable only in KFP v2.
:param ray_head_options: head node options, containing the following:
cpu - number of cpus
memory - memory
Expand Down Expand Up @@ -178,6 +168,16 @@ def code2parquet(
(here we are assuming that select language info is in S3, but potentially in the different bucket)
:return: None
"""
# In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create
# a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to
# https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime the user is requested to insert
# a unique string created at run creation time.
if os.getenv("KFPv2", "0") == "1":
print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the "
"same version of the same pipeline !!!")
run_id = ray_run_id_KFPv2
else:
run_id = dsl.RUN_ID_PLACEHOLDER
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params)
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2)
Expand Down
10 changes: 5 additions & 5 deletions transforms/code/code2parquet/ray/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ FROM ${BASE_IMAGE}

# see https://docs.openshift.com/container-platform/4.17/openshift_images/create-images.html#use-uid_create-images
USER root
RUN chown ray:root /home/ray && chmod 775 /home/ray
RUN chown ray:root /home/ray && chmod g=u /home/ray
USER ray

RUN pip install --upgrade --no-cache-dir pip
Expand All @@ -28,14 +28,14 @@ COPY --chmod=775 --chown=ray:root pyproject.toml pyproject.toml
RUN pip install --no-cache-dir -e .

# copy the main() entry point to the image
COPY src/code2parquet_transform_ray.py .
COPY --chmod=775 --chown=ray:root src/code2parquet_transform_ray.py .

# copy some of the samples in
COPY src/code2parquet_local_ray.py local/
COPY --chmod=775 --chown=ray:root src/code2parquet_local_ray.py local/

# copy test
COPY test/ test/
COPY test-data/ test-data/
COPY --chmod=775 --chown=ray:root test/ test/
COPY --chmod=775 --chown=ray:root test-data/ test-data/

# Set environment
ENV PYTHONPATH /home/ray
Expand Down
2 changes: 1 addition & 1 deletion transforms/code/code_profiler/Dockerfile.ray
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ FROM ${BASE_IMAGE}

# see https://docs.openshift.com/container-platform/4.17/openshift_images/create-images.html#use-uid_create-images
USER root
RUN chown ray:root /home/ray && chmod 775 /home/ray
RUN chown ray:root /home/ray && chmod g=u /home/ray
USER ray

RUN pip install --upgrade --no-cache-dir pip
Expand Down
Loading

0 comments on commit 8e03629

Please sign in to comment.