Skip to content

Commit

Permalink
Use standalone dag processor for AF3 in chart (apache#45659)
Browse files Browse the repository at this point in the history
We are moving to only support a standalone DAG processor in Airflow 3,
so let's switch our chart to always use it.
  • Loading branch information
jedcunningham authored Jan 15, 2025
1 parent 6b599f8 commit 9ac85c8
Show file tree
Hide file tree
Showing 10 changed files with 140 additions and 24 deletions.
6 changes: 5 additions & 1 deletion chart/templates/dag-processor/dag-processor-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
## Airflow Dag Processor Deployment
#################################
{{- if semverCompare ">=2.3.0" .Values.airflowVersion }}
{{- if .Values.dagProcessor.enabled }}
{{- $enabled := .Values.dagProcessor.enabled }}
{{- if eq $enabled nil}}
{{ $enabled = ternary true false (semverCompare ">=3.0.0" .Values.airflowVersion) }}
{{- end }}
{{- if $enabled }}
{{- $nodeSelector := or .Values.dagProcessor.nodeSelector .Values.nodeSelector }}
{{- $affinity := or .Values.dagProcessor.affinity .Values.affinity }}
{{- $tolerations := or .Values.dagProcessor.tolerations .Values.tolerations }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
## Airflow Dag Processor ServiceAccount
#################################
{{- if semverCompare ">=2.3.0" .Values.airflowVersion }}
{{- if and .Values.dagProcessor.serviceAccount.create .Values.dagProcessor.enabled }}
{{- $enabled := .Values.dagProcessor.enabled }}
{{- if eq $enabled nil}}
{{ $enabled = ternary true false (semverCompare ">=3.0.0" .Values.airflowVersion) }}
{{- end }}
{{- if and .Values.dagProcessor.serviceAccount.create $enabled }}
apiVersion: v1
kind: ServiceAccount
automountServiceAccountToken: {{ .Values.dagProcessor.serviceAccount.automountServiceAccountToken }}
Expand Down
6 changes: 5 additions & 1 deletion chart/templates/scheduler/scheduler-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@
# If we're using a StatefulSet
{{- $stateful := and $local $persistence }}
# We can skip DAGs mounts on scheduler if dagProcessor is enabled, except with $local mode
{{- $localOrDagProcessorDisabled := or (not .Values.dagProcessor.enabled) $local }}
{{- $dagProcessorEnabled := .Values.dagProcessor.enabled }}
{{- if eq $dagProcessorEnabled nil}}
{{ $dagProcessorEnabled = ternary true false (semverCompare ">=3.0.0" .Values.airflowVersion) }}
{{- end }}
{{- $localOrDagProcessorDisabled := or (not $dagProcessorEnabled) $local }}
# If we're using elasticsearch or opensearch logging
{{- $remoteLogging := or .Values.elasticsearch.enabled .Values.opensearch.enabled }}
{{- $nodeSelector := or .Values.scheduler.nodeSelector .Values.nodeSelector }}
Expand Down
7 changes: 5 additions & 2 deletions chart/values.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -3586,8 +3586,11 @@
"properties": {
"enabled": {
"description": "Enable standalone dag processor (requires Airflow 2.3.0+).",
"type": "boolean",
"default": false
"type": [
"boolean",
"null"
],
"default": null
},
"livenessProbe": {
"description": "Liveness probe configuration for dag processor.",
Expand Down
4 changes: 2 additions & 2 deletions chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1798,7 +1798,7 @@ triggerer:

# Airflow Dag Processor Config
dagProcessor:
enabled: false
enabled: ~
# Number of airflow dag processors in the deployment
replicas: 1
# Max number of old replicasets to retain
Expand Down Expand Up @@ -2640,7 +2640,7 @@ config:
flower_url_prefix: '{{ ternary "" .Values.ingress.flower.path (eq .Values.ingress.flower.path "/") }}'
worker_concurrency: 16
scheduler:
standalone_dag_processor: '{{ ternary "True" "False" .Values.dagProcessor.enabled }}'
standalone_dag_processor: '{{ ternary "True" "False" (or (semverCompare ">=3.0.0" .Values.airflowVersion) (.Values.dagProcessor.enabled | default false)) }}'
# statsd params included for Airflow 1.10 backward compatibility; moved to [metrics] in 2.0
statsd_on: '{{ ternary "True" "False" .Values.statsd.enabled }}'
statsd_port: 9125
Expand Down
2 changes: 2 additions & 0 deletions helm_tests/airflow_aux/test_basic_helm_chart.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,10 @@ def test_basic_deployments(self, version):
expected.update(
(
("Deployment", "test-basic-api-server"),
("Deployment", "test-basic-dag-processor"),
("Service", "test-basic-api-server"),
("ServiceAccount", "test-basic-api-server"),
("ServiceAccount", "test-basic-dag-processor"),
("Service", "test-basic-triggerer"),
)
)
Expand Down
39 changes: 39 additions & 0 deletions helm_tests/airflow_aux/test_configmap.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,3 +201,42 @@ def test_expected_default_dag_folder(self, dag_values, expected_default_dag_fold
cfg = jmespath.search('data."airflow.cfg"', docs[0])
expected_folder_config = f"dags_folder = {expected_default_dag_folder}"
assert expected_folder_config in cfg.splitlines()

@pytest.mark.parametrize(
"airflow_version, enabled",
[
("2.10.4", False),
("3.0.0", True),
],
)
def test_default_standalone_dag_processor_by_airflow_version(self, airflow_version, enabled):
docs = render_chart(
values={"airflowVersion": airflow_version},
show_only=["templates/configmaps/configmap.yaml"],
)

cfg = jmespath.search('data."airflow.cfg"', docs[0])
expected_line = f"standalone_dag_processor = {enabled}"
assert expected_line in cfg.splitlines()

@pytest.mark.parametrize(
"airflow_version, enabled",
[
("2.10.4", False),
("2.10.4", True),
("3.0.0", False),
("3.0.0", True),
],
)
def test_standalone_dag_processor_explicit(self, airflow_version, enabled):
docs = render_chart(
values={
"airflowVersion": airflow_version,
"config": {"scheduler": {"standalone_dag_processor": enabled}},
},
show_only=["templates/configmaps/configmap.yaml"],
)

cfg = jmespath.search('data."airflow.cfg"', docs[0])
expected_line = f"standalone_dag_processor = {str(enabled).lower()}"
assert expected_line in cfg.splitlines()
39 changes: 38 additions & 1 deletion helm_tests/airflow_core/test_dag_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,44 @@ def test_only_exists_on_new_airflow_versions(self, airflow_version, num_docs):
show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
)

assert num_docs == len(docs)
assert len(docs) == num_docs

@pytest.mark.parametrize(
"airflow_version, num_docs",
[
("2.10.4", 0),
("3.0.0", 1),
],
)
def test_enabled_by_airflow_version(self, airflow_version, num_docs):
"""Tests that Dag Processor is enabled by default with Airflow 3"""
docs = render_chart(
values={"airflowVersion": airflow_version},
show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
)

assert len(docs) == num_docs

@pytest.mark.parametrize(
"airflow_version, enabled",
[
("2.10.4", False),
("2.10.4", True),
("3.0.0", False),
("3.0.0", True),
],
)
def test_enabled_explicit(self, airflow_version, enabled):
"""Tests that Dag Processor can be enabled/disabled regardless of version"""
docs = render_chart(
values={"airflowVersion": airflow_version, "dagProcessor": {"enabled": enabled}},
show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
)

if enabled:
assert len(docs) == 1
else:
assert len(docs) == 0

def test_can_be_disabled(self):
"""Standalone Dag Processor is disabled by default."""
Expand Down
47 changes: 31 additions & 16 deletions helm_tests/airflow_core/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -759,34 +759,49 @@ def test_dags_gitsync_sidecar_and_init_container(self, dags_values):
]

@pytest.mark.parametrize(
"dag_processor, executor, skip_dags_mount",
"airflow_version, dag_processor, executor, skip_dags_mount",
[
(True, "LocalExecutor", False),
(True, "CeleryExecutor", True),
(True, "KubernetesExecutor", True),
(True, "LocalKubernetesExecutor", False),
(False, "LocalExecutor", False),
(False, "CeleryExecutor", False),
(False, "KubernetesExecutor", False),
(False, "LocalKubernetesExecutor", False),
# standalone dag_processor is optional on 2.10, so we can skip dags for non-local if its on
("2.10.4", True, "LocalExecutor", False),
("2.10.4", True, "CeleryExecutor", True),
("2.10.4", True, "KubernetesExecutor", True),
("2.10.4", True, "LocalKubernetesExecutor", False),
# but if standalone dag_processor is off, we must always have dags
("2.10.4", False, "LocalExecutor", False),
("2.10.4", False, "CeleryExecutor", False),
("2.10.4", False, "KubernetesExecutor", False),
("2.10.4", False, "LocalKubernetesExecutor", False),
# by default, we don't have a standalone dag_processor
("2.10.4", None, "LocalExecutor", False),
("2.10.4", None, "CeleryExecutor", False),
("2.10.4", None, "KubernetesExecutor", False),
("2.10.4", None, "LocalKubernetesExecutor", False),
# but in airflow 3, standalone dag_processor required, so we again can skip dags for non-local
("3.0.0", None, "LocalExecutor", False),
("3.0.0", None, "CeleryExecutor", True),
("3.0.0", None, "KubernetesExecutor", True),
("3.0.0", None, "LocalKubernetesExecutor", False),
],
)
def test_dags_mount_and_gitsync_expected_with_dag_processor(
self, dag_processor, executor, skip_dags_mount
self, airflow_version, dag_processor, executor, skip_dags_mount
):
"""
DAG Processor can move gitsync and DAGs mount from the scheduler to the DAG Processor only.
The only exception is when we have a Local executor.
In these cases, the scheduler does the worker role and needs access to DAGs anyway.
"""
values = {
"airflowVersion": airflow_version,
"executor": executor,
"dags": {"gitSync": {"enabled": True}, "persistence": {"enabled": True}},
"scheduler": {"logGroomerSidecar": {"enabled": False}},
}
if dag_processor is not None:
values["dagProcessor"] = {"enabled": dag_processor}
docs = render_chart(
values={
"dagProcessor": {"enabled": dag_processor},
"executor": executor,
"dags": {"gitSync": {"enabled": True}, "persistence": {"enabled": True}},
"scheduler": {"logGroomerSidecar": {"enabled": False}},
},
values=values,
show_only=["templates/scheduler/scheduler-deployment.yaml"],
)

Expand Down
8 changes: 8 additions & 0 deletions helm_tests/security/test_rbac.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@

CUSTOM_SERVICE_ACCOUNT_NAMES = (
(CUSTOM_SCHEDULER_NAME := "TestScheduler"),
(CUSTOM_DAG_PROCESSOR_NAME := "TestDagProcessor"),
(CUSTOM_WEBSERVER_NAME := "TestWebserver"),
(CUSTOM_API_SERVER_NAME := "TestAPISserver"),
(CUSTOM_WORKER_NAME := "TestWorker"),
Expand Down Expand Up @@ -120,10 +121,12 @@ def _get_object_tuples(version, sa: bool = True):
(
("Service", "test-rbac-api-server"),
("Deployment", "test-rbac-api-server"),
("Deployment", "test-rbac-dag-processor"),
)
)
if sa:
tuples.append(("ServiceAccount", "test-rbac-api-server"))
tuples.append(("ServiceAccount", "test-rbac-dag-processor"))
return tuples

@parametrize_version
Expand All @@ -148,6 +151,7 @@ def test_deployments_no_rbac_no_sa(self, version):
},
"redis": {"serviceAccount": {"create": False}},
"scheduler": {"serviceAccount": {"create": False}},
"dagProcessor": {"serviceAccount": {"create": False}},
"webserver": {"serviceAccount": {"create": False}},
"apiServer": {"serviceAccount": {"create": False}},
"workers": {"serviceAccount": {"create": False}},
Expand Down Expand Up @@ -200,6 +204,7 @@ def test_deployments_with_rbac_no_sa(self, version):
},
},
"scheduler": {"serviceAccount": {"create": False}},
"dagProcessor": {"serviceAccount": {"create": False}},
"webserver": {"serviceAccount": {"create": False}},
"apiServer": {"serviceAccount": {"create": False}},
"workers": {"serviceAccount": {"create": False}},
Expand Down Expand Up @@ -260,6 +265,7 @@ def test_service_account_custom_names(self):
},
},
"scheduler": {"serviceAccount": {"name": CUSTOM_SCHEDULER_NAME}},
"dagProcessor": {"serviceAccount": {"name": CUSTOM_DAG_PROCESSOR_NAME}},
"webserver": {"serviceAccount": {"name": CUSTOM_WEBSERVER_NAME}},
"apiServer": {"serviceAccount": {"name": CUSTOM_API_SERVER_NAME}},
"workers": {"serviceAccount": {"name": CUSTOM_WORKER_NAME}},
Expand Down Expand Up @@ -298,6 +304,7 @@ def test_service_account_custom_names_in_objects(self):
},
},
"scheduler": {"serviceAccount": {"name": CUSTOM_SCHEDULER_NAME}},
"dagProcessor": {"serviceAccount": {"name": CUSTOM_DAG_PROCESSOR_NAME}},
"webserver": {"serviceAccount": {"name": CUSTOM_WEBSERVER_NAME}},
"apiServer": {"serviceAccount": {"name": CUSTOM_API_SERVER_NAME}},
"workers": {"serviceAccount": {"name": CUSTOM_WORKER_NAME}},
Expand Down Expand Up @@ -353,6 +360,7 @@ def test_service_account_without_resource(self):
]
service_account_names = [
"test-rbac-scheduler",
"test-rbac-dag-processor",
"test-rbac-webserver",
"test-rbac-api-server",
"test-rbac-triggerer",
Expand Down

0 comments on commit 9ac85c8

Please sign in to comment.