diff --git a/examples/01-tasks/notebook.py b/examples/01-tasks/notebook.py new file mode 100644 index 00000000..68447032 --- /dev/null +++ b/examples/01-tasks/notebook.py @@ -0,0 +1,44 @@ +""" +You can execute this pipeline by: + + python examples/01-tasks/notebook.py + +The notebook is executed in the same environment so any installed packages are available for the +notebook. + +Upon successful execution, the output notebook with cell outputs is stored in the catalog. +For example, the catalog structure for this execution would be: + +.catalog +└── meek-rosalind-0853 + ├── examples + │   └── common + │   └── simple_notebook_out.ipynb + └── notebook.execution.log + +The notebook simple_notebook_out.ipynb has the captured stdout of "Hello World!". +""" + +from runnable import NotebookTask, Pipeline + + +def main(): + # Execute the notebook present in examples/common/simple_notebook.ipynb. + # The path is relative to the project root. + # If this step executes successfully, the pipeline will terminate with success + hello_task = NotebookTask( + name="hello", + notebook="examples/common/simple_notebook.ipynb", + terminate_with_success=True, + ) + + # The pipeline has only one step. + pipeline = Pipeline(steps=[hello_task]) + + pipeline.execute() + + return pipeline + + +if __name__ == "__main__": + main() diff --git a/examples/01-tasks/notebook.yaml b/examples/01-tasks/notebook.yaml new file mode 100644 index 00000000..db79591a --- /dev/null +++ b/examples/01-tasks/notebook.yaml @@ -0,0 +1,35 @@ +dag: + description: | + This is a sample pipeline with one step that executes a notebook. + + The notebook is executed in the same environment so any installed + packages are available for the notebook. + + Upon successful execution, the output notebook with cell outputs + is stored in the catalog. 
+ + For example, the catalog structure for this execution would be: + + .catalog + └── meek-rosalind-0853 + ├── examples + │   └── common + │   └── simple_notebook_out.ipynb + └── notebook.execution.log + + The notebook simple_notebook_out.ipynb has the captured stdout of "Hello World!". + + You can run this pipeline as: + runnable execute -f examples/01-tasks/notebook.yaml + + start_at: notebook + steps: + notebook: + type: task + command_type: notebook + command: examples/common/simple_notebook.ipynb # The path is relative to the root of the project. + next: success + success: + type: success + fail: + type: fail diff --git a/examples/01-tasks/python_tasks.py b/examples/01-tasks/python_tasks.py new file mode 100644 index 00000000..5a493c5a --- /dev/null +++ b/examples/01-tasks/python_tasks.py @@ -0,0 +1,41 @@ +""" +You can execute this pipeline by: + + python examples/01-tasks/python_tasks.py + +The stdout of "Hello World!" would be captured as execution log and stored in the catalog. +An example of the catalog structure: + +.catalog +└── baked-heyrovsky-0602 + └── hello.execution.log + +2 directories, 1 file + + +The hello.execution.log has the captured stdout of "Hello World!". +""" + +from examples.common.functions import hello +from runnable import Pipeline, PythonTask + + +def main(): + # Create a tasks which calls the function "hello" + # If this step executes successfully, the pipeline will terminate with success + hello_task = PythonTask( + name="hello", + function=hello, + terminate_with_success=True, + ) + + # The pipeline has only one step. 
+ pipeline = Pipeline(steps=[hello_task]) + + pipeline.execute() + + return pipeline + + +if __name__ == "__main__": + main() diff --git a/examples/01-tasks/python_tasks.yaml b/examples/01-tasks/python_tasks.yaml new file mode 100644 index 00000000..1a86719c --- /dev/null +++ b/examples/01-tasks/python_tasks.yaml @@ -0,0 +1,27 @@ +dag: + description: | + You can run this pipeline by: + runnable execute -f examples/01-tasks/python_tasks.yaml + + The stdout of "Hello World!" would be captured as + execution log and stored in the catalog. + + An example of the catalog structure: + + .catalog + └── baked-heyrovsky-0602 + └── hello.execution.log + + 2 directories, 1 file + + The hello.execution.log has the captured stdout of "Hello World!". + start_at: hello_task + steps: + hello_task: + type: task # The functional unit of the pipeline which does the work. + command: examples.common.functions.hello # dotted path to the function. + next: success # If this function succeeds, mark the pipeline as success + success: + type: success + fail: + type: fail diff --git a/examples/01-tasks/scripts.py b/examples/01-tasks/scripts.py new file mode 100644 index 00000000..528a0fc4 --- /dev/null +++ b/examples/01-tasks/scripts.py @@ -0,0 +1,37 @@ +""" +You can execute this pipeline by: + + python examples/01-tasks/scripts.py + +The command can be anything that can be executed in a shell. +The stdout/stderr of the execution is captured as execution log and stored in the catalog. + +For example: + +.catalog +└── seasoned-perlman-1355 + └── hello.execution.log + +""" + +from runnable import Pipeline, ShellTask + + +def main(): + # If this step executes successfully, the pipeline will terminate with success + hello_task = ShellTask( + name="hello", + command="echo 'Hello World!'", + terminate_with_success=True, + ) + + # The pipeline has only one step. 
+ pipeline = Pipeline(steps=[hello_task]) + + pipeline.execute() + + return pipeline + + +if __name__ == "__main__": + main() diff --git a/examples/01-tasks/scripts.yaml b/examples/01-tasks/scripts.yaml new file mode 100644 index 00000000..ee1687e3 --- /dev/null +++ b/examples/01-tasks/scripts.yaml @@ -0,0 +1,27 @@ +dag: + description: | + This is a sample pipeline with one step that executes a shell command. + + You can run this pipeline by: + runnable execute -f examples/01-tasks/scripts.yaml + + The command can be anything that can be executed in a shell. + The stdout/stderr of the execution is captured as execution log and + stored in the catalog. + + For example: + .catalog + └── seasoned-perlman-1355 + └── hello.execution.log + + start_at: hello + steps: + hello: + type: task + command_type: shell + command: echo "hello world!!" # Any command that can be executed in a shell. + next: success + success: + type: success + fail: + type: fail diff --git a/examples/mocking.py b/examples/01-tasks/stub.py similarity index 65% rename from examples/mocking.py rename to examples/01-tasks/stub.py index a7ae7d11..ea3ac68c 100644 --- a/examples/mocking.py +++ b/examples/01-tasks/stub.py @@ -10,22 +10,27 @@ to mock steps within mature pipelines.
You can run this pipeline by: - python examples/mocking.py + python examples/01-tasks/stub.py """ from runnable import Pipeline, Stub def main(): - step1 = Stub(name="step1") # (1) + # this will always succeed + step1 = Stub(name="step1") + + # It takes arbitrary arguments + # Useful for temporarily silencing steps within mature pipelines step2 = Stub(name="step2", what="is this thing") - step3 = Stub(name="step3", terminate_with_success=True) # (3) + step3 = Stub(name="step3", terminate_with_success=True) - pipeline = Pipeline(steps=[step1, step2, step3], add_terminal_nodes=True) # (4) + pipeline = Pipeline(steps=[step1, step2, step3], add_terminal_nodes=True) pipeline.execute() + # A function that creates pipeline should always return a Pipeline object return pipeline diff --git a/examples/mocking.yaml b/examples/01-tasks/stub.yaml similarity index 81% rename from examples/mocking.yaml rename to examples/01-tasks/stub.yaml index d4921f2d..788828fe 100644 --- a/examples/mocking.yaml +++ b/examples/01-tasks/stub.yaml @@ -11,15 +11,15 @@ dag: to mock steps within mature pipelines. You can run this pipeline by: - runnable execute -f examples/mocking.yaml + runnable execute -f examples/01-tasks/stub.yaml start_at: step 1 steps: step 1: - type: stub + type: stub # This will always succeed next: step 2 step 2: type: stub - what: is this thing? + what: is this thing? # It takes arbitrary keys It: does not matter!! next: step 3 step 3: diff --git a/examples/02-sequential/default_fail.py b/examples/02-sequential/default_fail.py new file mode 100644 index 00000000..92a4e578 --- /dev/null +++ b/examples/02-sequential/default_fail.py @@ -0,0 +1,33 @@ +""" +When defining a Pipeline(), it automatically adds a success node and failure node. + +By default any failure in a step is considered to be a failure in the pipeline. 
+ +In the below example, the progression would be as follows: + + step 1 >> step 2 >> fail + + +You can run this example by: python examples/02-sequential/default_fail.py +""" + +from examples.common.functions import raise_ex +from runnable import Pipeline, PythonTask, Stub + + +def main(): + step1 = Stub(name="step 1") + + step2 = PythonTask(name="step 2", function=raise_ex) # This will fail + + step3 = Stub(name="step 3", terminate_with_success=True) # This step will not be executed + + pipeline = Pipeline(steps=[step1, step2, step3]) + + pipeline.execute() + + return pipeline + + +if __name__ == "__main__": + main() diff --git a/examples/default-fail.yaml b/examples/02-sequential/default_fail.yaml similarity index 63% rename from examples/default-fail.yaml rename to examples/02-sequential/default_fail.yaml index 7dde5464..f8e423f9 100644 --- a/examples/default-fail.yaml +++ b/examples/02-sequential/default_fail.yaml @@ -6,9 +6,7 @@ dag: The default behavior is to traverse to step type fail and mark the run as failed. - You can control the flow by using on_failure, please check example/on-failure.yaml - - You can run this pipeline by runnable execute -f examples/default-fail.yaml + You can run this pipeline by: runnable execute -f examples/02-sequential/default_fail.yaml start_at: step 1 steps: step 1: @@ -16,11 +14,11 @@ dag: next: step 2 step 2: type: task - command_type: shell - command: exit 1 # This will fail + command_type: python + command: examples.common.functions.raise_ex # This will fail next: step 3 step 3: - type: stub + type: stub # This will never execute next: success success: type: success diff --git a/examples/02-sequential/on_failure_fail.py b/examples/02-sequential/on_failure_fail.py new file mode 100644 index 00000000..cb4fa6cd --- /dev/null +++ b/examples/02-sequential/on_failure_fail.py @@ -0,0 +1,41 @@ +""" +This pipeline showcases handling failures in a pipeline. 
+ +The path taken if none of the steps failed: +step_1 -> step_2 -> step_3 -> success + +step_1 is a python function that raises an exception. +And we can instruct the pipeline to execute step_4 if step_1 fails +and then eventually fail. +step_1 -> step_4 -> fail + +This pattern is handy when you need to do something before eventually +failing (eg: sending a notification, updating status, etc...) + +Run this pipeline as: python examples/02-sequential/on_failure_fail.py +""" + +from examples.common.functions import raise_ex +from runnable import Pipeline, PythonTask, Stub + + +def main(): + step_1 = PythonTask(name="step 1", function=raise_ex) # This will fail + + step_2 = Stub(name="step 2") + + step_3 = Stub(name="step 3", terminate_with_success=True) + step_4 = Stub(name="step 4", terminate_with_failure=True) + + step_1.on_failure = step_4.name + + pipeline = Pipeline( + steps=[step_1, step_2, step_3, [step_4]], + ) + pipeline.execute() + + return pipeline + + +if __name__ == "__main__": + main() diff --git a/examples/02-sequential/on_failure_fail.yaml b/examples/02-sequential/on_failure_fail.yaml new file mode 100644 index 00000000..6e94242a --- /dev/null +++ b/examples/02-sequential/on_failure_fail.yaml @@ -0,0 +1,35 @@ +dag: + description: | + This pipeline showcases handling failures in a pipeline. + + The path taken if none of the steps failed: + step_1 -> step_2 -> step_3 -> success + + step_1 is a python function that raises an exception. + And we can instruct the pipeline to execute step_4 if step_1 fails + and then eventually fail. + step_1 -> step_4 -> fail + + This pattern is handy when you need to do something before eventually + failing (eg: sending a notification, updating status, etc...) + start_at: step_1 + steps: + step_1: + type: task + command_type: shell + command: exit 1 # This will fail! 
+ next: step_2 + on_failure: step_4 + step_2: + type: stub + next: step_3 + step_3: + type: stub + next: success + step_4: + type: stub + next: fail + success: + type: success + fail: + type: fail diff --git a/examples/02-sequential/on_failure_succeed.py b/examples/02-sequential/on_failure_succeed.py new file mode 100644 index 00000000..6015bd01 --- /dev/null +++ b/examples/02-sequential/on_failure_succeed.py @@ -0,0 +1,41 @@ +""" +This pipeline showcases handling failures in a pipeline. + +The path taken if none of the steps failed: +step_1 -> step_2 -> step_3 -> success + +step_1 is a python function that raises an exception. +And we can instruct the pipeline to execute step_4 if step_1 fails +and then eventually succeed too. +step_1 -> step_4 -> success + +This pattern is handy when you are expecting a failure of a step +and have ways to handle it. + +Run this pipeline: python examples/02-sequential/on_failure_succeed.py +""" + +from examples.common.functions import raise_ex +from runnable import Pipeline, PythonTask, Stub + + +def main(): + step_1 = PythonTask(name="step 1", function=raise_ex) # This will fail + + step_2 = Stub(name="step 2") + + step_3 = Stub(name="step 3", terminate_with_success=True) + step_4 = Stub(name="step 4", terminate_with_success=True) + + step_1.on_failure = step_4.name + + pipeline = Pipeline( + steps=[step_1, step_2, step_3, [step_4]], + ) + pipeline.execute() + + return pipeline + + +if __name__ == "__main__": + main() diff --git a/examples/02-sequential/on_failure_succeed.yaml b/examples/02-sequential/on_failure_succeed.yaml new file mode 100644 index 00000000..3977e175 --- /dev/null +++ b/examples/02-sequential/on_failure_succeed.yaml @@ -0,0 +1,38 @@ +dag: + description: | + This pipeline showcases handling failures in a pipeline. + + The path taken if none of the steps failed: + step_1 -> step_2 -> step_3 -> success + + step_1 is a python function that raises an exception. 
+ And we can instruct the pipeline to execute step_4 if step_1 fails + and then eventually succeed. + step_1 -> step_4 -> success + + This pattern is handy when you are expecting a failure of a step + and have ways to handle it. + + Run this pipeline as: + runnable execute -f examples/02-sequential/on_failure_succeed.yaml + start_at: step_1 + steps: + step_1: + type: task + command_type: shell + command: exit 1 # This will fail! + next: step_2 + on_failure: step_4 + step_2: + type: stub + next: step_3 + step_3: + type: stub + next: success + step_4: + type: stub + next: success + success: + type: success + fail: + type: fail diff --git a/examples/02-sequential/traversal.py b/examples/02-sequential/traversal.py new file mode 100644 index 00000000..bcb35254 --- /dev/null +++ b/examples/02-sequential/traversal.py @@ -0,0 +1,58 @@ +""" +You can execute this pipeline by: + + python examples/02-sequential/traversal.py + + A pipeline can have any "tasks" as part of it. In the + below example, we have a mix of stub, python, shell and notebook tasks. + + As with simpler tasks, the stdout and stderr of each task are captured + and stored in the catalog. + + .catalog + └── cold-jennings-1534 + ├── examples + │   └── common + │   └── simple_notebook_out.ipynb + ├── hello_notebook.execution.log + ├── hello_python.execution.log + └── hello_shell.execution.log + + 4 directories, 4 files + +""" + +from examples.common.functions import hello +from runnable import NotebookTask, Pipeline, PythonTask, ShellTask, Stub + + +def main(): + stub_task = Stub(name="hello stub") + + python_task = PythonTask( + name="hello python", + function=hello, + ) + + shell_task = ShellTask( + name="hello shell", + command="echo 'Hello World!'", + ) + + notebook_task = NotebookTask( + name="hello notebook", + notebook="examples/common/simple_notebook.ipynb", + terminate_with_success=True, + ) + + # The pipeline has a mix of tasks. + # The order of execution follows the order of the tasks in the list.
+ pipeline = Pipeline(steps=[stub_task, python_task, shell_task, notebook_task]) + + pipeline.execute() + + return pipeline + + +if __name__ == "__main__": + main() diff --git a/examples/02-sequential/traversal.yaml b/examples/02-sequential/traversal.yaml new file mode 100644 index 00000000..88ed3632 --- /dev/null +++ b/examples/02-sequential/traversal.yaml @@ -0,0 +1,50 @@ +dag: + description: | + A pipeline can have any "tasks" as part of it. In the + below example, we have a mix of stub, python, shell and notebook tasks. + + As with simpler tasks, the stdout and stderr of each task are captured + and stored in the catalog. + + For example, the catalog structure for this execution would be: + + .catalog + └── cold-jennings-1534 + ├── examples + │   └── common + │   └── simple_notebook_out.ipynb + ├── hello_notebook.execution.log + ├── hello_python.execution.log + └── hello_shell.execution.log + + 4 directories, 4 files + + The notebook simple_notebook_out.ipynb has the captured stdout of "Hello World!". + + You can run this pipeline as: + runnable execute -f examples/02-sequential/traversal.yaml + + start_at: hello stub + steps: + hello stub: + type: stub + next: hello python + hello python: + type: task + command_type: python + command: examples.common.functions.hello # dotted path to the function. + next: hello shell + hello shell: + type: task + command_type: shell + command: echo "Hello World!" # Command to run + next: hello notebook + hello notebook: + type: task + command_type: notebook + command: examples/common/simple_notebook.ipynb # The path is relative to the root of the project. 
+ next: success + success: + type: success + fail: + type: fail diff --git a/examples/03-parameters/passing_parameters_notebook.py b/examples/03-parameters/passing_parameters_notebook.py new file mode 100644 index 00000000..ecb00783 --- /dev/null +++ b/examples/03-parameters/passing_parameters_notebook.py @@ -0,0 +1,40 @@ +from examples.common.functions import read_parameter +from runnable import NotebookTask, Pipeline, PythonTask, metric, pickled + + +def main(): + write_parameters_from_notebook = NotebookTask( + notebook="examples/common/write_parameters.ipynb", + returns=[ + pickled("df"), + "integer", + "floater", + "stringer", + "pydantic_param", + metric("score"), + ], + name="set_parameter", + ) + + read_parameters = PythonTask( + function=read_parameter, + name="get_parameters", + ) + + read_parameters_in_notebook = NotebookTask( + notebook="examples/common/read_parameters.ipynb", + terminate_with_success=True, + name="read_parameters_in_notebook", + ) + + pipeline = Pipeline( + steps=[write_parameters_from_notebook, read_parameters, read_parameters_in_notebook], + ) + + _ = pipeline.execute() + + return pipeline + + +if __name__ == "__main__": + main() diff --git a/examples/03-parameters/passing_parameters_notebook.yaml b/examples/03-parameters/passing_parameters_notebook.yaml new file mode 100644 index 00000000..e69de29b diff --git a/examples/03-parameters/passing_parameters_python.py b/examples/03-parameters/passing_parameters_python.py new file mode 100644 index 00000000..baf9c1e5 --- /dev/null +++ b/examples/03-parameters/passing_parameters_python.py @@ -0,0 +1,52 @@ +""" +The below example shows how to set/get parameters in python +tasks of the pipeline. + +The function, set_parameter, returns + - simple python data types (int, float, str) + - pydantic models + - pandas dataframe, any "object" type + +pydantic models are implicitly handled by runnable +but "object" types should be marked as "pickled". 
+ +Use pickled even for python data types is advised for +reasonably large collections. + +""" + +from examples.common.functions import read_parameter, write_parameter +from runnable import Pipeline, PythonTask, metric, pickled + + +def main(): + write_parameters = PythonTask( + function=write_parameter, + returns=[ + pickled("df"), + "integer", + "floater", + "stringer", + "pydantic_param", + metric("score"), + ], + name="set_parameter", + ) + + read_parameters = PythonTask( + function=read_parameter, + terminate_with_success=True, + name="get_parameters", + ) + + pipeline = Pipeline( + steps=[write_parameters, read_parameters], + ) + + _ = pipeline.execute() + + return pipeline + + +if __name__ == "__main__": + main() diff --git a/examples/03-parameters/passing_parameters_python.yaml b/examples/03-parameters/passing_parameters_python.yaml new file mode 100644 index 00000000..e69de29b diff --git a/examples/03-parameters/passing_parameters_shell.py b/examples/03-parameters/passing_parameters_shell.py new file mode 100644 index 00000000..3ef14bc2 --- /dev/null +++ b/examples/03-parameters/passing_parameters_shell.py @@ -0,0 +1,41 @@ +from examples.common.functions import read_unpickled_parameter +from runnable import Pipeline, PythonTask, ShellTask, metric + + +def main(): + export_env_command = """ + export integer=1 + export floater=3.14 + export stringer="hello" + export pydantic_param='{"x": 10, "foo": "bar"}' + export score=0.9 + """ + write_parameters_in_shell = ShellTask( + command=export_env_command, + returns=[ + "integer", + "floater", + "stringer", + "pydantic_param", + metric("score"), + ], + name="write_parameter", + ) + + read_parameters = PythonTask( + function=read_unpickled_parameter, + name="read_parameters", + terminate_with_success=True, + ) + + pipeline = Pipeline( + steps=[write_parameters_in_shell, read_parameters], + ) + + _ = pipeline.execute() + + return pipeline + + +if __name__ == "__main__": + main() diff --git 
a/examples/03-parameters/passing_parameters_shell.yaml b/examples/03-parameters/passing_parameters_shell.yaml new file mode 100644 index 00000000..e69de29b diff --git a/examples/03-parameters/static_parameters_non_python.py b/examples/03-parameters/static_parameters_non_python.py new file mode 100644 index 00000000..41eae659 --- /dev/null +++ b/examples/03-parameters/static_parameters_non_python.py @@ -0,0 +1,55 @@ +""" +The below example showcases setting up known initial parameters for a pipeline +of notebook and shell based commands. + +The initial parameters as defined in the yaml file are: + integer: 1 + floater : 3.14 + stringer : hello + pydantic_param: + x: 10 + foo: bar + +runnable exposes the nested parameters as dictionary for notebook based tasks +as a json string for the shell based tasks. + +""" + +from runnable import NotebookTask, Pipeline, ShellTask + + +def main(): + read_params_in_notebook = NotebookTask( + name="read_params_in_notebook", + notebook="examples/common/read_parameters.ipynb", + ) + + shell_command = """ + if [ "$integer" = 1 ] \ + && [ "$floater" = 3.14 ] \ + && [ "$stringer" = "hello" ] \ + && [ "$pydantic_param" = '{"x": 10, "foo": "bar"}' ]; then + echo "yaay" + exit 0; + else + echo "naay" + exit 1; + fi + """ + read_params_in_shell = ShellTask( + name="read_params_in_shell", + command=shell_command, + terminate_with_success=True, + ) + + pipeline = Pipeline( + steps=[read_params_in_notebook, read_params_in_shell], + ) + + _ = pipeline.execute(parameters_file="examples/common/initial_parameters.yaml") + + return pipeline + + +if __name__ == "__main__": + main() diff --git a/examples/03-parameters/static_parameters_non_python.yaml b/examples/03-parameters/static_parameters_non_python.yaml new file mode 100644 index 00000000..ba581fa8 --- /dev/null +++ b/examples/03-parameters/static_parameters_non_python.yaml @@ -0,0 +1,41 @@ +dag: + description: | + The below example showcases setting up known initial parameters for a pipeline + 
of notebook and shell based commands. + + The initial parameters as defined in the yaml file are: + integer: 1 + floater : 3.14 + stringer : hello + pydantic_param: + x: 10 + foo: bar + + runnable exposes the nested parameters as dictionary for notebook based tasks + and as a json string for the shell based tasks. + start_at: read_params_in_notebook + steps: + read_params_in_notebook: + type: task + command_type: notebook + command: examples/common/read_parameters.ipynb + next: read_params_in_shell + read_params_in_shell: + type: task + command_type: shell + command: | + if [ "$integer" = 1 ] \ + && [ "$floater" = 3.14 ] \ + && [ "$stringer" = "hello" ] \ + && [ "$pydantic_param" = '{"x": 10, "foo": "bar"}' ]; then + echo "yaay" + exit 0; + else + echo "naay" + exit 1; + fi + next: success + success: + type: success + fail: + type: fail diff --git a/examples/03-parameters/static_parameters_python.py b/examples/03-parameters/static_parameters_python.py new file mode 100644 index 00000000..7c82ea3a --- /dev/null +++ b/examples/03-parameters/static_parameters_python.py @@ -0,0 +1,49 @@ +""" +The below example showcases setting up known initial parameters for a pipeline +of only python tasks + +The initial parameters as defined in the yaml file are: + integer: 1, floater: 3.14, stringer: hello + pydantic_param: + x: 10 + foo: bar + +runnable allows using pydantic models for deeply nested parameters and +casts appropriately based on annotation. eg: read_initial_params_as_pydantic + +If no annotation is provided, the parameter is assumed to be a dictionary.
+eg: read_initial_params_as_json + +""" + +from examples.common.functions import ( + read_initial_params_as_json, + read_initial_params_as_pydantic, +) +from runnable import Pipeline, PythonTask + + +def main(): + read_params_as_pydantic = PythonTask( + function=read_initial_params_as_pydantic, + name="read_params_as_pydantic", + ) + + read_params_as_json = PythonTask( + function=read_initial_params_as_json, + terminate_with_success=True, + name="read_params_json", + ) + + pipeline = Pipeline( + steps=[read_params_as_pydantic, read_params_as_json], + add_terminal_nodes=True, + ) + + _ = pipeline.execute(parameters_file="examples/common/initial_parameters.yaml") + + return pipeline + + +if __name__ == "__main__": + main() diff --git a/examples/03-parameters/static_parameters_python.yaml b/examples/03-parameters/static_parameters_python.yaml new file mode 100644 index 00000000..ea0b8b7a --- /dev/null +++ b/examples/03-parameters/static_parameters_python.yaml @@ -0,0 +1,30 @@ +dag: + description: | + The below example showcases setting up known initial parameters for a pipeline + of only python tasks + + The initial parameters as defined in the yaml file are: + integer: 1, floater: 3.14, stringer: hello + pydantic_param: + x: 10 + foo: bar + + runnable allows using pydantic models for deeply nested parameters and + casts appropriately based on annotation. eg: read_initial_params_as_pydantic + + If no annotation is provided, the parameter is assumed to be a dictionary.
+ eg: read_initial_params_as_json + start_at: read_params_as_pydantic + steps: + read_params_as_pydantic: + type: task + command: examples.common.functions.read_initial_params_as_pydantic + next: read_params_json + read_params_json: + type: task + command: examples.common.functions.read_initial_params_as_json + next: success + success: + type: success + fail: + type: fail diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 00000000..a0eb415b --- /dev/null +++ b/examples/README.md @@ -0,0 +1,52 @@ +Examples in this section are ordered from simple to advanced. +All examples have both python SDK and yaml representations. + +Please use this as an index to find specific example. + + +- common: Has python functions/notebooks/scripts that are used across the examples + +- 01-tasks: Examples of the tasks that can be part of the pipeline. + + - [stub.py](./01-tasks/stub.py), [stub.yaml](./01-tasks/stub.yaml): demonstrates the concept of a stub. + + - [python_tasks.py](./01-tasks/python_tasks.py), [python_tasks.yaml](./01-tasks/python_tasks.yaml): uses python functions as tasks. + The stdout/stderr of all the tasks are captured and stored in the catalog. + - [notebook.py](./01-tasks/notebook.py), [notebook.yaml](./01-tasks/notebook.yaml): uses notebooks as tasks + The executed notebook is captured in the catalog. + - [scripts.py](./01-tasks/scripts.py), [scripts.yaml](./01-tasks/scripts.yaml): uses shell scripts as tasks + The stdout/stderr of all scripts are captured and stored in the catalog. + + +The above examples showcase executable units of the pipeline. +The next section has examples on stitching these tasks together for complex operations. + +- 02-sequential: Examples of stitching tasks together including behavior in case of failures. + + - [traversal.py](./02-sequential/traversal.py), [traversal.yaml](./02-sequential/traversal.yaml): A pipeline which is a mixed bag of notebooks, python functions and + shell scripts. 
+ - [default_fail.py](./02-sequential/default_fail.py), [default_fail.yaml](./02-sequential/default_fail.yaml): The default failure behavior. + - [on_failure_fail.py](./02-sequential/on_failure_fail.py), [on_failure_fail.yaml](./02-sequential/on_failure_fail.yaml): On failure of a step, do some action and fail + - [on_failure_succeed.py](./02-sequential/on_failure_succeed.py), [on_failure_succeed.yaml](./02-sequential/on_failure_succeed.yaml): On failure of a step, take a different route + + +The above examples show stitching complex operations of the pipeline. +The next section has examples on communicating between tasks during execution. + +- 03-parameters: Examples of passing parameters between tasks of a pipeline. + + Guidelines: + + - python functions can get/set simple python data types, pydantic models, objects marked as pickled. Some of the + simple data types can also be marked as a metric. + - + + + - [static_parameters_python.py](./03-parameters/static_parameters_python.py), [static_parameters_python.yaml](./03-parameters/static_parameters_python.yaml): A pipeline to show the access of static or known parameters by python tasks. + + - [static_parameters_non_python.py](./03-parameters/static_parameters_non_python.py), [static_parameters_non_python.yaml](./03-parameters/static_parameters_non_python.yaml): A pipeline to show the access of static or known parameters by notebook and shell tasks. + + - [passing_parameters_python.py](./03-parameters/passing_parameters_python.py), [passing_parameters_python.yaml](./03-parameters/passing_parameters_python.yaml): shows the mechanism of passing parameters (simple python datatypes, "dillable" objects, pydantic models) and registering metrics between python tasks.
+ + - [passing_parameters_notebook.py](./03-parameters/passing_parameters_notebook.py), [passing_parameters_notebook.yaml](./03-parameters/passing_parameters_notebook.yaml): shows the mechanism of passing parameters (simple python datatypes, "dillable" objects, pydantic models) and registering metrics between tasks. runnable can "get" object + parameters from notebooks but cannot inject them into notebooks. diff --git a/examples/common/functions.py b/examples/common/functions.py new file mode 100644 index 00000000..36eb3d9a --- /dev/null +++ b/examples/common/functions.py @@ -0,0 +1,89 @@ +from typing import Dict, Union + +import pandas as pd +from pydantic import BaseModel + + +def hello(): + "The most basic function" + print("Hello World!") + + +def raise_ex(): + "A function that raises an exception" + raise Exception("This is an exception") + + +class ComplexParams(BaseModel): + x: int + foo: str + + +def read_initial_params_as_pydantic( + integer: int, + floater: float, + stringer: str, + pydantic_param: ComplexParams, +): + assert integer == 1 + assert floater == 3.14 + assert stringer == "hello" + assert pydantic_param.x == 10 + assert pydantic_param.foo == "bar" + + +def read_initial_params_as_json( + integer: int, + floater: float, + stringer: str, + pydantic_param: Dict[str, Union[int, str]], +): + assert integer == 1 + assert floater == 3.14 + assert stringer == "hello" + assert pydantic_param["x"] == 10 + assert pydantic_param["foo"] == "bar" + + +def write_parameter(): + integer = 1 + floater = 3.14 + c = ComplexParams(x=10, foo="bar") + data = {"calories": [420, 380, 390], "duration": [50, 40, 45]} + + df = pd.DataFrame(data) + score = 0.9 + + return df, integer, floater, "hello", c, score + + +def read_parameter( + df: pd.DataFrame, + integer: int, + floater: float, + stringer: str, + pydantic_param: ComplexParams, + score: float, +): + assert integer == 1 + assert floater == 3.14 + assert stringer == "hello" + assert pydantic_param.x == 10 + assert 
pydantic_param.foo == "bar" + assert df.shape == (3, 2) + assert score == 0.9 + + +def read_unpickled_parameter( + integer: int, + floater: float, + stringer: str, + pydantic_param: ComplexParams, + score: float, +): + assert integer == 1 + assert floater == 3.14 + assert stringer == "hello" + assert pydantic_param.x == 10 + assert pydantic_param.foo == "bar" + assert score == 0.9 diff --git a/examples/common/initial_parameters.yaml b/examples/common/initial_parameters.yaml new file mode 100644 index 00000000..eb987ed4 --- /dev/null +++ b/examples/common/initial_parameters.yaml @@ -0,0 +1,6 @@ +integer: 1 +floater : 3.14 +stringer : hello +pydantic_param: + x: 10 + foo: bar diff --git a/examples/common/read_parameters.ipynb b/examples/common/read_parameters.ipynb new file mode 100644 index 00000000..b8935014 --- /dev/null +++ b/examples/common/read_parameters.ipynb @@ -0,0 +1,69 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "41a71aa7", + "metadata": { + "tags": [ + "parameters" + ] + }, + "outputs": [], + "source": [ + "integer = None\n", + "stringer = None\n", + "floater = None\n", + "pydantic_param = None\n", + "score = None" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3e98e89e-765c-42d4-81ea-c371c2eab14d", + "metadata": {}, + "outputs": [], + "source": [ + "assert integer == 1\n", + "assert stringer == \"hello\"\n", + "assert floater == 3.14" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "faf6769e", + "metadata": {}, + "outputs": [], + "source": [ + "from examples.common.functions import ComplexParams\n", + "\n", + "pydantic_param = ComplexParams(**pydantic_param)\n", + "assert pydantic_param.x == 10\n", + "assert pydantic_param.foo == \"bar\"" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + 
"file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.16" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/examples/common/read_parameters_out.ipynb b/examples/common/read_parameters_out.ipynb new file mode 100644 index 00000000..ea6525f8 --- /dev/null +++ b/examples/common/read_parameters_out.ipynb @@ -0,0 +1,105 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "id": "41a71aa7", + "metadata": { + "ploomber": { + "timestamp_end": 1713380823.765499, + "timestamp_start": 1713380823.765069 + }, + "tags": [ + "parameters" + ] + }, + "outputs": [], + "source": [ + "integer = None\n", + "stringer = None\n", + "floater = None\n", + "pydantic_param = None\n", + "score = None" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "d88d58c6", + "metadata": { + "ploomber": { + "timestamp_end": 1713380823.765846, + "timestamp_start": 1713380823.765527 + }, + "tags": [ + "injected-parameters" + ] + }, + "outputs": [], + "source": [ + "# Injected parameters\n", + "integer = 1\n", + "floater = 3.14\n", + "stringer = \"hello\"\n", + "pydantic_param = {\"x\": 10, \"foo\": \"bar\"}\n" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "3e98e89e-765c-42d4-81ea-c371c2eab14d", + "metadata": { + "ploomber": { + "timestamp_end": 1713380823.766088, + "timestamp_start": 1713380823.765864 + } + }, + "outputs": [], + "source": [ + "assert integer == 1\n", + "assert stringer == \"hello\"\n", + "assert floater == 3.14" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "faf6769e", + "metadata": { + "ploomber": { + "timestamp_end": 1713380823.766474, + "timestamp_start": 1713380823.766105 + } + }, + "outputs": [], + "source": [ + "from examples.common.functions import ComplexParams\n", + "\n", + "pydantic_param = ComplexParams(**pydantic_param)\n", + "assert pydantic_param.x == 10\n", + "assert 
pydantic_param.foo == \"bar\"" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.16" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/examples/common/simple_notebook.ipynb b/examples/common/simple_notebook.ipynb new file mode 100644 index 00000000..ced7632d --- /dev/null +++ b/examples/common/simple_notebook.ipynb @@ -0,0 +1,46 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "3e98e89e-765c-42d4-81ea-c371c2eab14d", + "metadata": {}, + "outputs": [], + "source": [ + "def function():\n", + " print(\"Hello World!\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8eac7a3f", + "metadata": {}, + "outputs": [], + "source": [ + "function()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.16" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/examples/common/simple_notebook_out.ipynb b/examples/common/simple_notebook_out.ipynb new file mode 100644 index 00000000..91e156fe --- /dev/null +++ b/examples/common/simple_notebook_out.ipynb @@ -0,0 +1,82 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "id": "c8a68d0d", + "metadata": { + "ploomber": { + "timestamp_end": 1713380822.228675, + "timestamp_start": 1713380822.228447 + }, + "tags": [ + "injected-parameters" + ] + }, + "outputs": [], + "source": [ + "# 
Injected parameters\n" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "3e98e89e-765c-42d4-81ea-c371c2eab14d", + "metadata": { + "ploomber": { + "timestamp_end": 1713380822.22899, + "timestamp_start": 1713380822.228748 + } + }, + "outputs": [], + "source": [ + "def function():\n", + " print(\"Hello World!\")" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "8eac7a3f", + "metadata": { + "ploomber": { + "timestamp_end": 1713380822.229158, + "timestamp_start": 1713380822.229008 + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Hello World!\n" + ] + } + ], + "source": [ + "function()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.16" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/examples/common/write_parameters.ipynb b/examples/common/write_parameters.ipynb new file mode 100644 index 00000000..da016317 --- /dev/null +++ b/examples/common/write_parameters.ipynb @@ -0,0 +1,68 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "41a71aa7", + "metadata": { + "tags": [ + "parameters" + ] + }, + "outputs": [], + "source": [ + "import pandas as pd\n", + "\n", + "from examples.common.functions import ComplexParams\n", + "\n", + "pydantic_param = ComplexParams(x=10, foo=\"bar\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "764f661d", + "metadata": {}, + "outputs": [], + "source": [ + "data = {\"calories\": [420, 380, 390], \"duration\": [50, 40, 45]}\n", + "\n", + "df = pd.DataFrame(data)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": 
"3e98e89e-765c-42d4-81ea-c371c2eab14d", + "metadata": {}, + "outputs": [], + "source": [ + "integer = 1\n", + "floater = 3.14\n", + "stringer = \"hello\"\n", + "score = 0.9" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.16" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/examples/common/write_parameters_out.ipynb b/examples/common/write_parameters_out.ipynb new file mode 100644 index 00000000..e6c06d4a --- /dev/null +++ b/examples/common/write_parameters_out.ipynb @@ -0,0 +1,100 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "id": "41a71aa7", + "metadata": { + "ploomber": { + "timestamp_end": 1713380822.509565, + "timestamp_start": 1713380822.508958 + }, + "tags": [ + "parameters" + ] + }, + "outputs": [], + "source": [ + "import pandas as pd\n", + "\n", + "from examples.common.functions import ComplexParams\n", + "\n", + "pydantic_param = ComplexParams(x=10, foo=\"bar\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "d53507d8", + "metadata": { + "ploomber": { + "timestamp_end": 1713380822.509736, + "timestamp_start": 1713380822.509595 + }, + "tags": [ + "injected-parameters" + ] + }, + "outputs": [], + "source": [ + "# Injected parameters\n" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "764f661d", + "metadata": { + "ploomber": { + "timestamp_end": 1713380822.511416, + "timestamp_start": 1713380822.509754 + } + }, + "outputs": [], + "source": [ + "data = {\"calories\": [420, 380, 390], \"duration\": [50, 40, 45]}\n", + "\n", + "df = pd.DataFrame(data)" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": 
"3e98e89e-765c-42d4-81ea-c371c2eab14d", + "metadata": { + "ploomber": { + "timestamp_end": 1713380822.511728, + "timestamp_start": 1713380822.51144 + } + }, + "outputs": [], + "source": [ + "integer = 1\n", + "floater = 3.14\n", + "stringer = \"hello\"\n", + "score = 0.9" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.16" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/examples/concepts/notebook_native_parameters_consume_out.ipynb b/examples/concepts/notebook_native_parameters_consume_out.ipynb index 16c8e6ad..ec796d3f 100644 --- a/examples/concepts/notebook_native_parameters_consume_out.ipynb +++ b/examples/concepts/notebook_native_parameters_consume_out.ipynb @@ -6,8 +6,8 @@ "id": "3e98e89e-765c-42d4-81ea-c371c2eab14d", "metadata": { "ploomber": { - "timestamp_end": 1712673802.476133, - "timestamp_start": 1712673802.474048 + "timestamp_end": 1712723378.894586, + "timestamp_start": 1712723378.892554 } }, "outputs": [], @@ -30,8 +30,8 @@ "id": "e7f0aab2", "metadata": { "ploomber": { - "timestamp_end": 1712673802.476318, - "timestamp_start": 1712673802.476155 + "timestamp_end": 1712723378.894782, + "timestamp_start": 1712723378.894609 }, "tags": [ "parameters" @@ -48,11 +48,11 @@ { "cell_type": "code", "execution_count": 3, - "id": "143149bb", + "id": "59862ac0", "metadata": { "ploomber": { - "timestamp_end": 1712673802.476461, - "timestamp_start": 1712673802.476332 + "timestamp_end": 1712723378.894933, + "timestamp_start": 1712723378.894796 }, "tags": [ "injected-parameters" @@ -71,8 +71,8 @@ "id": "0e04f11a", "metadata": { "ploomber": { - "timestamp_end": 1712673802.476606, - "timestamp_start": 1712673802.476473 + 
"timestamp_end": 1712723378.895087, + "timestamp_start": 1712723378.894946 } }, "outputs": [], @@ -86,8 +86,8 @@ "id": "9f1cbac6-cada-42b0-8fb1-ddb25a88836c", "metadata": { "ploomber": { - "timestamp_end": 1712673802.476742, - "timestamp_start": 1712673802.476618 + "timestamp_end": 1712723378.895227, + "timestamp_start": 1712723378.895099 } }, "outputs": [], diff --git a/examples/concepts/notebook_native_parameters_out.ipynb b/examples/concepts/notebook_native_parameters_out.ipynb index 0550af9c..4546caaf 100644 --- a/examples/concepts/notebook_native_parameters_out.ipynb +++ b/examples/concepts/notebook_native_parameters_out.ipynb @@ -6,8 +6,8 @@ "id": "3e98e89e-765c-42d4-81ea-c371c2eab14d", "metadata": { "ploomber": { - "timestamp_end": 1712673802.301212, - "timestamp_start": 1712673802.299645 + "timestamp_end": 1712723378.486399, + "timestamp_start": 1712723378.482792 } }, "outputs": [], @@ -36,8 +36,8 @@ "id": "e7f0aab2", "metadata": { "ploomber": { - "timestamp_end": 1712673802.30143, - "timestamp_start": 1712673802.301298 + "timestamp_end": 1712723378.486896, + "timestamp_start": 1712723378.486585 }, "tags": [ "parameters" @@ -53,11 +53,11 @@ { "cell_type": "code", "execution_count": 3, - "id": "2aa79db0", + "id": "bb75b0e8", "metadata": { "ploomber": { - "timestamp_end": 1712673802.30158, - "timestamp_start": 1712673802.301444 + "timestamp_end": 1712723378.487227, + "timestamp_start": 1712723378.486928 }, "tags": [ "injected-parameters" @@ -76,8 +76,8 @@ "id": "0e04f11a", "metadata": { "ploomber": { - "timestamp_end": 1712673802.301748, - "timestamp_start": 1712673802.301593 + "timestamp_end": 1712723378.487586, + "timestamp_start": 1712723378.487256 } }, "outputs": [], @@ -91,8 +91,8 @@ "id": "9f1cbac6-cada-42b0-8fb1-ddb25a88836c", "metadata": { "ploomber": { - "timestamp_end": 1712673802.301934, - "timestamp_start": 1712673802.301761 + "timestamp_end": 1712723378.488026, + "timestamp_start": 1712723378.487615 } }, "outputs": [], diff --git 
a/examples/concepts/simple_notebook.ipynb b/examples/concepts/simple_notebook.ipynb index 1f2547b7..167f26de 100644 --- a/examples/concepts/simple_notebook.ipynb +++ b/examples/concepts/simple_notebook.ipynb @@ -7,8 +7,8 @@ "metadata": {}, "outputs": [], "source": [ - "def add(x, y):\n", - " return x + y" + "def function():\n", + " print(\"hello world\")" ] }, { @@ -18,40 +18,7 @@ "metadata": {}, "outputs": [], "source": [ - "def multiply(x, y):\n", - " return x * y\n", - "\n", - "from pydantic import BaseModel\n", - "\n", - "class EggsModel(BaseModel):\n", - " ham: str\n", - "\n", - "\n", - "class ObjectType:\n", - " def __init__(self):\n", - " self.salute = \"hello\"" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "9dcadc93-aa77-4a0a-9465-2e33eef4da44", - "metadata": {}, - "outputs": [], - "source": [ - "a = add(40, 2)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "7b872cdf-820b-47b5-8f22-15c4b69c8637", - "metadata": {}, - "outputs": [], - "source": [ - "b = multiply(2, 100)\n", - "\n", - "c = EggsModel(ham=\"hello\")" + "function()" ] } ], diff --git a/examples/concepts/simple_notebook_out.ipynb b/examples/concepts/simple_notebook_out.ipynb index 2abe5bb4..410609b9 100644 --- a/examples/concepts/simple_notebook_out.ipynb +++ b/examples/concepts/simple_notebook_out.ipynb @@ -3,11 +3,11 @@ { "cell_type": "code", "execution_count": 1, - "id": "eda8faba", + "id": "0ee2a616", "metadata": { "ploomber": { - "timestamp_end": 1712673803.931629, - "timestamp_start": 1712673803.931437 + "timestamp_end": 1712723380.314318, + "timestamp_start": 1712723380.314131 }, "tags": [ "injected-parameters" @@ -24,8 +24,8 @@ "id": "3e98e89e-765c-42d4-81ea-c371c2eab14d", "metadata": { "ploomber": { - "timestamp_end": 1712673803.931918, - "timestamp_start": 1712673803.931656 + "timestamp_end": 1712723380.314528, + "timestamp_start": 1712723380.314339 } }, "outputs": [], @@ -40,8 +40,8 @@ "id": "9f1cbac6-cada-42b0-8fb1-ddb25a88836c", 
"metadata": { "ploomber": { - "timestamp_end": 1712673803.932905, - "timestamp_start": 1712673803.931932 + "timestamp_end": 1712723380.31542, + "timestamp_start": 1712723380.314542 } }, "outputs": [], @@ -66,8 +66,8 @@ "id": "9dcadc93-aa77-4a0a-9465-2e33eef4da44", "metadata": { "ploomber": { - "timestamp_end": 1712673803.933058, - "timestamp_start": 1712673803.932921 + "timestamp_end": 1712723380.315571, + "timestamp_start": 1712723380.315436 } }, "outputs": [], @@ -81,8 +81,8 @@ "id": "7b872cdf-820b-47b5-8f22-15c4b69c8637", "metadata": { "ploomber": { - "timestamp_end": 1712673803.933233, - "timestamp_start": 1712673803.933071 + "timestamp_end": 1712723380.315741, + "timestamp_start": 1712723380.315585 } }, "outputs": [], diff --git a/examples/on-failure.yaml b/examples/on-failure.yaml deleted file mode 100644 index 02ae42d8..00000000 --- a/examples/on-failure.yaml +++ /dev/null @@ -1,31 +0,0 @@ -dag: - description: | - This is a simple pipeline to demonstrate failure in a step. - - The default behavior is to traverse to step type fail and mark the run as failed. - But you can control it by providing on_failure. - - In this example: step 1 fails and moves to step 3 skipping step 2. The pipeline status - is considered to be success. - - step 1 (FAIL) >> step 3 >> success - - You can run this pipeline by runnable execute -f examples/on-failure.yaml - start_at: step 1 - steps: - step 1: - type: task - command_type: shell - command: exit 1 # This will fail! - next: step 2 - on_failure: step 3 - step 2: - type: stub # This step will never reach - next: step 3 - step 3: - type: stub - next: success - success: - type: success - fail: - type: fail diff --git a/examples/on_failure.py b/examples/on_failure.py deleted file mode 100644 index b8464e04..00000000 --- a/examples/on_failure.py +++ /dev/null @@ -1,38 +0,0 @@ -""" -This is a simple pipeline to demonstrate failure in a step. - - The default behavior is to traverse to step type fail and mark the run as failed. 
- But you can control it by providing on_failure. - - In this example: step 1 fails and moves to step 3 skipping step 2. The pipeline status - is considered to be success. - - step 1 (FAIL) >> step 3 >> success - - You can run this example by: - python examples/on_failure.py -""" - -from runnable import Pipeline, ShellTask, Stub - - -def main(): - step_1 = ShellTask(name="step 1", command="exit 1") # This will fail - - step_2 = Stub(name="step 2") - - step_3 = Stub(name="step 3", terminate_with_success=True) - - step_1.on_failure = step_3.name - - pipeline = Pipeline( - steps=[step_1, step_2, step_3], - add_terminal_nodes=True, - ) - pipeline.execute() - - return pipeline - - -if __name__ == "__main__": - main() diff --git a/examples/parameters_initial.yaml b/examples/parameters_initial.yaml deleted file mode 100644 index e88e14c0..00000000 --- a/examples/parameters_initial.yaml +++ /dev/null @@ -1,4 +0,0 @@ -simple: 1 -inner: - x: 10 - y: "hello world!!" diff --git a/examples/tutorials/mnist/modular_source.py b/examples/tutorials/mnist/modular_source.py index 67ad6ea5..8411de67 100644 --- a/examples/tutorials/mnist/modular_source.py +++ b/examples/tutorials/mnist/modular_source.py @@ -1,3 +1,4 @@ +import time from typing import List import numpy as np diff --git a/runnable/__init__.py b/runnable/__init__.py index 247a6311..89b9b6a1 100644 --- a/runnable/__init__.py +++ b/runnable/__init__.py @@ -29,6 +29,10 @@ pickled, ) +## TODO: Summary should be a bit better for catalog. +## If the execution fails, hint them about the retry executor. +# Make the retry executor loose! + # TODO: Think of model registry as a central place to store models. # TODO: Implement Sagemaker pipelines as a executor. 
diff --git a/runnable/entrypoints.py b/runnable/entrypoints.py index 2aa49c7c..bdddeb7e 100644 --- a/runnable/entrypoints.py +++ b/runnable/entrypoints.py @@ -172,6 +172,7 @@ def execute( ) console.print("Working with context:") console.print(run_context) + console.rule(style="[dark orange]") executor = run_context.executor @@ -243,6 +244,7 @@ def execute_single_node( ) console.print("Working with context:") console.print(run_context) + console.rule(style="[dark orange]") executor = run_context.executor run_context.execution_plan = defaults.EXECUTION_PLAN.CHAINED.value @@ -296,6 +298,7 @@ def execute_notebook( console.print("Working with context:") console.print(run_context) + console.rule(style="[dark orange]") step_config = { "command": notebook_file, @@ -358,6 +361,7 @@ def execute_function( console.print("Working with context:") console.print(run_context) + console.rule(style="[dark orange]") # Prepare the graph with a single node step_config = { @@ -427,6 +431,7 @@ def fan( ) console.print("Working with context:") console.print(run_context) + console.rule(style="[dark orange]") executor = run_context.executor run_context.execution_plan = defaults.EXECUTION_PLAN.CHAINED.value diff --git a/runnable/extensions/executor/__init__.py b/runnable/extensions/executor/__init__.py index 9ab9011e..791c483f 100644 --- a/runnable/extensions/executor/__init__.py +++ b/runnable/extensions/executor/__init__.py @@ -476,6 +476,8 @@ def execute_graph(self, dag: Graph, map_variable: TypeMapVariable = None, **kwar logger.exception(e) raise + console.rule(style="[dark orange]") + if working_on.node_type in ["success", "fail"]: break diff --git a/runnable/sdk.py b/runnable/sdk.py index 3a707a0e..d14d8ce6 100644 --- a/runnable/sdk.py +++ b/runnable/sdk.py @@ -15,8 +15,13 @@ field_validator, model_validator, ) -from rich import print -from rich.progress import BarColumn, Progress, TextColumn, TimeElapsedColumn +from rich.progress import ( + BarColumn, + Progress, + SpinnerColumn, + 
TextColumn, + TimeElapsedColumn, +) from rich.table import Column from typing_extensions import Self @@ -71,7 +76,7 @@ class Catalog(BaseModel): class BaseTraversal(ABC, BaseModel): name: str - next_node: str = Field(default="", alias="next") + next_node: str = Field(default="", serialization_alias="next_node") terminate_with_success: bool = Field(default=False, exclude=True) terminate_with_failure: bool = Field(default=False, exclude=True) on_failure: str = Field(default="", alias="on_failure") @@ -83,6 +88,12 @@ class BaseTraversal(ABC, BaseModel): def internal_name(self) -> str: return self.name + def __hash__(self): + """ + Needed to Uniqueize DataCatalog objects. + """ + return hash(self.name) + def __rshift__(self, other: StepType) -> StepType: if self.next_node: raise Exception(f"The node {self} already has a next node: {self.next_node}") @@ -180,6 +191,7 @@ class BaseTask(BaseTraversal): catalog: Optional[Catalog] = Field(default=None, alias="catalog") overrides: Dict[str, Any] = Field(default_factory=dict, alias="overrides") returns: List[Union[str, TaskReturns]] = Field(default_factory=list, alias="returns") + secrets: List[str] = Field(default_factory=list) @field_validator("returns", mode="before") @classmethod @@ -201,7 +213,7 @@ def create_node(self) -> TaskNode: if not (self.terminate_with_failure or self.terminate_with_success): raise AssertionError("A node not being terminated must have a user defined next node") - return TaskNode.parse_from_config(self.model_dump(exclude_none=True)) + return TaskNode.parse_from_config(self.model_dump(exclude_none=True, by_alias=True)) class PythonTask(BaseTask): @@ -297,9 +309,9 @@ class NotebookTask(BaseTask): """ - notebook: str = Field(alias="command") + notebook: str = Field(serialization_alias="command") - notebook_output_path: Optional[str] = Field(default=None, alias="notebook_output_path") + notebook_output_path: Optional[str] = Field(default=None, alias="notebook_output_path", validate_default=True) 
optional_ploomber_args: Optional[Dict[str, Any]] = Field(default=None, alias="optional_ploomber_args") @computed_field @@ -526,7 +538,7 @@ class Pipeline(BaseModel): _dag: graph.Graph = PrivateAttr() model_config = ConfigDict(extra="forbid") - def _validate_path(self, path: List[StepType]) -> None: + def _validate_path(self, path: List[StepType], failure_path: bool = False) -> None: # Check if one and only one step terminates with success # Check no more than one step terminates with failure @@ -544,7 +556,7 @@ def _validate_path(self, path: List[StepType]) -> None: raise Exception("A pipeline cannot have more than one step that terminates with failure") reached_failure = True - if not reached_success: + if not reached_success and not reached_failure: raise Exception("A pipeline must have at least one step that terminates with success") def _construct_path(self, path: List[StepType]) -> None: @@ -594,11 +606,21 @@ def model_post_init(self, __context: Any) -> None: # Check all paths are valid and construct the path paths = [success_path] + on_failure_paths + failure_path = False for path in paths: - self._validate_path(path) + self._validate_path(path, failure_path) self._construct_path(path) - all_steps: List[StepType] = [step for step in success_path + on_failure_paths] # type: ignore + failure_path = True + + all_steps: List[StepType] = [] + + for path in paths: + for step in path: + all_steps.append(step) + + seen = set() + unique = [x for x in all_steps if not (x in seen or seen.add(x))] # type: ignore self._dag = graph.Graph( start_at=all_steps[0].name, @@ -606,7 +628,7 @@ def model_post_init(self, __context: Any) -> None: internal_branch_name=self.internal_branch_name, ) - for step in all_steps: + for step in unique: self._dag.add_node(step.create_node()) if self.add_terminal_nodes: @@ -675,8 +697,9 @@ def execute( run_context.dag = graph.create_graph(dag_definition) - print("Working with context:") - print(run_context) + console.print("Working with 
context:") + console.print(run_context) + console.rule(style="[dark orange]") if not run_context.executor._local: # We are not working with non local executor @@ -693,6 +716,7 @@ def execute( run_context.executor.prepare_for_graph_execution() with Progress( + SpinnerColumn(spinner_name="runner"), TextColumn("[progress.description]{task.description}", table_column=Column(ratio=2)), BarColumn(table_column=Column(ratio=1), style="dark_orange"), TimeElapsedColumn(table_column=Column(ratio=1)), diff --git a/runnable/tasks.py b/runnable/tasks.py index 1c46c8e2..e80bb468 100644 --- a/runnable/tasks.py +++ b/runnable/tasks.py @@ -9,13 +9,14 @@ from datetime import datetime from pickle import PicklingError from string import Template -from typing import Any, Dict, List, Literal, Tuple +from typing import Any, Dict, List, Literal, Optional, Tuple from pydantic import BaseModel, ConfigDict, Field, ValidationInfo, field_validator +from rich.console import Console from stevedore import driver import runnable.context as context -from runnable import console, defaults, exceptions, parameters, utils +from runnable import defaults, exceptions, parameters, utils from runnable.datastore import ( JsonParameter, MetricParameter, @@ -32,6 +33,9 @@ # TODO: Can we add memory peak, cpu usage, etc. to the metrics? 
+console = Console(file=io.StringIO()) + + class TaskReturns(BaseModel): name: str kind: Literal["json", "object", "metric"] = Field(default="json") @@ -42,7 +46,7 @@ class BaseTaskType(BaseModel): task_type: str = Field(serialization_alias="command_type") node_name: str = Field(exclude=True) - secrets: Dict[str, str] = Field(default_factory=dict) + secrets: List[str] = Field(default_factory=list) returns: List[TaskReturns] = Field(default_factory=list, alias="returns") model_config = ConfigDict(extra="forbid") @@ -69,15 +73,14 @@ def get_cli_options(self) -> Tuple[str, dict]: raise NotImplementedError() def set_secrets_as_env_variables(self): - for key, value in self.secrets.items(): + for key in self.secrets: secret_value = context.run_context.secrets_handler.get(key) - self.secrets[value] = secret_value - os.environ[value] = secret_value + os.environ[key] = secret_value def delete_secrets_from_env_variables(self): - for _, value in self.secrets.items(): - if value in os.environ: - del os.environ[value] + for key in self.secrets: + if key in os.environ: + del os.environ[key] def execute_command( self, @@ -135,17 +138,21 @@ def execution_context(self, map_variable: TypeMapVariable = None, allow_complex: if not allow_complex: params = {key: value for key, value in params.items() if isinstance(value, JsonParameter)} - log_file_name = self.node_name.replace(" ", "_") + ".execution.log" + log_file_name = self.node_name # + ".execution.log" if map_variable: for _, value in map_variable.items(): log_file_name += "_" + str(value) + log_file_name = "".join(x for x in log_file_name if x.isalnum()) + ".execution.log" + log_file = open(log_file_name, "w") f = io.StringIO() try: with contextlib.redirect_stdout(f): + # with contextlib.nullcontext(): yield params + print(console.file.getvalue()) # type: ignore except Exception as e: # pylint: disable=broad-except logger.exception(e) finally: @@ -156,10 +163,11 @@ def execution_context(self, map_variable: TypeMapVariable = None, 
allow_complex: log_file.close() # Put the log file in the catalog - # self._context.catalog_handler.put(name=log_file.name, run_id=context.run_context.run_id) + self._context.catalog_handler.put(name=log_file.name, run_id=context.run_context.run_id) os.remove(log_file.name) # Update parameters + # This should only update the parameters that are changed at the root level. self._context.run_log_store.set_parameters(parameters=params, run_id=self._context.run_id) @@ -219,8 +227,7 @@ def execute_command( logger.info(f"Calling {func} from {module} with {filtered_parameters}") user_set_parameters = f(**filtered_parameters) # This is a tuple or single value except Exception as e: - logger.exception(e) - console.print(e, style=defaults.error_style) + console.log(e, style=defaults.error_style, markup=False) raise exceptions.CommandCallError(f"Function call: {self.command} did not succeed.\n") from e attempt_log.input_parameters = params.copy() @@ -263,9 +270,9 @@ def execute_command( attempt_log.status = defaults.SUCCESS except Exception as _e: msg = f"Call to the function {self.command} did not succeed.\n" - logger.exception(_e) attempt_log.message = msg - console.print(_e, style=defaults.error_style) + console.print_exception(show_locals=False) + console.log(_e, style=defaults.error_style) attempt_log.end_time = str(datetime.now()) @@ -277,7 +284,7 @@ class NotebookTaskType(BaseTaskType): task_type: str = Field(default="notebook", serialization_alias="command_type") command: str - notebook_output_path: str = Field(default="", validate_default=True) + notebook_output_path: Optional[str] = Field(default=None, validate_default=True) optional_ploomber_args: dict = {} @field_validator("command") @@ -319,7 +326,7 @@ def execute_command( import ploomber_engine as pm from ploomber_engine.ipython import PloomberClient - notebook_output_path = self.notebook_output_path + notebook_output_path = self.notebook_output_path or "" with self.execution_context( map_variable=map_variable, 
allow_complex=False @@ -424,15 +431,17 @@ def execute_command( # Expose secrets as environment variables if self.secrets: - for key, value in self.secrets.items(): + for key in self.secrets: secret_value = context.run_context.secrets_handler.get(key) - subprocess_env[value] = secret_value + subprocess_env[key] = secret_value with self.execution_context(map_variable=map_variable, allow_complex=False) as params: subprocess_env.update({k: v.get_value() for k, v in params.items()}) # Json dumps all runnable environment variables for key, value in subprocess_env.items(): + if isinstance(value, str): + continue subprocess_env[key] = json.dumps(value) collect_delimiter = "=== COLLECT ===" @@ -441,37 +450,80 @@ def execute_command( logger.info(f"Executing shell command: {command}") capture = False - return_keys = [x.name for x in self.returns] + return_keys = {x.name: x for x in self.returns} - with subprocess.Popen( + proc = subprocess.Popen( command, shell=True, env=subprocess_env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, - ) as proc: - for line in proc.stdout: # type: ignore - logger.info(line) - print(line) - - if line.strip() == collect_delimiter: - # The lines from now on should be captured - capture = True - continue - - if capture: - key, value = line.strip().split("=", 1) - if key in (return_keys or []): - param_name = Template(key).safe_substitute(map_variable) # type: ignore - try: - params[param_name] = JsonParameter(kind="json", value=json.loads(value)) - except json.JSONDecodeError: - params[param_name] = JsonParameter(kind="json", value=value) - - proc.wait() - if proc.returncode == 0: - attempt_log.status = defaults.SUCCESS + ) + result = proc.communicate() + logger.debug(result) + logger.info(proc.returncode) + + if proc.returncode != 0: + msg = ",".join(result[1].split("\n")) + attempt_log.status = defaults.FAIL + attempt_log.end_time = str(datetime.now()) + attempt_log.message = msg + console.print(msg, style=defaults.error_style) + 
return attempt_log + + # for stderr + for line in result[1].split("\n"): + if line.strip() == "": + continue + console.print(line, style=defaults.warning_style) + + output_parameters: Dict[str, Parameter] = {} + metrics: Dict[str, Parameter] = {} + + # only from stdout + for line in result[0].split("\n"): + if line.strip() == "": + continue + + logger.info(line) + console.print(line) + + if line.strip() == collect_delimiter: + # The lines from now on should be captured + capture = True + continue + + if capture: + key, value = line.strip().split("=", 1) + if key in return_keys: + task_return = return_keys[key] + + try: + value = json.loads(value) + except json.JSONDecodeError: + value = value + + output_parameter = task_return_to_parameter( + task_return=task_return, + value=value, + ) + + if task_return.kind == "metric": + metrics[task_return.name] = output_parameter + + param_name = task_return.name + if map_variable: + for _, v in map_variable.items(): + param_name = f"{param_name}_{v}" + + output_parameters[param_name] = output_parameter + + attempt_log.output_parameters = output_parameters + attempt_log.user_defined_metrics = metrics + params.update(output_parameters) + + attempt_log.status = defaults.SUCCESS attempt_log.end_time = str(datetime.now()) return attempt_log diff --git a/tests/runnable/test_sdk.py b/tests/runnable/test_sdk.py index 3958c73d..b174ba92 100644 --- a/tests/runnable/test_sdk.py +++ b/tests/runnable/test_sdk.py @@ -1,7 +1,7 @@ import pytest -from runnable.extensions import nodes from runnable import sdk +from runnable.extensions import nodes def test_success_init(): @@ -26,6 +26,6 @@ def test_stub_node_makes_next_success_if_terminate_with_success(): def test_stub_node_takes_given_next_node(): - test_stub = sdk.Stub(name="stub", next="test") + test_stub = sdk.Stub(name="stub", next_node="test") assert test_stub.create_node() == nodes.StubNode(name="stub", next_node="test", internal_name="stub") diff --git 
a/tests/scenarios/test_traversals.py b/tests/scenarios/test_traversals.py deleted file mode 100644 index 4bbc6993..00000000 --- a/tests/scenarios/test_traversals.py +++ /dev/null @@ -1,214 +0,0 @@ -# ruff: noqa - -import tempfile -from pathlib import Path -from rich import print - -import pytest -import ruamel.yaml - -from runnable import defaults, entrypoints, utils - -yaml = ruamel.yaml.YAML() - -PIPELINES_DEFINITION = Path("examples/") - - -def get_config(): - config = { - "executor": { - "type": "local", - }, - "run_log_store": {"type": "file-system", "config": {"log_folder": ""}}, - } - return config - - -def get_container_config(): - config = { - "executor": {"type": "local-container", "config": {"docker_image": "does-not-matter"}}, - "run_log_store": {"type": "file-system", "config": {"log_folder": ""}}, - } - return config - - -def get_chunked_config(): - config = { - "executor": { - "type": "local", - }, - "run_log_store": {"type": "chunked-fs", "config": {"log_folder": ""}}, - } - return config - - -def get_configs(): - return [get_config(), get_chunked_config()] - - -def write_config(work_dir: Path, config: dict): - config["run_log_store"]["config"]["log_folder"] = str(work_dir) - with open(work_dir / "config.yaml", "wb") as f: - yaml.dump(config, f) - - -def get_run_log(work_dir, run_id): - config_file = work_dir / "config.yaml" - - if utils.does_file_exist(config_file): - mode_executor = entrypoints.prepare_configurations(configuration_file=str(config_file), run_id=run_id) - return mode_executor.run_log_store.get_run_log_by_id(run_id=run_id, full=True).model_dump() - raise Exception - - -@pytest.mark.no_cover -def test_success(): - configs = get_configs() - - for config in configs: - with tempfile.TemporaryDirectory() as context_dir: - context_dir_path = Path(context_dir) - - write_config(context_dir_path, config) - - run_id = "testing_success" - - entrypoints.execute( - configuration_file=str(context_dir_path / "config.yaml"), - 
pipeline_file=str(PIPELINES_DEFINITION / "mocking.yaml"), - run_id=run_id, - ) - - try: - run_log = get_run_log(context_dir_path, run_id) - assert run_log["status"] == defaults.SUCCESS - assert list(run_log["steps"].keys()) == ["step 1", "step 2", "step 3", "success"] - except: - assert False - - -@pytest.mark.no_cover -def test_failure(): - configs = get_configs() - - for config in configs: - with tempfile.TemporaryDirectory() as context_dir: - context_dir_path = Path(context_dir) - - write_config(context_dir_path, config) - - run_id = "testing_failure" - - try: - entrypoints.execute( - configuration_file=str(context_dir_path / "config.yaml"), - pipeline_file=str(PIPELINES_DEFINITION / "default-fail.yaml"), - run_id=run_id, - ) - except Exception as ex: - print(ex) - - try: - run_log = get_run_log(context_dir_path, run_id) - assert run_log["status"] == defaults.FAIL - assert list(run_log["steps"].keys()) == ["step 1", "step 2", "fail"] - except: - assert False - - -@pytest.mark.no_cover -def test_on_failure(): - configs = get_configs() - for config in configs: - with tempfile.TemporaryDirectory() as context_dir: - context_dir_path = Path(context_dir) - - write_config(context_dir_path, config) - - run_id = "testing_on_failure" - - try: - entrypoints.execute( - configuration_file=str(context_dir_path / "config.yaml"), - pipeline_file=str(PIPELINES_DEFINITION / "on-failure.yaml"), - run_id=run_id, - ) - except: - pass - - try: - run_log = get_run_log(context_dir_path, run_id) - assert run_log["status"] == defaults.SUCCESS - assert list(run_log["steps"].keys()) == ["step 1", "step 3", "success"] - except: - assert False - - -# @pytest.mark.no_cover -# def test_parallel(): -# configs = get_configs() -# for config in configs: -# with tempfile.TemporaryDirectory() as context_dir: -# context_dir_path = Path(context_dir) - -# write_config(context_dir_path, config) -# run_id = "testing_parallel" - -# entrypoints.execute( -# configuration_file=str(context_dir_path / 
"config.yaml"), -# pipeline_file=str(PIPELINES_DEFINITION / "concepts/parallel.yaml"), -# run_id=run_id, -# ) - -# try: -# run_log = get_run_log(context_dir_path, run_id) -# assert run_log["status"] == defaults.SUCCESS -# assert list(run_log["steps"].keys()) == ["step 1", "step 2", "step 3", "success"] -# assert list(run_log["steps"]["step 2"]["branches"]["step 2.branch_a"]["steps"].keys()) == [ -# "step 2.branch_a.step 1", -# "step 2.branch_a.step 2", -# "step 2.branch_a.success", -# ] -# assert list(run_log["steps"]["step 2"]["branches"]["step 2.branch_b"]["steps"].keys()) == [ -# "step 2.branch_b.step 1", -# "step 2.branch_b.step 2", -# "step 2.branch_b.success", -# ] -# except: -# assert False - - -# @pytest.mark.no_cover -# def test_parallel_fail(parallel_fail_graph): -# configs = get_configs() -# for config in configs: -# with tempfile.TemporaryDirectory() as context_dir: -# context_dir_path = Path(context_dir) -# dag = {"dag": parallel_fail_graph().dict()} - -# write_dag_and_config(context_dir_path, dag, config) -# run_id = "testing_parallel" - -# try: -# entrypoints.execute( -# configuration_file=str(context_dir_path / "config.yaml"), -# pipeline_file=str(context_dir_path / "dag.yaml"), -# run_id=run_id, -# ) -# except: -# pass - -# try: -# run_log = get_run_log(context_dir_path, run_id) -# assert run_log["status"] == defaults.FAIL -# assert list(run_log["steps"].keys()) == ["first", "second", "fail"] -# assert list(run_log["steps"]["second"]["branches"]["second.a"]["steps"].keys()) == [ -# "second.a.first", -# "second.a.fail", -# ] -# assert list(run_log["steps"]["second"]["branches"]["second.b"]["steps"].keys()) == [ -# "second.b.first", -# "second.b.fail", -# ] -# except: -# assert False diff --git a/tests/test_examples.py b/tests/test_examples.py index d71971b2..9eabf0da 100644 --- a/tests/test_examples.py +++ b/tests/test_examples.py @@ -9,102 +9,21 @@ from runnable import exceptions from runnable.entrypoints import execute -# (file, is_fail?, kwargs) 
-examples = [ - ("concepts/catalog.yaml", False, {"configuration_file": "examples/configs/fs-catalog.yaml"}), - ("concepts/map.yaml", False, {}), - ("concepts/map_shell.yaml", False, {}), - ("concepts/nesting.yaml", False, {}), - ("concepts/notebook_native_parameters.yaml", False, {"parameters_file": "examples/concepts/parameters.yaml"}), - ("concepts/parallel.yaml", False, {}), - ("concepts/simple_notebook.yaml", False, {}), - ("concepts/simple.yaml", False, {}), - ("catalog.yaml", False, {"configuration_file": "examples/configs/fs-catalog.yaml"}), - ("default-fail.yaml", True, {}), - ("on-failure.yaml", False, {}), - ("parallel-fail.yaml", True, {}), -] - - -def list_examples(): - for example in examples: - yield example - - -@pytest.mark.parametrize("example", list_examples()) -@pytest.mark.no_cover -@pytest.mark.e2e -def test_yaml_examples(example): - print(f"Testing {example}...") - examples_path = Path("examples") - file_path, status, kwargs = example - try: - full_file_path = examples_path / file_path - configuration_file = kwargs.pop("configuration_file", "") - execute(configuration_file=configuration_file, pipeline_file=str(full_file_path.resolve()), **kwargs) - except exceptions.ExecutionFailedError: - if not status: - raise - - -@pytest.mark.parametrize("example", list_examples()) -@pytest.mark.no_cover -@pytest.mark.e2e -def test_yaml_examples_argo(example): - print(f"Testing {example}...") - examples_path = Path("examples") - file_path, status, kwargs = example - try: - full_file_path = examples_path / file_path - kwargs.pop("configuration_file", "") - configuration_file = "examples/configs/argo-config.yaml" - execute(configuration_file=configuration_file, pipeline_file=str(full_file_path.resolve()), **kwargs) - subprocess.run(["argo", "lint", "--offline", "argo-pipeline.yaml"], check=True) - except exceptions.ExecutionFailedError: - if not status: - raise - - -@pytest.mark.parametrize("example", list_examples()) -@pytest.mark.no_cover 
-@pytest.mark.e2e_container -def test_yaml_examples_container(example): - print(f"Testing {example}...") - examples_path = Path("examples") - file_path, status, kwargs = example - try: - full_file_path = examples_path / file_path - kwargs.pop("configuration_file", "") - configuration_file = "examples/configs/local-container.yaml" - os.environ["runnable_VAR_default_docker_image"] = "runnable:3.8" - execute(configuration_file=configuration_file, pipeline_file=str(full_file_path), **kwargs) - except exceptions.ExecutionFailedError: - if not status: - raise - - -@contextmanager -def secrets_env_context(): - os.environ["secret"] = "secret_value" - os.environ["runnable_CONFIGURATION_FILE"] = "examples/configs/secrets-env-default.yaml" - yield - del os.environ["secret"] - del os.environ["runnable_CONFIGURATION_FILE"] - - -# function, success, context +# # (file, is_fail?, kwargs) python_examples = [ - ("catalog", False, None), - ("catalog_simple", False, None), - ("mocking", False, None), - ("on_failure", False, None), - ("parameters", False, None), - ("parameters_simple", False, None), - ("concepts.catalog", False, None), - ("concepts.map", False, None), - ("concepts.nesting", False, None), - ("concepts.parallel", False, None), - ("concepts.simple", False, None), + ("01-tasks/notebook", False, None), + ("01-tasks/python_tasks", False, None), + ("01-tasks/scripts", False, None), + ("01-tasks/stub", False, None), + ("02-sequential/default_fail", False, None), + ("02-sequential/on_failure_fail", False, None), + ("02-sequential/on_failure_succeed", False, None), + ("02-sequential/traversal", False, None), + ("03-parameters/passing_parameters_notebook", False, None), + ("03-parameters/passing_parameters_python", False, None), + ("03-parameters/passing_parameters_shell", False, None), + ("03-parameters/static_parameters_non_python", False, None), + ("03-parameters/static_parameters_python", False, None), ] @@ -114,7 +33,7 @@ def list_python_examples(): 
@pytest.mark.parametrize("example", list_python_examples()) -@pytest.mark.no_cover +# @pytest.mark.no_cover @pytest.mark.e2e def test_python_examples(example): print(f"Testing {example}...") @@ -126,7 +45,7 @@ def test_python_examples(example): else: context = context() - imported_module = importlib.import_module(f"examples.{mod}") + imported_module = importlib.import_module(f"examples.{mod.replace('/', '.')}") f = getattr(imported_module, "main") try: with context: @@ -134,3 +53,129 @@ def test_python_examples(example): except exceptions.ExecutionFailedError: if not status: raise + + +# examples = [ +# ("concepts/catalog.yaml", False, {"configuration_file": "examples/configs/fs-catalog.yaml"}), +# ("concepts/map.yaml", False, {}), +# ("concepts/map_shell.yaml", False, {}), +# ("concepts/nesting.yaml", False, {}), +# ("concepts/notebook_native_parameters.yaml", False, {"parameters_file": "examples/concepts/parameters.yaml"}), +# ("concepts/parallel.yaml", False, {}), +# ("concepts/simple_notebook.yaml", False, {}), +# ("concepts/simple.yaml", False, {}), +# ("catalog.yaml", False, {"configuration_file": "examples/configs/fs-catalog.yaml"}), +# ("default-fail.yaml", True, {}), +# ("on-failure.yaml", False, {}), +# ("parallel-fail.yaml", True, {}), +# ] + + +# def list_examples(): +# for example in examples: +# yield example + + +# @pytest.mark.parametrize("example", list_examples()) +# @pytest.mark.no_cover +# @pytest.mark.e2e +# def test_yaml_examples(example): +# print(f"Testing {example}...") +# examples_path = Path("examples") +# file_path, status, kwargs = example +# try: +# full_file_path = examples_path / file_path +# configuration_file = kwargs.pop("configuration_file", "") +# execute(configuration_file=configuration_file, pipeline_file=str(full_file_path.resolve()), **kwargs) +# except exceptions.ExecutionFailedError: +# if not status: +# raise + + +# @pytest.mark.parametrize("example", list_examples()) +# @pytest.mark.no_cover +# @pytest.mark.e2e +# def 
test_yaml_examples_argo(example): +# print(f"Testing {example}...") +# examples_path = Path("examples") +# file_path, status, kwargs = example +# try: +# full_file_path = examples_path / file_path +# kwargs.pop("configuration_file", "") +# configuration_file = "examples/configs/argo-config.yaml" +# execute(configuration_file=configuration_file, pipeline_file=str(full_file_path.resolve()), **kwargs) +# subprocess.run(["argo", "lint", "--offline", "argo-pipeline.yaml"], check=True) +# except exceptions.ExecutionFailedError: +# if not status: +# raise + + +# @pytest.mark.parametrize("example", list_examples()) +# @pytest.mark.no_cover +# @pytest.mark.e2e_container +# def test_yaml_examples_container(example): +# print(f"Testing {example}...") +# examples_path = Path("examples") +# file_path, status, kwargs = example +# try: +# full_file_path = examples_path / file_path +# kwargs.pop("configuration_file", "") +# configuration_file = "examples/configs/local-container.yaml" +# os.environ["runnable_VAR_default_docker_image"] = "runnable:3.8" +# execute(configuration_file=configuration_file, pipeline_file=str(full_file_path), **kwargs) +# except exceptions.ExecutionFailedError: +# if not status: +# raise + + +# @contextmanager +# def secrets_env_context(): +# os.environ["secret"] = "secret_value" +# os.environ["runnable_CONFIGURATION_FILE"] = "examples/configs/secrets-env-default.yaml" +# yield +# del os.environ["secret"] +# del os.environ["runnable_CONFIGURATION_FILE"] + + +# # function, success, context +# python_examples = [ +# ("catalog", False, None), +# ("catalog_simple", False, None), +# ("mocking", False, None), +# ("on_failure", False, None), +# ("parameters", False, None), +# ("parameters_simple", False, None), +# ("concepts.catalog", False, None), +# ("concepts.map", False, None), +# ("concepts.nesting", False, None), +# ("concepts.parallel", False, None), +# ("concepts.simple", False, None), +# ] + + +# def list_python_examples(): +# for example in 
python_examples: +# yield example + + +# @pytest.mark.parametrize("example", list_python_examples()) +# @pytest.mark.no_cover +# @pytest.mark.e2e +# def test_python_examples(example): +# print(f"Testing {example}...") + +# mod, status, context = example + +# if not context: +# context = nullcontext() +# else: +# context = context() + +# imported_module = importlib.import_module(f"examples.{mod}") +# f = getattr(imported_module, "main") +# try: +# with context: +# f() +# except exceptions.ExecutionFailedError: +# if not status: +# raise