Quickfix/bumping tenacity version #76

Open

wants to merge 159 commits into base: master

Commits (159)
ea36004
Turning follow_external_dependency into bool or dict property
siklosid Sep 30, 2023
5113752
Handling the new dict format of follow_external_dependency
siklosid Sep 30, 2023
fc2a2e2
Changing test case to see if it handles sensor parameters properly
siklosid Sep 30, 2023
d0780dd
Merge pull request #23 from chocoapp/feature/sensor_parameters
siklosid Oct 9, 2023
db6dee2
upgrade version of tenacity
Oct 23, 2023
bd5adcf
Merge pull request #24 from chocoapp/feature/DATA-1562-upgrade-tenacity
claudiazi Oct 26, 2023
d463438
added new dbt config parser module
kiranvasudev Nov 15, 2023
a91a58f
added class with constructor
kiranvasudev Nov 15, 2023
102620e
added method to parse dbt model inputs
kiranvasudev Nov 15, 2023
c61e656
added functions to generate dagger input and outputs for dbt models
kiranvasudev Nov 15, 2023
29858a9
added fn to generate io for dbt task
kiranvasudev Nov 15, 2023
1a93f05
black format
kiranvasudev Nov 15, 2023
847f622
use new module in fn to generate configs
kiranvasudev Nov 15, 2023
104013a
changed output bucket
kiranvasudev Nov 15, 2023
de82c14
fixed import
kiranvasudev Nov 15, 2023
dcb6854
renamed functions and variables
kiranvasudev Nov 15, 2023
fb6f8f6
added type hints and docstrings
kiranvasudev Nov 15, 2023
06dc86b
changed how models are selected when config is generated
kiranvasudev Nov 15, 2023
19a13bc
black
kiranvasudev Nov 15, 2023
5f83797
fix project dir to load the correct profile
kiranvasudev Nov 16, 2023
209efd6
get external_location from config instead of unrendered config
kiranvasudev Nov 16, 2023
3c4e4c3
renamed variables for better understanding
kiranvasudev Nov 16, 2023
1bd525c
added doctest for fn
kiranvasudev Nov 16, 2023
4b651ad
added files for fixtures and tests
kiranvasudev Nov 16, 2023
7123cbc
added fixture for manifest and profiles
kiranvasudev Nov 16, 2023
77fb2b2
setUp test class
kiranvasudev Nov 16, 2023
47c0638
added test for get_dbt_model_parents
kiranvasudev Nov 16, 2023
4eced6b
added tests for generate_dagger_inputs
kiranvasudev Nov 16, 2023
aa767e8
added test for generate_dagger_outputs
kiranvasudev Nov 16, 2023
8cecbe6
fixed type hint
kiranvasudev Nov 16, 2023
04eafe6
fixed type hints
kiranvasudev Nov 16, 2023
745311d
refactored for simplicity
kiranvasudev Nov 17, 2023
b52e0b5
updated tests
kiranvasudev Nov 17, 2023
8e8c408
Merge remote-tracking branch 'origin/feature/dbt-task-split' into fea…
kiranvasudev Nov 17, 2023
6e7ced2
added dbt profile to default parameters for dbt task
kiranvasudev Nov 20, 2023
962ef70
Merge pull request #25 from chocoapp/feature/dbt-task-split
kiranvasudev Nov 20, 2023
e8cbc6f
added follow external dependency as true as default for athena task
kiranvasudev Nov 20, 2023
e62d35c
add fn to process seed input
kiranvasudev Nov 20, 2023
8de6821
refactor code to incorporate seeds
kiranvasudev Nov 20, 2023
5f26723
updates fixtures
kiranvasudev Nov 20, 2023
9acaff5
added test for dbt seed
kiranvasudev Nov 20, 2023
876010c
modified tests for model containing dbt seed as a dependency
kiranvasudev Nov 20, 2023
9cd0a51
refactor
kiranvasudev Nov 24, 2023
2e6abf6
updated tests and fixtures
kiranvasudev Nov 24, 2023
c652d56
changed name of seed task generating fn
kiranvasudev Nov 24, 2023
ac5f48f
removed unused line
kiranvasudev Nov 24, 2023
46a833d
added docstrings
kiranvasudev Nov 27, 2023
cdd0fea
removed unused test parameter
kiranvasudev Nov 27, 2023
829ac94
changed name of function for better understanding
kiranvasudev Nov 27, 2023
eaf4286
added test to check for de-duplication of inputs
kiranvasudev Nov 27, 2023
ab9a2c8
refactored getting model location function
kiranvasudev Nov 29, 2023
8ef7050
refactor dummy task generation
kiranvasudev Nov 29, 2023
65a07d7
generate inputs for intermediate models and updated tests
kiranvasudev Nov 29, 2023
e5afcfa
uncomment skipping local test
kiranvasudev Nov 29, 2023
e28a078
refactor generate_dagger_tasks fn to make recursive
kiranvasudev Nov 29, 2023
ab48229
updated fixtures and tests
kiranvasudev Nov 29, 2023
3c8c92c
Merge pull request #26 from chocoapp/feature/dbt-task-split
kiranvasudev Nov 30, 2023
d653977
bugfix
kiranvasudev Dec 4, 2023
48a102f
only return dummy when stg model
kiranvasudev Dec 4, 2023
f6c7226
adapted tests
kiranvasudev Dec 4, 2023
32fdd1f
Merge pull request #27 from chocoapp/fix/external-dag-sensors
kiranvasudev Dec 5, 2023
4ceca95
Merge pull request #28 from chocoapp/feature/dbt-task-split
kiranvasudev Dec 5, 2023
146846e
initialize dbt module only when its a dbt pipeline config
kiranvasudev Dec 6, 2023
fd2bd0f
format
kiranvasudev Dec 6, 2023
fb0c2e9
made logic to check for dbt task easier
kiranvasudev Dec 6, 2023
bd8e44c
Merge pull request #29 from chocoapp/feature/dbt-task-split
kiranvasudev Dec 6, 2023
a2b3fdd
fix: follow external dependency for staging models
kiranvasudev Dec 7, 2023
4f1783b
Merge pull request #30 from chocoapp/feature/dbt-task-split
kiranvasudev Dec 8, 2023
3653646
added logic to get parents of int models
kiranvasudev Feb 23, 2024
7dec65a
updated tests and fixtures
kiranvasudev Feb 23, 2024
750f53e
Merge pull request #31 from chocoapp/fix/dbt-add-core-sensor-to-int
kiranvasudev Feb 27, 2024
bdc3e57
Turing split_statements on by default
siklosid Mar 25, 2024
0e9dd3b
Merge pull request #32 from chocoapp/feature/psql_split_statements
siklosid Mar 26, 2024
96ad27b
Moving to python3.9; Upgrading airflow version; removing legacy postg…
siklosid Apr 13, 2024
3e62d78
Making sensor default args more flexible
siklosid Apr 15, 2024
41f1554
Upgrading python in CI
siklosid Apr 15, 2024
d8145ab
Adding graphviz dependency to test
siklosid Apr 15, 2024
7b9bd16
Merge pull request #33 from chocoapp/feature/DATA-1791_defferable_sen…
siklosid Apr 15, 2024
8cc2ec2
Upgrading some package versions to remove warnings
siklosid Apr 16, 2024
647a740
Merge pull request #34 from chocoapp/fix/update_packages
siklosid Apr 16, 2024
f56e6b6
feat: rename profile_name to target_name
Apr 17, 2024
52e7572
Merge pull request #35 from chocoapp/feature/DATA-1493-adjust-dbt-params
claudiazi Apr 17, 2024
1b99357
feat: register new databricks_io
Apr 26, 2024
e00f555
feat: refactor the DBTParseConfig to parse databricks-dbt manifest
Apr 26, 2024
9274fe2
feat: add unit test for databricks config parser
Apr 26, 2024
e951c47
chore: black
Apr 26, 2024
4567b3e
Switching to official batch operator
siklosid Apr 26, 2024
1c2ad82
feat: add another param in dbt task
Apr 26, 2024
a8e6471
Complete renaming of classes
siklosid Apr 26, 2024
3c5fd12
Merge pull request #36 from chocoapp/bugfix/DATA-1804_missing_batch_logs
siklosid Apr 26, 2024
7e29420
feat: refactor
Apr 26, 2024
432c32d
Merge remote-tracking branch 'origin/master' into feature/DATA-1811-d…
Apr 26, 2024
5659c68
feat: adjust the s3 tasks
Apr 26, 2024
81697d4
feat: adjust the _get_s3_task for different dbt adapters
Apr 29, 2024
7c1aa22
fix: define the correct target_config for databricks adapter
Apr 29, 2024
40b28ef
fix: _generate_dagger_output
Apr 29, 2024
9c0311f
extend: command
Apr 29, 2024
14f9d1f
fix: _generate_command
Apr 30, 2024
0d3947f
Merge pull request #37 from chocoapp/feature/DATA-1811-databricks-io
claudiazi Apr 30, 2024
f7cc983
fix: _get_model_data_location in DatabricksDBTConfigParser
Apr 30, 2024
1523648
Merge pull request #38 from chocoapp/feature/DATA-1811-databricks-io
claudiazi Apr 30, 2024
59cf847
fix: generate_task for dbt tasks
May 3, 2024
a25ccc9
Merge pull request #39 from chocoapp/feature/DATA-1811-databricks-io
claudiazi May 3, 2024
c44a7b5
Replacing string replacement with jinja in module processor
siklosid Jun 12, 2024
e687686
Merge pull request #40 from chocoapp/feature/DATA-1699_using_jinja_in…
siklosid Jun 12, 2024
1bc6bf2
feat: adjust the dbt config parser so that view/ephemeral staging lay…
Jun 17, 2024
22ca09b
feat: adjust io for the materalised staging model
Jun 18, 2024
f2f8015
feat: restructure the dbt dagger task input & output
Jun 19, 2024
81e75da
Merge pull request #41 from chocoapp/feat/DATA-1996-improve-dbt-parser
claudiazi Jun 19, 2024
698358d
Module generation with generalised jinja parameters
siklosid Jul 4, 2024
cd12dda
Now it's possible to assign task to task groups
siklosid Jul 4, 2024
0e6a6a6
Adding default value to the parameter
siklosid Jul 4, 2024
f041aa8
Merge pull request #42 from chocoapp/feature/DATA-2002_generalise_jin…
siklosid Jul 4, 2024
8d0e23f
removed custom dbt task generation logic from Module
kiranvasudev Nov 6, 2024
77f6026
add plugins path to dagger config
kiranvasudev Nov 7, 2024
fbb648b
added function to load plugins
kiranvasudev Nov 7, 2024
90bb16b
load plugins and render jinja
kiranvasudev Nov 7, 2024
1d09c57
iterate over multiple folders and their subfolders
kiranvasudev Nov 7, 2024
da0528a
exclude all files starting with __
kiranvasudev Nov 7, 2024
a8aeff7
added plugin to dagger config
kiranvasudev Nov 7, 2024
6addf98
added logging for plugins
kiranvasudev Nov 7, 2024
b8feafb
fix
kiranvasudev Nov 7, 2024
d8c6384
added tests for plugins
kiranvasudev Nov 7, 2024
a3d7f47
refactor code
kiranvasudev Nov 7, 2024
450b544
refactor tests
kiranvasudev Nov 7, 2024
98a6b55
exclude test folders from directory walk
kiranvasudev Nov 8, 2024
ab6ff15
remove unused imports
kiranvasudev Nov 8, 2024
0fa5389
update readme
kiranvasudev Nov 11, 2024
e7f08cd
fix readme
kiranvasudev Nov 12, 2024
7554e62
Merge pull request #43 from chocoapp/feature/plugin-dbt-parser
kiranvasudev Nov 13, 2024
7d871b6
Adding new reverse etl operator to dagger inherited from batch operator
siklosid Jan 2, 2025
bfceb81
Registering the new operator with dagger
siklosid Jan 2, 2025
0b2ac50
Small type fix to resolve broken cli help command
siklosid Jan 2, 2025
e414b84
Adding the possibility that inherited operator can overwrite attribut…
siklosid Jan 2, 2025
67a8142
Smaller fixes; syntax fix; Fixing command creation by extending the e…
siklosid Jan 2, 2025
0557bab
Adding dynamo and sns io types
siklosid Jan 3, 2025
4ddfc33
Adding dynamo and sns io types
siklosid Jan 3, 2025
8038d3c
Fixing input/output name for reverse etl so it matches the batch job …
siklosid Jan 3, 2025
60c4736
Handling hard wired constants as local parameters of the task
siklosid Jan 3, 2025
f64910f
Removing account_id from io; naming convention; fixing small issues
siklosid Jan 3, 2025
2ea6f7e
Fixing batch job name
siklosid Jan 3, 2025
85c9c2a
Removing choco specific parameters and moving them to conf file; Maki…
siklosid Jan 7, 2025
3c5a610
Adding unit tests and a small fix
siklosid Jan 7, 2025
f45c84a
Adding comments
siklosid Jan 7, 2025
76c774d
Merge pull request #44 from chocoapp/feature/data-2025_reverse_etl_op…
siklosid Jan 8, 2025
e62890b
feat: add application name to the spark job & add kill spark job when…
Jan 16, 2025
31085ac
fix: type of _execution_timeout
Jan 16, 2025
02283d8
fix: remove the wrong and uncessary function
Jan 16, 2025
c9a1c6f
fix: timeout logic
Jan 17, 2025
bfb935c
fix: add missing import
Jan 17, 2025
7fba8d5
fix: spark_app_name
Jan 17, 2025
37a31f8
fix: default spark_app_name
Jan 17, 2025
a3422db
feat: improve the logic to kill the spark job
Jan 20, 2025
6046a01
chore: black + add log + missing function
Jan 20, 2025
57c394b
chore: add info to debug
Jan 20, 2025
6fdb674
chore: add info to debug
Jan 20, 2025
a5d627c
chore: reformat
Jan 20, 2025
9ad0c1e
Merge pull request #45 from chocoapp/feature/DATA-2175-kill-timeout-s…
claudiazi Jan 21, 2025
3ef83b3
Bumping tenacity version
siklosid Feb 13, 2025
4 changes: 2 additions & 2 deletions .github/workflows/ci-data.yml
@@ -17,10 +17,10 @@ jobs:
with:
persist-credentials: false

- name: Set up Python 3.7
- name: Set up Python 3.9
uses: actions/setup-python@v2
with:
python-version: 3.7
python-version: 3.9

- name: Install dependencies
run: |
6 changes: 3 additions & 3 deletions Makefile
@@ -96,15 +96,15 @@ install: clean ## install the package to the active Python's site-packages


install-dev: clean ## install the package to the active Python's site-packages
virtualenv -p python3 venv; \
virtualenv -p python3.9 venv; \
source venv/bin/activate; \
python -m pip install --upgrade pip; \
python setup.py install; \
pip install -e . ; \
pip install -r reqs/dev.txt -r reqs/test.txt
SYSTEM_VERSION_COMPAT=0 CFLAGS='-std=c++20' pip install -r reqs/dev.txt -r reqs/test.txt

install-test: clean ## install the package to the active Python's site-packages
virtualenv -p python3 venv; \
virtualenv -p python3.9 venv; \
source venv/bin/activate; \
python -m pip install --upgrade pip; \
pip install -r reqs/test.txt -r reqs/base.txt
35 changes: 35 additions & 0 deletions README.md
@@ -87,6 +87,41 @@ flowchart TD;

```

Plugins for dagger
-------

### Overview
Dagger now supports a plugin system that allows users to extend its functionality by adding custom Python classes. These plugins are integrated into the Jinja2 templating engine, enabling dynamic rendering of task configuration templates.
### Purpose
The plugin system allows users to define Python classes that can be loaded into the Jinja2 environment. When functions from these classes are invoked within a task configuration template, they are rendered dynamically using Jinja2. This feature enhances the flexibility of task configurations by allowing custom logic to be embedded directly in the templates.

### Usage
1. **Creating a Plugin:** To create a new plugin, define a Python class in a folder (for example, `plugins/sample_plugin/sample_plugin.py`) with the desired methods. For example:
```python
class MyCustomPlugin:
def generate_input(self, branch_name):
return [{"name": f"{branch_name}", "type": "dummy"}]
```
This class defines a `generate_input` method that takes the `branch_name` from the module config and returns a dummy dagger input definition.

2. **Loading the Plugin into Dagger:** To load this plugin into Dagger's Jinja2 environment, you need to register it in your `dagger_config.yaml`:
```yaml
# dagger_config.yaml
plugin:
paths:
- plugins # all Python classes within this path will be loaded into the Jinja environment
```

3. **Using Plugin Methods in Templates:** Once the plugin is loaded, you can call its methods from within any Jinja2 template in your task configurations:
```yaml
# task_configuration.yaml
type: batch
description: sample task
inputs: # format: list | Use dagger init-io cli
{{ MyCustomPlugin.generate_input("dummy_input") }}
```
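
The loading step behind this can be pictured with a short sketch. This is illustrative only — `discover_plugin_classes` is a hypothetical stand-in for dagger's internal loader, and it assumes plugin classes take no constructor arguments — but it mirrors the behaviour described in the commits: walk the configured paths, skip `__`-prefixed files and test folders, and expose each class to the Jinja2 environment.

```python
import importlib.util
import inspect
import os


def discover_plugin_classes(plugin_dirs):
    """Walk the configured plugin directories, import every .py file, and
    return {class name: instance}. Merging the result into a Jinja2
    environment (env.globals.update(...)) makes each class callable from
    templates as {{ ClassName.method(...) }}."""
    classes = {}
    for plugin_dir in plugin_dirs:
        for root, dirs, files in os.walk(plugin_dir):
            # Mirror the PR's filtering: skip dunder and test folders
            dirs[:] = [d for d in dirs if not d.startswith("__") and d != "tests"]
            for file_name in files:
                if not file_name.endswith(".py") or file_name.startswith("__"):
                    continue
                spec = importlib.util.spec_from_file_location(
                    file_name[:-3], os.path.join(root, file_name)
                )
                module = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(module)
                # Only pick up classes defined in this file, not imported ones
                for name, obj in inspect.getmembers(module, inspect.isclass):
                    if obj.__module__ == module.__name__:
                        classes[name] = obj()
    return classes
```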



Credits
-------
21 changes: 19 additions & 2 deletions dagger/cli/module.py
@@ -1,17 +1,34 @@
import click
from dagger.utilities.module import Module
from dagger.utils import Printer
import json


def parse_key_value(ctx, param, value):
    if not value:
        return {}
    key_value_dict = {}
    for pair in value:
        try:
            key, val_file_path = pair.split('=', 1)
        except ValueError:
            raise click.BadParameter(f"Key-value pair '{pair}' is not in the format key=value")
        # Parse the JSON file; its contents become the Jinja variable's value
        with open(val_file_path) as val_file:
            key_value_dict[key] = json.load(val_file)
    return key_value_dict

@click.command()
@click.option("--config_file", "-c", help="Path to module config file")
@click.option("--target_dir", "-t", help="Path to directory to generate the task configs to")
def generate_tasks(config_file: str, target_dir: str) -> None:
@click.option("--jinja_parameters", "-j", callback=parse_key_value, multiple=True, default=None, help="Path to jinja parameters json file in the format: <jinja_variable_name>=<path to json file>")
def generate_tasks(config_file: str, target_dir: str, jinja_parameters: dict) -> None:
"""
Generating tasks for a module based on config
"""

module = Module(config_file, target_dir)
module = Module(config_file, target_dir, jinja_parameters)
module.generate_task_configs()

Printer.print_success("Tasks are successfully generated")
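
For illustration, the new `--jinja_parameters`/`-j` option's behaviour can be reproduced without the click machinery. `parse_jinja_parameters` below is a hypothetical stand-alone equivalent of the callback, and the file path in the usage comment is made up:

```python
import json
import tempfile


def parse_jinja_parameters(pairs):
    """Each pair is '<jinja_variable_name>=<path to json file>'; the
    file's parsed JSON becomes that variable's value."""
    parameters = {}
    for pair in pairs:
        key, _, path = pair.partition("=")
        if not path:
            raise ValueError(f"Key-value pair '{pair}' is not in the format key=value")
        with open(path) as f:
            parameters[key] = json.load(f)
    return parameters


# Simulating: dagger generate-tasks -c module.yaml -t out/ -j params=/tmp/params.json
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
    json.dump({"branch_name": "feature-x"}, f)
print(parse_jinja_parameters([f"params={f.name}"]))
# {'params': {'branch_name': 'feature-x'}}
```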
17 changes: 13 additions & 4 deletions dagger/conf.py
@@ -21,9 +21,7 @@
# Airflow parameters
airflow_config = config.get('airflow', None) or {}
WITH_DATA_NODES = airflow_config.get('with_data_nodes', False)
EXTERNAL_SENSOR_POKE_INTERVAL = airflow_config.get('external_sensor_poke_interval', 600)
EXTERNAL_SENSOR_TIMEOUT = airflow_config.get('external_sensor_timeout', 28800)
EXTERNAL_SENSOR_MODE = airflow_config.get('external_sensor_mode', 'reschedule')
EXTERNAL_SENSOR_DEFAULT_ARGS = airflow_config.get('external_sensor_default_args', {})
IS_DUMMY_OPERATOR_SHORT_CIRCUIT = airflow_config.get('is_dummy_operator_short_circuit', False)

# Neo4j parameters
@@ -100,4 +98,15 @@
# Alert parameters
alert_config = config.get('alert', None) or {}
SLACK_TOKEN = alert_config.get('slack_token', None)
DEFAULT_ALERT = alert_config.get('default_alert', {"type": "slack", "channel": "#airflow-jobs", "mentions": None})
DEFAULT_ALERT = alert_config.get('default_alert', {"type": "slack", "channel": "#airflow-jobs", "mentions": None})

# Plugin parameters
plugin_config = config.get('plugin', None) or {}
PLUGIN_DIRS = [os.path.join(AIRFLOW_HOME, path) for path in plugin_config.get('paths', [])]
logging.info(f"All Python classes will be loaded as plugins from the following directories: {PLUGIN_DIRS}")

# ReverseETL parameters
reverse_etl_config = config.get('reverse_etl', None) or {}
REVERSE_ETL_DEFAULT_JOB_NAME = reverse_etl_config.get('default_job_name', None)
REVERSE_ETL_DEFAULT_EXECUTABLE_PREFIX = reverse_etl_config.get('default_executable_prefix', None)
REVERSE_ETL_DEFAULT_EXECUTABLE = reverse_etl_config.get('default_executable', None)
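
A hypothetical `dagger_config.yaml` fragment exercising the keys this revision of `conf.py` reads; the key names follow the parsing above, but every value is illustrative, not a recommendation:

```yaml
airflow:
  external_sensor_default_args:   # passed through to ExternalTaskSensor
    mode: reschedule
    poke_interval: 600
    timeout: 28800

plugin:
  paths:
    - plugins                     # resolved relative to AIRFLOW_HOME

reverse_etl:
  default_job_name: reverse-etl-job       # assumed value
  default_executable_prefix: python       # assumed value
  default_executable: main.py             # assumed value
```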
28 changes: 16 additions & 12 deletions dagger/dag_creator/airflow/dag_creator.py
@@ -58,7 +58,7 @@ def _get_external_task_sensor_name_dict(self, from_task_id: str) -> dict:
"external_sensor_name": f"{from_pipeline_name}-{from_task_name}-sensor",
}

def _get_external_task_sensor(self, from_task_id: str, to_task_id: str) -> ExternalTaskSensor:
def _get_external_task_sensor(self, from_task_id: str, to_task_id: str, follow_external_dependency: dict) -> ExternalTaskSensor:
"""
create an object of external task sensor for a specific from_task_id and to_task_id
"""
@@ -72,6 +72,9 @@ def _get_external_task_sensor(self, from_task_id: str, to_task_id: str) -> Exter

to_pipe_id = self._task_graph.get_node(to_task_id).obj.pipeline.name

extra_args = conf.EXTERNAL_SENSOR_DEFAULT_ARGS.copy()
extra_args.update(follow_external_dependency)

return ExternalTaskSensor(
dag=self._dags[to_pipe_id],
task_id=external_sensor_name,
Expand All @@ -80,9 +83,7 @@ def _get_external_task_sensor(self, from_task_id: str, to_task_id: str) -> Exter
execution_date_fn=self._get_execution_date_fn(
from_pipeline_schedule, to_pipeline_schedule
),
mode=conf.EXTERNAL_SENSOR_MODE,
poke_interval=conf.EXTERNAL_SENSOR_POKE_INTERVAL,
timeout=conf.EXTERNAL_SENSOR_TIMEOUT,
**extra_args
)

def _create_control_flow_task(self, pipe_id, dag):
@@ -135,6 +136,7 @@ def _create_edge_without_data(self, from_task_id: str, to_task_ids: list, node:
to_task_ids: The IDs of the tasks to which the edge connects.
node: The current node in a task graph.
"""

from_pipe = (
self._task_graph.get_node(from_task_id).obj.pipeline_name if from_task_id else None
)
@@ -143,7 +145,7 @@ def _create_edge_without_data(self, from_task_id: str, to_task_ids: list, node:
to_pipe = self._task_graph.get_node(to_task_id).obj.pipeline_name
if from_pipe and from_pipe == to_pipe:
self._tasks[from_task_id] >> self._tasks[to_task_id]
elif from_pipe and from_pipe != to_pipe and edge_properties.follow_external_dependency:
elif from_pipe and from_pipe != to_pipe and edge_properties.follow_external_dependency is not None:
from_schedule = self._task_graph.get_node(from_task_id).obj.pipeline.schedule
to_schedule = self._task_graph.get_node(to_task_id).obj.pipeline.schedule
if not from_schedule.startswith("@") and not to_schedule.startswith("@"):
@@ -155,15 +157,17 @@ def _create_edge_without_data(self, from_task_id: str, to_task_ids: list, node:
not in self._sensor_dict.get(to_pipe, dict()).keys()
):
external_task_sensor = self._get_external_task_sensor(
from_task_id, to_task_id
from_task_id, to_task_id, edge_properties.follow_external_dependency
)
self._sensor_dict[to_pipe] = {

if self._sensor_dict.get(to_pipe) is None:
self._sensor_dict[to_pipe] = {}

self._sensor_dict[to_pipe].update({
external_task_sensor_name: external_task_sensor
}
(
self._tasks[self._get_control_flow_task_id(to_pipe)]
>> external_task_sensor
)
})

self._tasks[self._get_control_flow_task_id(to_pipe)] >> external_task_sensor
self._sensor_dict[to_pipe][external_task_sensor_name] >> self._tasks[to_task_id]
else:
self._tasks[self._get_control_flow_task_id(to_pipe)] >> self._tasks[to_task_id]
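
The `copy()`/`update()` pair in `_get_external_task_sensor` gives the per-edge `follow_external_dependency` dict precedence over the config-wide defaults. A stand-alone sketch of that precedence, with an inline dict standing in for `conf.EXTERNAL_SENSOR_DEFAULT_ARGS` (values illustrative):

```python
EXTERNAL_SENSOR_DEFAULT_ARGS = {"mode": "reschedule", "poke_interval": 600, "timeout": 28800}


def build_sensor_kwargs(follow_external_dependency):
    """Per-edge settings override the config-wide defaults; the legacy
    bool form (True) leaves the defaults untouched."""
    extra_args = EXTERNAL_SENSOR_DEFAULT_ARGS.copy()
    if isinstance(follow_external_dependency, dict):
        extra_args.update(follow_external_dependency)
    return extra_args


print(build_sensor_kwargs({"timeout": 3600}))
# {'mode': 'reschedule', 'poke_interval': 600, 'timeout': 3600}
```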
13 changes: 13 additions & 0 deletions dagger/dag_creator/airflow/operator_creator.py
@@ -1,5 +1,6 @@
from abc import ABC, abstractmethod
from datetime import timedelta
from airflow.utils.task_group import TaskGroup

TIMEDELTA_PARAMETERS = ['execution_timeout']

@@ -11,6 +12,15 @@ def __init__(self, task, dag):
self._template_parameters = {}
self._airflow_parameters = {}

def _get_existing_task_group_or_create_new(self):
group_id = self._task.task_group
if self._dag.task_group:
for group in self._dag.task_group.children.values():
if isinstance(group, TaskGroup) and group.group_id == group_id:
return group

return TaskGroup(group_id=group_id, dag=self._dag)

@abstractmethod
def _create_operator(self, kwargs):
raise NotImplementedError
@@ -34,6 +44,9 @@ def _update_airflow_parameters(self):
if self._task.timeout_in_seconds:
self._airflow_parameters["execution_timeout"] = self._task.timeout_in_seconds

if self._task.task_group:
self._airflow_parameters["task_group"] = self._get_existing_task_group_or_create_new()

self._fix_timedelta_parameters()

def create_operator(self):
28 changes: 23 additions & 5 deletions dagger/dag_creator/airflow/operator_creators/batch_creator.py
@@ -1,5 +1,9 @@
from pathlib import Path
from datetime import timedelta

from dagger.dag_creator.airflow.operator_creator import OperatorCreator
from dagger.dag_creator.airflow.operators.awsbatch_operator import AWSBatchOperator
from dagger import conf


class BatchCreator(OperatorCreator):
@@ -8,6 +12,20 @@ class BatchCreator(OperatorCreator):
def __init__(self, task, dag):
super().__init__(task, dag)

@staticmethod
def _validate_job_name(job_name, absolute_job_name):
if not absolute_job_name and not job_name:
raise Exception("Both job_name and absolute_job_name cannot be null")

if absolute_job_name is not None:
return absolute_job_name

job_path = Path(conf.DAGS_DIR) / job_name.replace("-", "/")
assert (
job_path.is_dir()
), f"Job name `{job_name}`, points to a non-existing folder `{job_path}`"
return job_name

def _generate_command(self):
command = [self._task.executable_prefix, self._task.executable]
for param_name, param_value in self._template_parameters.items():
@@ -21,16 +39,16 @@ def _create_operator(self, **kwargs):
overrides = self._task.overrides
overrides.update({"command": self._generate_command()})

job_name = self._validate_job_name(self._task.job_name, self._task.absolute_job_name)
batch_op = AWSBatchOperator(
dag=self._dag,
task_id=self._task.name,
job_name=self._task.job_name,
absolute_job_name=self._task.absolute_job_name,
job_name=self._task.name,
job_definition=job_name,
region_name=self._task.region_name,
cluster_name=self._task.cluster_name,
job_queue=self._task.job_queue,
overrides=overrides,
container_overrides=overrides,
awslogs_enabled=True,
**kwargs,
)

return batch_op
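
The job-name validation above can be exercised in isolation. `resolve_job_definition` is a hypothetical stand-alone copy of `_validate_job_name`, with `conf.DAGS_DIR` replaced by an assumed constant and the `assert` turned into an exception:

```python
from pathlib import Path

DAGS_DIR = "/opt/airflow/dags"  # assumed stand-in for conf.DAGS_DIR


def resolve_job_definition(job_name, absolute_job_name):
    """An absolute job name wins outright; otherwise the dashed job name
    must map onto an existing folder under DAGS_DIR."""
    if not absolute_job_name and not job_name:
        raise ValueError("Both job_name and absolute_job_name cannot be null")
    if absolute_job_name is not None:
        return absolute_job_name
    job_path = Path(DAGS_DIR) / job_name.replace("-", "/")
    if not job_path.is_dir():
        raise ValueError(f"Job name `{job_name}` points to a non-existing folder `{job_path}`")
    return job_name


# The absolute name short-circuits the folder check entirely:
print(resolve_job_definition("ignored", "etl-job-definition"))
# etl-job-definition
```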
24 changes: 12 additions & 12 deletions dagger/dag_creator/airflow/operator_creators/dbt_creator.py
@@ -13,28 +13,28 @@ def __init__(self, task, dag):
self._project_dir = task.project_dir
self._profile_dir = task.profile_dir
self._profile_name = task.profile_name
self._target_name = task.target_name
self._select = task.select
self._dbt_command = task.dbt_command
self._vars = task.vars
self._create_external_athena_table = task.create_external_athena_table

def _generate_command(self):
command = [self._task.executable_prefix, self._task.executable]
command.append(f"--project_dir={self._project_dir}")
command.append(f"--profiles_dir={self._profile_dir}")
command.append(f"--profile_name={self._profile_name}")
command.append(f"--target_name={self._target_name}")
command.append(f"--dbt_command={self._dbt_command}")
if self._select:
command.append(f"--select={self._select}")

if len(self._template_parameters) > 0:
dbt_vars = json.dumps(self._template_parameters)
if self._vars:
dbt_vars = json.dumps(self._vars)
command.append(f"--vars='{dbt_vars}'")

if self._create_external_athena_table:
command.append(f"--create_external_athena_table={self._create_external_athena_table}")
for param_name, param_value in self._template_parameters.items():
command.append(
f"--{param_name}={param_value}"
)
return command

# Overwriting function because for dbt we don't want to add inputs/outputs to the
# template parameters.
def create_operator(self):
self._template_parameters.update(self._task.template_parameters)
self._update_airflow_parameters()

return self._create_operator(**self._airflow_parameters)
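
To make the revised `_generate_command` concrete, here is a stand-alone sketch of the arguments it assembles. All values are made up (`dbt_runner.py` is not a real dagger executable), and the trailing `--{param_name}={param_value}` loop for template parameters is omitted:

```python
import json

# Illustrative task attributes standing in for self._task fields
executable_prefix, executable = "python", "dbt_runner.py"
project_dir, profile_dir = "/dbt/project", "/dbt/profiles"
target_name, dbt_command, select = "prod", "run", "stg_orders+"
dbt_vars = {"run_date": "2025-01-01"}

# Assemble the CLI arguments the same way _generate_command does
command = [executable_prefix, executable]
command.append(f"--project_dir={project_dir}")
command.append(f"--profiles_dir={profile_dir}")
command.append(f"--target_name={target_name}")
command.append(f"--dbt_command={dbt_command}")
if select:
    command.append(f"--select={select}")
if dbt_vars:
    command.append(f"--vars='{json.dumps(dbt_vars)}'")

print(" ".join(command))
# python dbt_runner.py --project_dir=/dbt/project --profiles_dir=/dbt/profiles --target_name=prod --dbt_command=run --select=stg_orders+ --vars='{"run_date": "2025-01-01"}'
```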
@@ -2,9 +2,8 @@
from typing import Optional

from dagger.dag_creator.airflow.operator_creator import OperatorCreator
from dagger.dag_creator.airflow.operators.redshift_sql_operator import (
RedshiftSQLOperator,
)
from dagger.dag_creator.airflow.operators.redshift_sql_operator import RedshiftSQLOperator



class RedshiftLoadCreator(OperatorCreator):
@@ -1,7 +1,7 @@
from os.path import join

from dagger.dag_creator.airflow.operator_creator import OperatorCreator
from dagger.dag_creator.airflow.operators.postgres_operator import PostgresOperator
from dagger.dag_creator.airflow.operators.redshift_sql_operator import RedshiftSQLOperator


class RedshiftTransformCreator(OperatorCreator):
@@ -22,11 +22,11 @@ def _read_sql(directory, file_path):
def _create_operator(self, **kwargs):
sql_string = self._read_sql(self._task.pipeline.directory, self._task.sql_file)

redshift_op = PostgresOperator(
redshift_op = RedshiftSQLOperator(
dag=self._dag,
task_id=self._task.name,
sql=sql_string,
postgres_conn_id=self._task.postgres_conn_id,
redshift_conn_id=self._task.postgres_conn_id,
params=self._template_parameters,
**kwargs,
)
@@ -1,7 +1,7 @@
from os.path import join

from dagger.dag_creator.airflow.operator_creator import OperatorCreator
from dagger.dag_creator.airflow.operators.postgres_operator import PostgresOperator
from dagger.dag_creator.airflow.operators.redshift_sql_operator import RedshiftSQLOperator

REDSHIFT_UNLOAD_CMD = """
unload ('{sql_string}')
@@ -58,12 +58,13 @@ def _create_operator(self, **kwargs):

unload_cmd = self._get_unload_command(sql_string)

redshift_op = PostgresOperator(
redshift_op = RedshiftSQLOperator(
dag=self._dag,
task_id=self._task.name,
sql=unload_cmd,
postgres_conn_id=self._task.postgres_conn_id,
redshift_conn_id=self._task.postgres_conn_id,
params=self._template_parameters,
autocommit=True,
**kwargs,
)
