From 9ac85c89dcc5f1defc2bb949497ed58d3fc67bef Mon Sep 17 00:00:00 2001 From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Date: Wed, 15 Jan 2025 00:54:02 -0700 Subject: [PATCH] Use standalone dag processor for AF3 in chart (#45659) We are moving to only support a standalone DAG processor in Airflow 3, so let's switch our chart to always use it. --- .../dag-processor-deployment.yaml | 6 ++- .../dag-processor-serviceaccount.yaml | 6 ++- .../scheduler/scheduler-deployment.yaml | 6 ++- chart/values.schema.json | 7 ++- chart/values.yaml | 4 +- .../airflow_aux/test_basic_helm_chart.py | 2 + helm_tests/airflow_aux/test_configmap.py | 39 +++++++++++++++ helm_tests/airflow_core/test_dag_processor.py | 39 ++++++++++++++- helm_tests/airflow_core/test_scheduler.py | 47 ++++++++++++------- helm_tests/security/test_rbac.py | 8 ++++ 10 files changed, 140 insertions(+), 24 deletions(-) diff --git a/chart/templates/dag-processor/dag-processor-deployment.yaml b/chart/templates/dag-processor/dag-processor-deployment.yaml index 47652b2159261..70c7f852dd3cb 100644 --- a/chart/templates/dag-processor/dag-processor-deployment.yaml +++ b/chart/templates/dag-processor/dag-processor-deployment.yaml @@ -21,7 +21,11 @@ ## Airflow Dag Processor Deployment ################################# {{- if semverCompare ">=2.3.0" .Values.airflowVersion }} -{{- if .Values.dagProcessor.enabled }} +{{- $enabled := .Values.dagProcessor.enabled }} +{{- if eq $enabled nil}} + {{ $enabled = ternary true false (semverCompare ">=3.0.0" .Values.airflowVersion) }} +{{- end }} +{{- if $enabled }} {{- $nodeSelector := or .Values.dagProcessor.nodeSelector .Values.nodeSelector }} {{- $affinity := or .Values.dagProcessor.affinity .Values.affinity }} {{- $tolerations := or .Values.dagProcessor.tolerations .Values.tolerations }} diff --git a/chart/templates/dag-processor/dag-processor-serviceaccount.yaml b/chart/templates/dag-processor/dag-processor-serviceaccount.yaml index 5d386e4af6f68..8fdae4abe1f32 100644 --- a/chart/templates/dag-processor/dag-processor-serviceaccount.yaml +++ b/chart/templates/dag-processor/dag-processor-serviceaccount.yaml @@ -21,7 +21,11 @@ ## Airflow Dag Processor ServiceAccount ################################# {{- if semverCompare ">=2.3.0" .Values.airflowVersion }} -{{- if and .Values.dagProcessor.serviceAccount.create .Values.dagProcessor.enabled }} +{{- $enabled := .Values.dagProcessor.enabled }} +{{- if eq $enabled nil}} + {{ $enabled = ternary true false (semverCompare ">=3.0.0" .Values.airflowVersion) }} +{{- end }} +{{- if and .Values.dagProcessor.serviceAccount.create $enabled }} apiVersion: v1 kind: ServiceAccount automountServiceAccountToken: {{ .Values.dagProcessor.serviceAccount.automountServiceAccountToken }} diff --git a/chart/templates/scheduler/scheduler-deployment.yaml b/chart/templates/scheduler/scheduler-deployment.yaml index ed63f1ef46903..0b76eb4587a55 100644 --- a/chart/templates/scheduler/scheduler-deployment.yaml +++ b/chart/templates/scheduler/scheduler-deployment.yaml @@ -29,7 +29,11 @@ # If we're using a StatefulSet {{- $stateful := and $local $persistence }} # We can skip DAGs mounts on scheduler if dagProcessor is enabled, except with $local mode -{{- $localOrDagProcessorDisabled := or (not .Values.dagProcessor.enabled) $local }} +{{- $dagProcessorEnabled := .Values.dagProcessor.enabled }} +{{- if eq $dagProcessorEnabled nil}} + {{ $dagProcessorEnabled = ternary true false (semverCompare ">=3.0.0" .Values.airflowVersion) }} +{{- end }} +{{- $localOrDagProcessorDisabled := or (not $dagProcessorEnabled) $local }} # If we're using elasticsearch or opensearch logging {{- $remoteLogging := or .Values.elasticsearch.enabled .Values.opensearch.enabled }} {{- $nodeSelector := or .Values.scheduler.nodeSelector .Values.nodeSelector }} diff --git a/chart/values.schema.json b/chart/values.schema.json index 597c2c475f8bf..44fb26f4add5f 100644 --- a/chart/values.schema.json +++ b/chart/values.schema.json @@ -3586,8 +3586,11 @@ "properties": { "enabled": { "description": "Enable standalone dag processor (requires Airflow 2.3.0+).", - "type": "boolean", - "default": false + "type": [ + "boolean", + "null" + ], + "default": null }, "livenessProbe": { "description": "Liveness probe configuration for dag processor.", diff --git a/chart/values.yaml b/chart/values.yaml index 8032cef940b7d..8f084d3eb7624 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -1798,7 +1798,7 @@ triggerer: # Airflow Dag Processor Config dagProcessor: - enabled: false + enabled: ~ # Number of airflow dag processors in the deployment replicas: 1 # Max number of old replicasets to retain @@ -2640,7 +2640,7 @@ config: flower_url_prefix: '{{ ternary "" .Values.ingress.flower.path (eq .Values.ingress.flower.path "/") }}' worker_concurrency: 16 scheduler: - standalone_dag_processor: '{{ ternary "True" "False" .Values.dagProcessor.enabled }}' + standalone_dag_processor: '{{ ternary "True" "False" (or (semverCompare ">=3.0.0" .Values.airflowVersion) (.Values.dagProcessor.enabled | default false)) }}' # statsd params included for Airflow 1.10 backward compatibility; moved to [metrics] in 2.0 statsd_on: '{{ ternary "True" "False" .Values.statsd.enabled }}' statsd_port: 9125 diff --git a/helm_tests/airflow_aux/test_basic_helm_chart.py b/helm_tests/airflow_aux/test_basic_helm_chart.py index c9dff5476e530..52ff7facf4ba9 100644 --- a/helm_tests/airflow_aux/test_basic_helm_chart.py +++ b/helm_tests/airflow_aux/test_basic_helm_chart.py @@ -143,8 +143,10 @@ def test_basic_deployments(self, version): expected.update( ( ("Deployment", "test-basic-api-server"), + ("Deployment", "test-basic-dag-processor"), ("Service", "test-basic-api-server"), ("ServiceAccount", "test-basic-api-server"), + ("ServiceAccount", "test-basic-dag-processor"), ("Service", "test-basic-triggerer"), ) ) diff --git a/helm_tests/airflow_aux/test_configmap.py b/helm_tests/airflow_aux/test_configmap.py index c0f8d2409a26c..45a0a82a80e16 100644 --- a/helm_tests/airflow_aux/test_configmap.py +++ b/helm_tests/airflow_aux/test_configmap.py @@ -201,3 +201,42 @@ def test_expected_default_dag_folder(self, dag_values, expected_default_dag_fold cfg = jmespath.search('data."airflow.cfg"', docs[0]) expected_folder_config = f"dags_folder = {expected_default_dag_folder}" assert expected_folder_config in cfg.splitlines() + + @pytest.mark.parametrize( + "airflow_version, enabled", + [ + ("2.10.4", False), + ("3.0.0", True), + ], + ) + def test_default_standalone_dag_processor_by_airflow_version(self, airflow_version, enabled): + docs = render_chart( + values={"airflowVersion": airflow_version}, + show_only=["templates/configmaps/configmap.yaml"], + ) + + cfg = jmespath.search('data."airflow.cfg"', docs[0]) + expected_line = f"standalone_dag_processor = {enabled}" + assert expected_line in cfg.splitlines() + + @pytest.mark.parametrize( + "airflow_version, enabled", + [ + ("2.10.4", False), + ("2.10.4", True), + ("3.0.0", False), + ("3.0.0", True), + ], + ) + def test_standalone_dag_processor_explicit(self, airflow_version, enabled): + docs = render_chart( + values={ + "airflowVersion": airflow_version, + "config": {"scheduler": {"standalone_dag_processor": enabled}}, + }, + show_only=["templates/configmaps/configmap.yaml"], + ) + + cfg = jmespath.search('data."airflow.cfg"', docs[0]) + expected_line = f"standalone_dag_processor = {str(enabled).lower()}" + assert expected_line in cfg.splitlines() diff --git a/helm_tests/airflow_core/test_dag_processor.py b/helm_tests/airflow_core/test_dag_processor.py index 6e19b68ae286d..cc478703c384e 100644 --- a/helm_tests/airflow_core/test_dag_processor.py +++ b/helm_tests/airflow_core/test_dag_processor.py @@ -43,7 +43,44 @@ def test_only_exists_on_new_airflow_versions(self, airflow_version, num_docs): show_only=["templates/dag-processor/dag-processor-deployment.yaml"], ) - assert num_docs == len(docs) + assert len(docs) == num_docs + + @pytest.mark.parametrize( + "airflow_version, num_docs", + [ + ("2.10.4", 0), + ("3.0.0", 1), + ], + ) + def test_enabled_by_airflow_version(self, airflow_version, num_docs): + """Tests that Dag Processor is enabled by default with Airflow 3""" + docs = render_chart( + values={"airflowVersion": airflow_version}, + show_only=["templates/dag-processor/dag-processor-deployment.yaml"], + ) + + assert len(docs) == num_docs + + @pytest.mark.parametrize( + "airflow_version, enabled", + [ + ("2.10.4", False), + ("2.10.4", True), + ("3.0.0", False), + ("3.0.0", True), + ], + ) + def test_enabled_explicit(self, airflow_version, enabled): + """Tests that Dag Processor can be enabled/disabled regardless of version""" + docs = render_chart( + values={"airflowVersion": airflow_version, "dagProcessor": {"enabled": enabled}}, + show_only=["templates/dag-processor/dag-processor-deployment.yaml"], + ) + + if enabled: + assert len(docs) == 1 + else: + assert len(docs) == 0 def test_can_be_disabled(self): """Standalone Dag Processor is disabled by default.""" diff --git a/helm_tests/airflow_core/test_scheduler.py b/helm_tests/airflow_core/test_scheduler.py index 2c3fb0f65cdf2..09e46e70bad8d 100644 --- a/helm_tests/airflow_core/test_scheduler.py +++ b/helm_tests/airflow_core/test_scheduler.py @@ -759,20 +759,32 @@ def test_dags_gitsync_sidecar_and_init_container(self, dags_values): ] @pytest.mark.parametrize( - "dag_processor, executor, skip_dags_mount", + "airflow_version, dag_processor, executor, skip_dags_mount", [ - (True, "LocalExecutor", False), - (True, "CeleryExecutor", True), - (True, "KubernetesExecutor", True), - (True, "LocalKubernetesExecutor", False), - (False, "LocalExecutor", False), - (False, "CeleryExecutor", False), - (False, "KubernetesExecutor", False), - (False, "LocalKubernetesExecutor", False), + # standalone dag_processor is optional on 2.10, so we can skip dags for non-local if its on + ("2.10.4", True, "LocalExecutor", False), + ("2.10.4", True, "CeleryExecutor", True), + ("2.10.4", True, "KubernetesExecutor", True), + ("2.10.4", True, "LocalKubernetesExecutor", False), + # but if standalone dag_processor is off, we must always have dags + ("2.10.4", False, "LocalExecutor", False), + ("2.10.4", False, "CeleryExecutor", False), + ("2.10.4", False, "KubernetesExecutor", False), + ("2.10.4", False, "LocalKubernetesExecutor", False), + # by default, we don't have a standalone dag_processor + ("2.10.4", None, "LocalExecutor", False), + ("2.10.4", None, "CeleryExecutor", False), + ("2.10.4", None, "KubernetesExecutor", False), + ("2.10.4", None, "LocalKubernetesExecutor", False), + # but in airflow 3, standalone dag_processor required, so we again can skip dags for non-local + ("3.0.0", None, "LocalExecutor", False), + ("3.0.0", None, "CeleryExecutor", True), + ("3.0.0", None, "KubernetesExecutor", True), + ("3.0.0", None, "LocalKubernetesExecutor", False), ], ) def test_dags_mount_and_gitsync_expected_with_dag_processor( - self, dag_processor, executor, skip_dags_mount + self, airflow_version, dag_processor, executor, skip_dags_mount ): """ DAG Processor can move gitsync and DAGs mount from the scheduler to the DAG Processor only. @@ -780,13 +792,16 @@ def test_dags_mount_and_gitsync_expected_with_dag_processor( The only exception is when we have a Local executor. In these cases, the scheduler does the worker role and needs access to DAGs anyway. """ + values = { + "airflowVersion": airflow_version, + "executor": executor, + "dags": {"gitSync": {"enabled": True}, "persistence": {"enabled": True}}, + "scheduler": {"logGroomerSidecar": {"enabled": False}}, + } + if dag_processor is not None: + values["dagProcessor"] = {"enabled": dag_processor} docs = render_chart( - values={ - "dagProcessor": {"enabled": dag_processor}, - "executor": executor, - "dags": {"gitSync": {"enabled": True}, "persistence": {"enabled": True}}, - "scheduler": {"logGroomerSidecar": {"enabled": False}}, - }, + values=values, show_only=["templates/scheduler/scheduler-deployment.yaml"], ) diff --git a/helm_tests/security/test_rbac.py b/helm_tests/security/test_rbac.py index d163a2ed68373..0eecb9aadeb4d 100644 --- a/helm_tests/security/test_rbac.py +++ b/helm_tests/security/test_rbac.py @@ -80,6 +80,7 @@ CUSTOM_SERVICE_ACCOUNT_NAMES = ( (CUSTOM_SCHEDULER_NAME := "TestScheduler"), + (CUSTOM_DAG_PROCESSOR_NAME := "TestDagProcessor"), (CUSTOM_WEBSERVER_NAME := "TestWebserver"), (CUSTOM_API_SERVER_NAME := "TestAPISserver"), (CUSTOM_WORKER_NAME := "TestWorker"), @@ -120,10 +121,12 @@ def _get_object_tuples(version, sa: bool = True): ( ("Service", "test-rbac-api-server"), ("Deployment", "test-rbac-api-server"), + ("Deployment", "test-rbac-dag-processor"), ) ) if sa: tuples.append(("ServiceAccount", "test-rbac-api-server")) + tuples.append(("ServiceAccount", "test-rbac-dag-processor")) return tuples @parametrize_version @@ -148,6 +151,7 @@ def test_deployments_no_rbac_no_sa(self, version): }, "redis": {"serviceAccount": {"create": False}}, "scheduler": {"serviceAccount": {"create": False}}, + "dagProcessor": {"serviceAccount": {"create": False}}, "webserver": {"serviceAccount": {"create": False}}, "apiServer": {"serviceAccount": {"create": False}}, "workers": {"serviceAccount": {"create": False}}, @@ -200,6 +204,7 @@ def test_deployments_with_rbac_no_sa(self, version): }, }, "scheduler": {"serviceAccount": {"create": False}}, + "dagProcessor": {"serviceAccount": {"create": False}}, "webserver": {"serviceAccount": {"create": False}}, "apiServer": {"serviceAccount": {"create": False}}, "workers": {"serviceAccount": {"create": False}}, @@ -260,6 +265,7 @@ def test_service_account_custom_names(self): }, }, "scheduler": {"serviceAccount": {"name": CUSTOM_SCHEDULER_NAME}}, + "dagProcessor": {"serviceAccount": {"name": CUSTOM_DAG_PROCESSOR_NAME}}, "webserver": {"serviceAccount": {"name": CUSTOM_WEBSERVER_NAME}}, "apiServer": {"serviceAccount": {"name": CUSTOM_API_SERVER_NAME}}, "workers": {"serviceAccount": {"name": CUSTOM_WORKER_NAME}}, @@ -298,6 +304,7 @@ def test_service_account_custom_names_in_objects(self): }, }, "scheduler": {"serviceAccount": {"name": CUSTOM_SCHEDULER_NAME}}, + "dagProcessor": {"serviceAccount": {"name": CUSTOM_DAG_PROCESSOR_NAME}}, "webserver": {"serviceAccount": {"name": CUSTOM_WEBSERVER_NAME}}, "apiServer": {"serviceAccount": {"name": CUSTOM_API_SERVER_NAME}}, "workers": {"serviceAccount": {"name": CUSTOM_WORKER_NAME}}, @@ -353,6 +360,7 @@ def test_service_account_without_resource(self): ] service_account_names = [ "test-rbac-scheduler", + "test-rbac-dag-processor", "test-rbac-webserver", "test-rbac-api-server", "test-rbac-triggerer",