From 11d18f910d1e9db0922a3c944e41f08a185e1855 Mon Sep 17 00:00:00 2001 From: Luis Medina <3936213+lu4nm3@users.noreply.github.com> Date: Tue, 24 Mar 2020 13:15:18 -0700 Subject: [PATCH] Add an example of the new Presto task (#9) * Presto task * flesh out * feedback * feedback2 * match flytekit example * bump version --- flytetester/app/workflows/presto_workflow.py | 27 ++++++++++++++++++++ flytetester/requirements.txt | 2 +- 2 files changed, 28 insertions(+), 1 deletion(-) create mode 100644 flytetester/app/workflows/presto_workflow.py diff --git a/flytetester/app/workflows/presto_workflow.py b/flytetester/app/workflows/presto_workflow.py new file mode 100644 index 0000000..7c1988f --- /dev/null +++ b/flytetester/app/workflows/presto_workflow.py @@ -0,0 +1,27 @@ +from __future__ import absolute_import + +from flytekit.sdk.tasks import inputs +from flytekit.sdk.types import Types +from flytekit.sdk.workflow import workflow_class, Input, Output +from flytekit.common.tasks.presto_task import SdkPrestoTask + +schema = Types.Schema([("a", Types.String), ("b", Types.Integer)]) + +presto_task = SdkPrestoTask( + task_inputs=inputs(ds=Types.String, rg=Types.String), + statement="SELECT * FROM hive.city.fact_airport_sessions WHERE ds = '{{ .Inputs.ds}}' LIMIT 10", + output_schema=schema, + routing_group="{{ .Inputs.rg }}", + # catalog="hive", + # schema="city", +) + + +@workflow_class() +class PrestoWorkflow(object): + ds = Input(Types.String, required=True, help="Test string with no default") + # routing_group = Input(Types.String, required=True, help="Test string with no default") + + p_task = presto_task(ds=ds, rg='etl') + + output_a = Output(p_task.outputs.results, sdk_type=schema) diff --git a/flytetester/requirements.txt b/flytetester/requirements.txt index 7aa4476..4e91cb5 100644 --- a/flytetester/requirements.txt +++ b/flytetester/requirements.txt @@ -1,4 +1,4 @@ -flytekit[sidecar,schema]==0.6.0b1 +flytekit[sidecar,schema]==0.6.0b6 statsd opencv-python==3.4.4.19 k8s-proto>=0.0.2