Add get_artifact for template invocator use (#1284)
**Pull Request Checklist**
- [x] Fixes #1283
- [x] Tests added
- [x] Documentation/examples added
- [x] [Good commit messages](https://cbea.ms/git-commit/) and/or PR
title

**Description of PR**
Adds `get_artifact` to `IOMixin` to allow tasks to get artifacts from
the parent (DAG/Steps) template.
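
A minimal sketch of the pattern this enables, using simplified stand-in classes rather than Hera's real `Artifact` and `IOMixin`. The names `SketchArtifact` and `SketchTemplate` are hypothetical and exist only for illustration. A template looks up one of its own declared input artifacts by name and returns a reference that a task inside it can pass on under a different name:

```python
from dataclasses import dataclass, field, replace
from typing import List, Optional


@dataclass(frozen=True)
class SketchArtifact:
    """Hypothetical stand-in for an artifact: a name plus an optional source reference."""

    name: str
    from_: Optional[str] = None

    def with_name(self, name: str) -> "SketchArtifact":
        # Return a renamed copy so the reference can match the consumer's input name.
        return replace(self, name=name)


@dataclass
class SketchTemplate:
    """Hypothetical stand-in for a DAG/Steps template that declares input artifacts."""

    name: str
    inputs: List[SketchArtifact] = field(default_factory=list)

    def get_artifact(self, name: str) -> SketchArtifact:
        # Look up a declared input and return a reference to it, not the input itself.
        for artifact in self.inputs:
            if artifact.name == name:
                return SketchArtifact(name=name, from_="{{inputs.artifacts." + name + "}}")
        raise KeyError(f"Artifact {name} not found.")


consume_dag = SketchTemplate(name="consume-artifact-dag", inputs=[SketchArtifact("hello-file-input")])
task_argument = consume_dag.get_artifact("hello-file-input").with_name("message")
print(task_argument)
```

The renamed copy keeps the `from_` reference to the parent's input, which is what lets a task accept the artifact under its own input name.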

Also adds an on-cluster test that uses artifacts, which requires extra setup
steps to deploy MinIO and configure it as the artifact repository.

---------

Signed-off-by: Elliot Gunton <[email protected]>
elliotgunton authored Dec 9, 2024
1 parent 52597f2 commit 25ac420
Showing 9 changed files with 372 additions and 14 deletions.
15 changes: 6 additions & 9 deletions .github/workflows/cicd.yaml
@@ -26,7 +26,7 @@ jobs:
       - name: Install poetry
         run: pipx install poetry

-      - name: setup python ${{ matrix.python-version }}
+      - name: set up python ${{ matrix.python-version }}
         uses: actions/setup-python@v5
         with:
           python-version: ${{ matrix.python-version }}
@@ -75,7 +75,7 @@ jobs:
       - name: Install poetry
         run: pipx install poetry

-      - name: setup python ${{ matrix.python-version }}
+      - name: set up python ${{ matrix.python-version }}
         uses: actions/setup-python@v5
         with:
           python-version: ${{ matrix.python-version }}
@@ -109,7 +109,7 @@ jobs:
       - name: Install poetry
         run: pipx install poetry

-      - name: setup python 3.9
+      - name: set up python 3.9
         uses: actions/setup-python@v5
         with:
           python-version: 3.9
@@ -118,17 +118,14 @@ jobs:
       - name: Install dependencies
         run: poetry install --all-extras

-      - name: setup k3d cluster
-        run: make install-k3d
-
-      - name: setup and run argo
-        run: make run-argo
+      - name: set up cluster
+        run: make install-k3d set-up-cluster set-up-argo set-up-artifacts

       - name: run workflow tests
         run: make test-on-cluster

       - name: stop argo cluster
-        run: make stop-argo
+        run: make stop-cluster

 concurrency:
   group: ${{ github.workflow }}-${{ github.ref || github.run_id }}
19 changes: 14 additions & 5 deletions Makefile
@@ -147,7 +147,7 @@ install-k3d: ## Install k3d client
 	curl -s https://raw.githubusercontent.com/k3d-io/k3d/main/install.sh | bash

 .PHONY: install-argo
-install-argo: ## Install argo client
+install-argo: ## Install argo CLI client
 	# Download the binary
 	curl -sLO https://github.com/argoproj/argo-workflows/releases/download/v$(ARGO_WORKFLOWS_VERSION)/argo-linux-amd64.gz

@@ -163,18 +163,27 @@ install-argo: ## Install argo client
 	# Test installation
 	argo version

-.PHONY: run-argo
-run-argo: ## Start the argo server
+.PHONY: set-up-cluster
+set-up-cluster: ## Create the cluster and argo namespace
 	k3d cluster list | grep test-cluster || k3d cluster create test-cluster
 	k3d kubeconfig merge test-cluster --kubeconfig-switch-context
 	kubectl get namespace argo || kubectl create namespace argo
+
+.PHONY: set-up-argo
+set-up-argo: ## Start the argo service
 	kubectl apply -n argo -f https://github.com/argoproj/argo-workflows/releases/download/v$(ARGO_WORKFLOWS_VERSION)/install.yaml
 	kubectl patch deployment argo-server --namespace argo --type='json' -p='[{"op": "replace", "path": "/spec/template/spec/containers/0/args", "value": ["server", "--auth-mode=server"]}]'
 	kubectl create rolebinding default-admin --clusterrole=admin --serviceaccount=argo:default --namespace=argo
 	kubectl rollout status -n argo deployment/argo-server --timeout=120s --watch=true

-.PHONY: stop-argo
-stop-argo: ## Stop the argo server
+.PHONY: set-up-artifacts
+set-up-artifacts: ## Adds minio for running examples with artifact storage
+	kubectl apply -n argo -f https://raw.githubusercontent.com/argoproj-labs/training-material/main/config/minio/minio.yaml
+	kubectl apply -n argo -f https://raw.githubusercontent.com/argoproj-labs/training-material/main/config/argo-workflows/workflows-controller-configmap.yaml
+	kubectl apply -n argo -f tests/submissions/roles.yaml
+
+.PHONY: stop-cluster
+stop-cluster: ## Stop the cluster
 	k3d cluster stop test-cluster

 .PHONY: test-on-cluster
121 changes: 121 additions & 0 deletions docs/examples/workflows/artifacts/artifacts_in_dags.md
@@ -0,0 +1,121 @@
# Artifacts In Dags



This example shows how to use artifacts as inputs and outputs of DAGs.


=== "Hera"

```python linenums="1"
from hera.workflows import DAG, Artifact, Container, Workflow

with Workflow(
    generate_name="artifacts-in-dags-",
    entrypoint="runner-dag",
) as w:
    hello_world_to_file = Container(
        name="hello-world-to-file",
        image="busybox",
        command=["sh", "-c"],
        args=["sleep 1; echo hello world | tee /tmp/hello_world.txt"],
        outputs=[Artifact(name="hello-art", path="/tmp/hello_world.txt")],
    )
    print_message_from_file = Container(
        name="print-message-from-file",
        image="alpine:latest",
        command=["sh", "-c"],
        args=["cat /tmp/message"],
        inputs=[Artifact(name="message", path="/tmp/message")],
    )

    # First DAG generates an artifact from a task, and "lifts" it out as an output of the DAG template itself
    with DAG(
        name="generate-artifact-dag",
        outputs=[Artifact(name="hello-file", from_="{{tasks.hello-world-to-file.outputs.artifacts.hello-art}}")],
    ) as d1:
        hello_world_to_file()

    # Second DAG takes an artifact input, and the task references it using `get_artifact`
    with DAG(name="consume-artifact-dag", inputs=[Artifact(name="hello-file-input")]) as d2:
        print_message_from_file(
            arguments=d2.get_artifact("hello-file-input").with_name("message"),
        )

    # Third DAG orchestrates the first two, by creating tasks by "calling" the objects
    with DAG(name="runner-dag"):
        generator_dag = d1()
        consumer_dag = d2(arguments=generator_dag.get_artifact("hello-file").with_name("hello-file-input"))

        generator_dag >> consumer_dag
```

=== "YAML"

```yaml linenums="1"
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: artifacts-in-dags-
spec:
  entrypoint: runner-dag
  templates:
  - container:
      args:
      - sleep 1; echo hello world | tee /tmp/hello_world.txt
      command:
      - sh
      - -c
      image: busybox
    name: hello-world-to-file
    outputs:
      artifacts:
      - name: hello-art
        path: /tmp/hello_world.txt
  - container:
      args:
      - cat /tmp/message
      command:
      - sh
      - -c
      image: alpine:latest
    inputs:
      artifacts:
      - name: message
        path: /tmp/message
    name: print-message-from-file
  - dag:
      tasks:
      - name: hello-world-to-file
        template: hello-world-to-file
    name: generate-artifact-dag
    outputs:
      artifacts:
      - from: '{{tasks.hello-world-to-file.outputs.artifacts.hello-art}}'
        name: hello-file
  - dag:
      tasks:
      - arguments:
          artifacts:
          - from: '{{inputs.artifacts.hello-file-input}}'
            name: message
        name: print-message-from-file
        template: print-message-from-file
    inputs:
      artifacts:
      - name: hello-file-input
    name: consume-artifact-dag
  - dag:
      tasks:
      - name: generate-artifact-dag
        template: generate-artifact-dag
      - arguments:
          artifacts:
          - from: '{{tasks.generate-artifact-dag.outputs.artifacts.hello-file}}'
            name: hello-file-input
        depends: generate-artifact-dag
        name: consume-artifact-dag
        template: consume-artifact-dag
    name: runner-dag
```
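
One way to read the generated YAML is to follow each `from` reference back to the artifact it names. The sketch below does that for the `runner-dag` wiring using only the standard library. The dictionaries are a hand-copied subset of the templates above, and `check_task_refs` is a hypothetical helper, not part of Hera or Argo:

```python
import re
from typing import Dict, List, Tuple

# Output artifact names declared by each template (copied from the YAML above).
TEMPLATE_OUTPUTS: Dict[str, List[str]] = {
    "hello-world-to-file": ["hello-art"],
    "generate-artifact-dag": ["hello-file"],
    "print-message-from-file": [],
    "consume-artifact-dag": [],
}

# Tasks of runner-dag: task name -> (template name, list of `from` references).
RUNNER_DAG_TASKS: Dict[str, Tuple[str, List[str]]] = {
    "generate-artifact-dag": ("generate-artifact-dag", []),
    "consume-artifact-dag": (
        "consume-artifact-dag",
        ["{{tasks.generate-artifact-dag.outputs.artifacts.hello-file}}"],
    ),
}

# Matches references of the form {{tasks.<task>.outputs.artifacts.<artifact>}}.
TASK_REF = re.compile(r"^\{\{tasks\.([\w-]+)\.outputs\.artifacts\.([\w-]+)\}\}$")


def check_task_refs(
    tasks: Dict[str, Tuple[str, List[str]]],
    template_outputs: Dict[str, List[str]],
) -> List[str]:
    """Return a list of problems found in task-to-task artifact references."""
    problems = []
    for task_name, (_, refs) in tasks.items():
        for ref in refs:
            match = TASK_REF.match(ref)
            if match is None:
                problems.append(f"{task_name}: unrecognised reference {ref}")
                continue
            source_task, artifact = match.groups()
            if source_task not in tasks:
                problems.append(f"{task_name}: unknown task {source_task}")
                continue
            # The referenced artifact must be an output of the source task's template.
            source_template = tasks[source_task][0]
            if artifact not in template_outputs.get(source_template, []):
                problems.append(f"{task_name}: {source_template} has no output {artifact}")
    return problems


print(check_task_refs(RUNNER_DAG_TASKS, TEMPLATE_OUTPUTS))
```

An empty list means every task-level reference points at an output that the source task's template actually declares, which is why `generate-artifact-dag` has to lift `hello-art` out as its own `hello-file` output.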

64 changes: 64 additions & 0 deletions examples/workflows/artifacts/artifacts-in-dags.yaml
@@ -0,0 +1,64 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: artifacts-in-dags-
spec:
  entrypoint: runner-dag
  templates:
  - container:
      args:
      - sleep 1; echo hello world | tee /tmp/hello_world.txt
      command:
      - sh
      - -c
      image: busybox
    name: hello-world-to-file
    outputs:
      artifacts:
      - name: hello-art
        path: /tmp/hello_world.txt
  - container:
      args:
      - cat /tmp/message
      command:
      - sh
      - -c
      image: alpine:latest
    inputs:
      artifacts:
      - name: message
        path: /tmp/message
    name: print-message-from-file
  - dag:
      tasks:
      - name: hello-world-to-file
        template: hello-world-to-file
    name: generate-artifact-dag
    outputs:
      artifacts:
      - from: '{{tasks.hello-world-to-file.outputs.artifacts.hello-art}}'
        name: hello-file
  - dag:
      tasks:
      - arguments:
          artifacts:
          - from: '{{inputs.artifacts.hello-file-input}}'
            name: message
        name: print-message-from-file
        template: print-message-from-file
    inputs:
      artifacts:
      - name: hello-file-input
    name: consume-artifact-dag
  - dag:
      tasks:
      - name: generate-artifact-dag
        template: generate-artifact-dag
      - arguments:
          artifacts:
          - from: '{{tasks.generate-artifact-dag.outputs.artifacts.hello-file}}'
            name: hello-file-input
        depends: generate-artifact-dag
        name: consume-artifact-dag
        template: consume-artifact-dag
    name: runner-dag
42 changes: 42 additions & 0 deletions examples/workflows/artifacts/artifacts_in_dags.py
@@ -0,0 +1,42 @@
"""This example shows how to use artifacts as inputs and outputs of DAGs."""

from hera.workflows import DAG, Artifact, Container, Workflow

with Workflow(
    generate_name="artifacts-in-dags-",
    entrypoint="runner-dag",
) as w:
    hello_world_to_file = Container(
        name="hello-world-to-file",
        image="busybox",
        command=["sh", "-c"],
        args=["sleep 1; echo hello world | tee /tmp/hello_world.txt"],
        outputs=[Artifact(name="hello-art", path="/tmp/hello_world.txt")],
    )
    print_message_from_file = Container(
        name="print-message-from-file",
        image="alpine:latest",
        command=["sh", "-c"],
        args=["cat /tmp/message"],
        inputs=[Artifact(name="message", path="/tmp/message")],
    )

    # First DAG generates an artifact from a task, and "lifts" it out as an output of the DAG template itself
    with DAG(
        name="generate-artifact-dag",
        outputs=[Artifact(name="hello-file", from_="{{tasks.hello-world-to-file.outputs.artifacts.hello-art}}")],
    ) as d1:
        hello_world_to_file()

    # Second DAG takes an artifact input, and the task references it using `get_artifact`
    with DAG(name="consume-artifact-dag", inputs=[Artifact(name="hello-file-input")]) as d2:
        print_message_from_file(
            arguments=d2.get_artifact("hello-file-input").with_name("message"),
        )

    # Third DAG orchestrates the first two, by creating tasks by "calling" the objects
    with DAG(name="runner-dag"):
        generator_dag = d1()
        consumer_dag = d2(arguments=generator_dag.get_artifact("hello-file").with_name("hello-file-input"))

        generator_dag >> consumer_dag
24 changes: 24 additions & 0 deletions src/hera/workflows/_mixins.py
@@ -256,6 +256,30 @@ def get_parameter(self, name: str) -> Parameter:
                 return param
         raise KeyError(f"Parameter {name} not found.")

+    def get_artifact(self, name: str) -> Artifact:
+        """Finds and returns the artifact with the supplied name.
+
+        Note that this method will raise an error if the artifact is not found.
+
+        Args:
+            name: name of the input artifact to find and return.
+
+        Returns:
+            Artifact: the artifact with the supplied name.
+
+        Raises:
+            KeyError: if the artifact is not found.
+        """
+        inputs = self._build_inputs()
+        if inputs is None:
+            raise KeyError(f"No inputs set. Artifact {name} not found.")
+        if inputs.artifacts is None:
+            raise KeyError(f"No artifacts set. Artifact {name} not found.")
+        for artifact in inputs.artifacts:
+            if artifact.name == name:
+                return Artifact(name=name, from_=f"{{{{inputs.artifacts.{artifact.name}}}}}")
+        raise KeyError(f"Artifact {name} not found.")
+
     def _build_inputs(self) -> Optional[ModelInputs]:
         """Processes the `inputs` field and returns a generated `ModelInputs`."""
         if self.inputs is None:
Expand Down
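
The quadruple braces in `get_artifact` are easy to misread. In an f-string, `{{` and `}}` each produce one literal brace, so `{{{{` yields the `{{` that Argo's template syntax expects, and the single-brace `{artifact.name}` in the middle is the only substituted field. A small standalone check, using a hypothetical helper named `input_artifact_ref` that is not part of Hera:

```python
def input_artifact_ref(name: str) -> str:
    """Build an Argo-style reference to an input artifact (hypothetical helper)."""
    # "{{{{" renders as "{{", "{name}" is substituted, and "}}}}" renders as "}}".
    return f"{{{{inputs.artifacts.{name}}}}}"


ref = input_artifact_ref("hello-file-input")
print(ref)
```

The result is the same string that appears as the `from` value of the `message` argument in the generated `consume-artifact-dag` template.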
25 changes: 25 additions & 0 deletions tests/submissions/roles.yaml
@@ -0,0 +1,25 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: executor
rules:
  - apiGroups:
      - argoproj.io
    resources:
      - workflowtaskresults
    verbs:
      - create
      - patch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: argo-executor-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: executor
subjects:
  - kind: ServiceAccount
    name: argo
    namespace: argo