Add integration test for other executors in kubernetes cluster (apache#15695)

This change adds tests that run some example DAGs with other executors.

Steps to run the tests with, for example, the CeleryExecutor:

```
./breeze kind-cluster start
./breeze kind-cluster --executor CeleryExecutor deploy
./breeze kind-cluster test
```
ephraimbuddy authored May 11, 2021
1 parent 34cf525 commit cbc3cb8
Showing 4 changed files with 333 additions and 194 deletions.
23 changes: 23 additions & 0 deletions TESTING.rst
@@ -601,6 +601,17 @@ The deploy command performs those steps:
5. Applies the volumes.yaml to get the volumes deployed to ``default`` namespace - this is where
KubernetesExecutor starts its pods.

You can also specify a different executor by providing the ``--executor`` optional argument:

.. code-block:: bash

    ./breeze kind-cluster deploy --executor CeleryExecutor

Note that when you specify the ``--executor`` option, it becomes the default. Every subsequent
operation on ``./breeze kind-cluster`` will therefore default to this executor. To change that,
pass the ``--executor`` option again on the subsequent commands, as in the example below.
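
For example, to run one deployment with ``CeleryExecutor`` and then return to the default
(assuming ``KubernetesExecutor`` is the default in your setup):

.. code-block:: bash

    ./breeze kind-cluster deploy --executor CeleryExecutor
    ./breeze kind-cluster test                                  # still uses CeleryExecutor
    ./breeze kind-cluster deploy --executor KubernetesExecutor  # back to the default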


Running tests with Kubernetes Cluster
-------------------------------------

@@ -622,6 +633,12 @@ Running Kubernetes tests via breeze:
    ./breeze kind-cluster test
    ./breeze kind-cluster test -- TEST TEST [TEST ...]

Optionally add ``--executor``:

.. code-block:: bash

    ./breeze kind-cluster test --executor CeleryExecutor
    ./breeze kind-cluster test -- TEST TEST [TEST ...] --executor CeleryExecutor

Entering shell with Kubernetes Cluster
--------------------------------------
@@ -645,6 +662,12 @@ You can enter the shell via those scripts
    ./breeze kind-cluster shell

Optionally add ``--executor``:

.. code-block:: bash

    ./breeze kind-cluster shell

K9s CLI - debug Kubernetes in style!
------------------------------------
209 changes: 209 additions & 0 deletions kubernetes_tests/test_base.py
@@ -0,0 +1,209 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import os
import re
import subprocess
import time
import unittest
from datetime import datetime
from subprocess import check_call, check_output

import requests
import requests.exceptions
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

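# Connection details and the executor under test are supplied via environment
# variables (expected to be set by the breeze kind-cluster scripts), with
# local defaults as a fallback where applicable.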
CLUSTER_FORWARDED_PORT = os.environ.get('CLUSTER_FORWARDED_PORT') or "8080"
KUBERNETES_HOST_PORT = (os.environ.get('CLUSTER_HOST') or "localhost") + ":" + CLUSTER_FORWARDED_PORT
EXECUTOR = os.environ.get("EXECUTOR")

print()
print(f"Cluster host/port used: ${KUBERNETES_HOST_PORT}")
print(f"Executor: {EXECUTOR}")
print()


class TestBase(unittest.TestCase):
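    """Base class for the Kubernetes integration tests.

    Provides helpers that drive example DAGs through the Airflow REST API
    and inspect cluster state with kubectl.
    """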
@staticmethod
def _describe_resources(namespace: str):
print("=" * 80)
print(f"Describe resources for namespace {namespace}")
print(f"Datetime: {datetime.utcnow()}")
print("=" * 80)
print("Describing pods")
print("-" * 80)
subprocess.call(["kubectl", "describe", "pod", "--namespace", namespace])
print("=" * 80)
print("Describing persistent volumes")
print("-" * 80)
subprocess.call(["kubectl", "describe", "pv", "--namespace", namespace])
print("=" * 80)
print("Describing persistent volume claims")
print("-" * 80)
subprocess.call(["kubectl", "describe", "pvc", "--namespace", namespace])
print("=" * 80)

@staticmethod
def _num_pods_in_namespace(namespace):
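        # Count pods in the given namespace whose name contains 'airflow'.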
air_pod = check_output(['kubectl', 'get', 'pods', '-n', namespace]).decode()
air_pod = air_pod.split('\n')
names = [re.compile(r'\s+').split(x)[0] for x in air_pod if 'airflow' in x]
return len(names)

@staticmethod
def _delete_airflow_pod(name=''):
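        # Delete the first pod whose name contains 'airflow' plus the optional suffix.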
suffix = '-' + name if name else ''
air_pod = check_output(['kubectl', 'get', 'pods']).decode()
air_pod = air_pod.split('\n')
names = [re.compile(r'\s+').split(x)[0] for x in air_pod if 'airflow' + suffix in x]
if names:
check_call(['kubectl', 'delete', 'pod', names[0]])

def _get_session_with_retries(self):
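        # Session authenticated with the default admin credentials, retrying
        # failed requests up to 3 times with exponential backoff.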
session = requests.Session()
session.auth = ('admin', 'admin')
retries = Retry(total=3, backoff_factor=1)
session.mount('http://', HTTPAdapter(max_retries=retries))
session.mount('https://', HTTPAdapter(max_retries=retries))
return session

def _ensure_airflow_webserver_is_healthy(self):
response = self.session.get(
f"http://{KUBERNETES_HOST_PORT}/health",
timeout=1,
)

assert response.status_code == 200

def setUp(self):
self.host = KUBERNETES_HOST_PORT
self.session = self._get_session_with_retries()
self._ensure_airflow_webserver_is_healthy()

def tearDown(self):
self.session.close()

def monitor_task(self, host, dag_run_id, dag_id, task_id, expected_final_state, timeout):
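        """Poll the task instance every 5 seconds until it reaches
        ``expected_final_state`` or roughly ``timeout`` seconds elapse."""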
tries = 0
state = ''
max_tries = max(int(timeout / 5), 1)
# Wait some time for the operator to complete
while tries < max_tries:
time.sleep(5)
# Check task state
try:
get_string = (
f'http://{host}/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}'
)
print(f"Calling [monitor_task]#1 {get_string}")
result = self.session.get(get_string)
if result.status_code == 404:
check_call(["echo", "api returned 404."])
tries += 1
continue
assert result.status_code == 200, "Could not get the status"
result_json = result.json()
print(f"Received [monitor_task]#2: {result_json}")
state = result_json['state']
print(f"Attempt {tries}: Current state of operator is {state}")

if state == expected_final_state:
break
self._describe_resources(namespace="airflow")
self._describe_resources(namespace="default")
tries += 1
except requests.exceptions.ConnectionError as e:
check_call(["echo", f"api call failed. trying again. error {e}"])
if state != expected_final_state:
print(f"The expected state is wrong {state} != {expected_final_state} (expected)!")
assert state == expected_final_state

def ensure_dag_expected_state(self, host, execution_date, dag_id, expected_final_state, timeout):
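        """Poll the DAG runs of ``dag_id`` until the run for ``execution_date``
        reaches ``expected_final_state`` or roughly ``timeout`` seconds elapse."""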
tries = 0
state = ''
max_tries = max(int(timeout / 5), 1)
# Wait some time for the operator to complete
while tries < max_tries:
time.sleep(5)
get_string = f'http://{host}/api/v1/dags/{dag_id}/dagRuns'
print(f"Calling {get_string}")
# Get all dagruns
result = self.session.get(get_string)
assert result.status_code == 200, "Could not get the status"
result_json = result.json()
print(f"Received: {result}")
state = None
for dag_run in result_json['dag_runs']:
if dag_run['execution_date'] == execution_date:
state = dag_run['state']
check_call(["echo", f"Attempt {tries}: Current state of dag is {state}"])
print(f"Attempt {tries}: Current state of dag is {state}")

if state == expected_final_state:
break
self._describe_resources("airflow")
self._describe_resources("default")
tries += 1
assert state == expected_final_state

# Maybe check if we can retrieve the logs, but then we need to extend the API

def start_dag(self, dag_id, host):
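        """Unpause ``dag_id``, trigger a fresh DAG run, and return the DAG runs
        reported by the API."""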
patch_string = f'http://{host}/api/v1/dags/{dag_id}'
print(f"Calling [start_dag]#1 {patch_string}")
result = self.session.patch(patch_string, json={'is_paused': False})
try:
result_json = result.json()
except ValueError:
result_json = str(result)
print(f"Received [start_dag]#1 {result_json}")
assert result.status_code == 200, f"Could not enable DAG: {result_json}"
post_string = f'http://{host}/api/v1/dags/{dag_id}/dagRuns'
print(f"Calling [start_dag]#2 {post_string}")
# Trigger a new dagrun
result = self.session.post(post_string, json={})
try:
result_json = result.json()
except ValueError:
result_json = str(result)
print(f"Received [start_dag]#2 {result_json}")
assert result.status_code == 200, f"Could not trigger a DAG-run: {result_json}"

time.sleep(1)

get_string = f'http://{host}/api/v1/dags/{dag_id}/dagRuns'
print(f"Calling [start_dag]#3 {get_string}")
result = self.session.get(get_string)
assert result.status_code == 200, f"Could not get DAGRuns: {result.json()}"
result_json = result.json()
print(f"Received: [start_dag]#3 {result_json}")
return result_json

def start_job_in_kubernetes(self, dag_id, host):
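        """Kick off ``dag_id`` and return the ``dag_run_id`` and
        ``execution_date`` of the newly triggered run."""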
result_json = self.start_dag(dag_id=dag_id, host=host)
dag_runs = result_json['dag_runs']
assert len(dag_runs) > 0
execution_date = None
dag_run_id = None
for dag_run in dag_runs:
if dag_run['dag_id'] == dag_id:
execution_date = dag_run['execution_date']
dag_run_id = dag_run['dag_run_id']
break
        assert execution_date is not None, f"No execution_date found for a run of DAG {dag_id}"
return dag_run_id, execution_date