Skip to content

Commit

Permalink
minor bugs and improving docs (#136)
Browse files Browse the repository at this point in the history
* fix: minor bugs with on faliure behaviours

* fix: minor bugs with on faliure behaviours

* fix: adding secrets to sdk
  • Loading branch information
vijayvammi authored Apr 17, 2024
1 parent 59b848b commit 4033b2b
Show file tree
Hide file tree
Showing 51 changed files with 1,772 additions and 524 deletions.
44 changes: 44 additions & 0 deletions examples/01-tasks/notebook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
"""
You can execute this pipeline by:
python examples/01-tasks/notebook.py
The notebook is executed in the same environment so any installed packages are available for the
notebook.
Upon successful execution, the output notebook with cell outputs is stored in the catalog.
For example, the catalog structure for this execution would be:
.catalog
└── meek-rosalind-0853
├── examples
│   └── common
│   └── simple_notebook_out.ipynb
└── notebook.execution.log
The notebook simple_notebook_out.ipynb has the captured stdout of "Hello World!".
"""

from runnable import NotebookTask, Pipeline


def main():
# Execute the notebook present in examples/common/simple_notebook.ipynb.
# The path is relative to the project root.
# If this step executes successfully, the pipeline will terminate with success
hello_task = NotebookTask(
name="hello",
notebook="examples/common/simple_notebook.ipynb",
terminate_with_success=True,
)

# The pipeline has only one step.
pipeline = Pipeline(steps=[hello_task])

pipeline.execute()

return pipeline


if __name__ == "__main__":
main()
35 changes: 35 additions & 0 deletions examples/01-tasks/notebook.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
dag:
description: |
This is a sample pipeline with one step that executes a notebook.
The notebook is executed in the same environment so any installed
packages are available for the notebook.
Upon successful execution, the output notebook with cell outputs
is stored in the catalog.
For example, the catalog structure for this execution would be:
.catalog
└── meek-rosalind-0853
├── examples
│   └── common
│   └── simple_notebook_out.ipynb
└── notebook.execution.log
The notebook simple_notebook_out.ipynb has the captured stdout of "Hello World!".
You can run this pipeline as:
runnable execute -f examples/01-tasks/notebook.yaml
start_at: notebook
steps:
notebook:
type: task
command_type: notebook
command: examples/common/simple_notebook.ipynb # The path is relative to the root of the project.
next: success
success:
type: success
fail:
type: fail
41 changes: 41 additions & 0 deletions examples/01-tasks/python_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""
You can execute this pipeline by:
python examples/01-tasks/python_tasks.py
The stdout of "Hello World!" would be captured as execution log and stored in the catalog.
An example of the catalog structure:
.catalog
└── baked-heyrovsky-0602
└── hello.execution.log
2 directories, 1 file
The hello.execution.log has the captured stdout of "Hello World!".
"""

from examples.common.functions import hello
from runnable import Pipeline, PythonTask


def main():
# Create a tasks which calls the function "hello"
# If this step executes successfully, the pipeline will terminate with success
hello_task = PythonTask(
name="hello",
function=hello,
terminate_with_success=True,
)

# The pipeline has only one step.
pipeline = Pipeline(steps=[hello_task])

pipeline.execute()

return pipeline


if __name__ == "__main__":
main()
27 changes: 27 additions & 0 deletions examples/01-tasks/python_tasks.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
dag:
description: |
You can run this pipeline by:
runnable execute -f examples/01-tasks/python_tasks.yaml
The stdout of "Hello World!" would be captured as
execution log and stored in the catalog.
An example of the catalog structure:
.catalog
└── baked-heyrovsky-0602
└── hello.execution.log
2 directories, 1 file
The hello.execution.log has the captured stdout of "Hello World!".
start_at: hello_task
steps:
hello_task:
type: task # The functional unit of the pipeline which does the work.
command: examples.common.functions.hello # dotted path to the function.
next: success # If this function succeeds, mark the pipeline as success
success:
type: success
fail:
type: fail
37 changes: 37 additions & 0 deletions examples/01-tasks/scripts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""
You can execute this pipeline by:
python examples/01-tasks/scripts.py
The command can be anything that can be executed in a shell.
The stdout/stderr of the execution is captured as execution log and stored in the catalog.
For example:
.catalog
└── seasoned-perlman-1355
└── hello.execution.log
"""

from runnable import Pipeline, ShellTask


def main():
# If this step executes successfully, the pipeline will terminate with success
hello_task = ShellTask(
name="hello",
command="echo 'Hello World!'",
terminate_with_success=True,
)

# The pipeline has only one step.
pipeline = Pipeline(steps=[hello_task])

pipeline.execute()

return pipeline


if __name__ == "__main__":
main()
27 changes: 27 additions & 0 deletions examples/01-tasks/scripts.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
dag:
description: |
This is a sample pipeline with one step that executes a shell command.
You can run this pipeline by:
runnable execute -f examples/01-tasks/scripts.yaml
The command can be anything that can be executed in a shell.
The stdout/stderr of the execution is captured as execution log and
stored in the catalog.
For example:
.catalog
└── seasoned-perlman-1355
└── hello.execution.log
start_at: notebook
steps:
notebook:
type: task
command_type: shell
command: echo "hello world!!" # The path is relative to the root of the project.
next: success
success:
type: success
fail:
type: fail
13 changes: 9 additions & 4 deletions examples/mocking.py → examples/01-tasks/stub.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,27 @@
to mock steps within mature pipelines.
You can run this pipeline by:
python examples/mocking.py
python examples/01-tasks/stub.py
"""

from runnable import Pipeline, Stub


def main():
step1 = Stub(name="step1") # (1)
# this will always succeed
step1 = Stub(name="step1")

# It takes arbitrary arguments
# Useful for temporarily silencing steps within mature pipelines
step2 = Stub(name="step2", what="is this thing")

step3 = Stub(name="step3", terminate_with_success=True) # (3)
step3 = Stub(name="step3", terminate_with_success=True)

pipeline = Pipeline(steps=[step1, step2, step3], add_terminal_nodes=True) # (4)
pipeline = Pipeline(steps=[step1, step2, step3], add_terminal_nodes=True)

pipeline.execute()

# A function that creates pipeline should always return a Pipeline object
return pipeline


Expand Down
6 changes: 3 additions & 3 deletions examples/mocking.yaml → examples/01-tasks/stub.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ dag:
to mock steps within mature pipelines.
You can run this pipeline by:
runnable execute -f examples/mocking.yaml
runnable execute -f examples/01-tasks/stub.yaml
start_at: step 1
steps:
step 1:
type: stub
type: stub # This will always succeed
next: step 2
step 2:
type: stub
what: is this thing?
what: is this thing? # It takes arbitrary keys
It: does not matter!!
next: step 3
step 3:
Expand Down
33 changes: 33 additions & 0 deletions examples/02-sequential/default_fail.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""
When defining a Pipeline(), it automatically adds a success node and failure node.
By default any failure in a step is considered to be a failure in the pipeline.
In the below example, the progression would be as follows:
step 1 >> step 2 >> fail
You can run this example by: python examples/02-sequential/default_fail.py
"""

from examples.common.functions import raise_ex
from runnable import Pipeline, PythonTask, Stub


def main():
step1 = Stub(name="step 1")

step2 = PythonTask(name="step 2", function=raise_ex) # This will fail

step3 = Stub(name="step 3", terminate_with_success=True) # This step will not be executed

pipeline = Pipeline(steps=[step1, step2, step3])

pipeline.execute()

return pipeline


if __name__ == "__main__":
main()
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,19 @@ dag:
The default behavior is to traverse to step type fail and mark the run as failed.
You can control the flow by using on_failure, please check example/on-failure.yaml
You can run this pipeline by runnable execute -f examples/default-fail.yaml
You can run this pipeline by: runnable execute -f examples/02-sequential/default_fail.yaml
start_at: step 1
steps:
step 1:
type: stub
next: step 2
step 2:
type: task
command_type: shell
command: exit 1 # This will fail
command_type: python
command: examples.common.functions.raise_ex # This will fail
next: step 3
step 3:
type: stub
type: stub # This will never execute
next: success
success:
type: success
Expand Down
41 changes: 41 additions & 0 deletions examples/02-sequential/on_failure_fail.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""
This pipeline showcases handling failures in a pipeline.
The path taken if none of the steps failed:
step_1 -> step_2 -> step_3 -> success
step_1 is a python function that raises an exception.
And we can instruct the pipeline to execute step_4 if step_1 fails
and then eventually fail.
step_1 -> step_4 -> fail
This pattern is handy when you need to do something before eventually
failing (eg: sending a notification, updating status, etc...)
Run this pipeline as: python examples/02-sequential/on_failure_fail.py
"""

from examples.common.functions import raise_ex
from runnable import Pipeline, PythonTask, Stub


def main():
step_1 = PythonTask(name="step 1", function=raise_ex) # This will fail

step_2 = Stub(name="step 2")

step_3 = Stub(name="step 3", terminate_with_success=True)
step_4 = Stub(name="step 4", terminate_with_failure=True)

step_1.on_failure = step_4.name

pipeline = Pipeline(
steps=[step_1, step_2, step_3, [step_4]],
)
pipeline.execute()

return pipeline


if __name__ == "__main__":
main()
Loading

0 comments on commit 4033b2b

Please sign in to comment.