-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathairflow_spark.py
36 lines (29 loc) · 1.17 KB
/
airflow_spark.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import sys, os, re
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
import iso8601
# Arguments passed to the DAG below as its `default_args`.
# iso8601.parse_date turns the ISO date string into the start_date value.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=iso8601.parse_date("2016-12-01"),
)

# The DAG object that both spark-submit tasks in this file attach to.
# NOTE(review): schedule_interval=None presumably means the DAG only runs
# when triggered explicitly -- confirm against the deployed Airflow version.
dag = DAG(
    'GH_spark',
    schedule_interval=None,
    default_args=default_args,
)
# First Spark job: a BashOperator whose templated command calls spark-submit
# on <base_path>/<filename> and passes two arguments to the script, the `ts`
# template value and the base path. The values under `params` are what the
# `params.*` placeholders in the command refer to.
# NOTE(review): `ts` is not defined in this file; presumably Airflow supplies
# it in the template context -- confirm.
pyspark_local_task_one = BashOperator(
    dag=dag,
    task_id="pyspark_local_task_one",
    params=dict(
        master="local[8]",
        filename="gh_spark/pyspark_task_one.py",
        base_path="/home/ec2-user/airflow/dags",
    ),
    # Command text (including its leading space) is kept exactly as written.
    bash_command=""" spark-submit --master {{ params.master }} {{params.base_path}}/{{params.filename}} {{ts}} {{params.base_path}}""",
)
# Second Spark job: same spark-submit command shape as the first task, but
# pointing at gh_spark/pyspark_task_two.py. The script receives the `ts`
# template value and the base path as its two arguments.
# NOTE(review): `ts` is not defined in this file; presumably Airflow supplies
# it in the template context -- confirm.
pyspark_local_task_two = BashOperator(
    dag=dag,
    task_id="pyspark_local_task_two",
    params=dict(
        master="local[8]",
        filename="gh_spark/pyspark_task_two.py",
        base_path="/home/ec2-user/airflow/dags",
    ),
    # Command text (including its leading space) is kept exactly as written.
    bash_command=""" spark-submit --master {{params.master}} {{params.base_path}}/{{params.filename}} {{ts}} {{params.base_path}}""",
)

# Ordering: task one is upstream of task two. Declared from the upstream
# side; this records the same edge as two.set_upstream(one).
pyspark_local_task_one.set_downstream(pyspark_local_task_two)