Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for templates schema validation after transform #39

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/airflow_declarative/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,13 @@ def represent_callable(self, obj):

WITH_ITEMS = List(ANY) | Dict(using=CALLBACK) | Dict(from_stdout=STRING)

TEMPLATE_OPERATORS = Mapping(STRING, ANY)
TEMPLATE_SENSORS = Mapping(STRING, ANY)

DO_TEMPLATE = Dict(
{
OptionalKey("operators"): OPERATORS,
OptionalKey("sensors"): SENSORS,
OptionalKey("operators"): TEMPLATE_OPERATORS,
OptionalKey("sensors"): TEMPLATE_SENSORS,
OptionalKey("flow"): FLOW,
Key("with_items"): WITH_ITEMS,
}
Expand Down
2 changes: 1 addition & 1 deletion src/airflow_declarative/transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def transform(schema):
schema0 = ensure_schema(schema)
schema1 = transform_templates(schema0)
schema2 = transform_defaults(schema1)
return schema2
return ensure_schema(schema2)


def transform_templates(schema):
Expand Down
59 changes: 59 additions & 0 deletions tests/dags/good/template-with-invalid-schema-before-do.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#
# Copyright 2019, Rambler Digital Solutions
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

dags:
callback_dag:
args:
start_date: 2017-07-27
schedule_interval: 1d
do:
- operators:
operator_{{ item.name }}:
callback: 'tests.utils:{{ item.callback }}'
callback_args:
'{{ item.args }}'
with_items:
- name: multi
callback: MultiParamOperator
args:
param1: egg
param2: bacon
param3: spam
- name: simple
callback: Operator
args:
param: egg
- operators:
operator_{{ item.name }}:
class: '{{ item.class }}'
args:
'{{ item.args }}'
with_items:
- name: bash
class: airflow.operators.bash_operator:BashOperator
args:
bash_command: 'echo "test"'
end_date: 2019-10-25
- sensors:
sensor_{{ item.name }}:
callback: 'tests.utils:{{ item.callback }}'
flow:
sensor_{{item.name}}:
- operator_bash
- operator_simple
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This example is rather big, and it is hard to tell which part exactly would have failed in master.

Is it this flow which references operators from other do blocks or there's more? If none, would it be possible to simplify the test by stripping everything unrelated to the problem being fixed here?

with_items:
- name: simple
callback: sensor
60 changes: 60 additions & 0 deletions tests/test_template_with_invalid_schema_before_do.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# -*- coding: utf-8 -*-
#
# Copyright 2019, Rambler Digital Solutions
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import absolute_import, division, print_function, unicode_literals

from datetime import date

import pytest
from airflow.operators.bash_operator import BashOperator
from tests.utils import MultiParamOperator, Operator, sensor

import airflow_declarative


@pytest.fixture()
def dag(good_dag_path):
path = good_dag_path("template-with-invalid-schema-before-do")
dags = airflow_declarative.from_path(path)

assert len(dags) == 1

dag = dags[0]

return dag


def test_callback_params(dag):
operator_multi = dag.task_dict["operator_multi"]
assert operator_multi._callback == MultiParamOperator
assert operator_multi._callback_args == {
"param1": "egg",
"param2": "bacon",
"param3": "spam",
}

operator_simple = dag.task_dict["operator_simple"]
assert operator_simple._callback == Operator
assert operator_simple._callback_args == {"param": "egg"}

operator_bash = dag.task_dict["operator_bash"]
assert isinstance(operator_bash, BashOperator)
assert operator_bash.bash_command == 'echo "test"'
assert operator_bash.end_date == date(2019, 10, 25)

sensor_simple = dag.task_dict["sensor_simple"]
assert sensor_simple._callback == sensor