diff --git a/Pipfile b/Pipfile
index c2a0c85..e9c2b67 100755
--- a/Pipfile
+++ b/Pipfile
@@ -4,11 +4,18 @@ url = "https://pypi.org/simple"
 verify_ssl = true
 
 [dev-packages]
+black = "*"
 
 [packages]
-apache-airflow = "==1.10.14"
-zthreading = "*"
+SQLAlchemy = "==1.3.23"
+Flask-SQLAlchemy= "==2.4.4"
+apache-airflow = "==1.10.15"
+airflow-db-logger = "==1.0.5"
 kubernetes = "*"
+zthreading = "*"
 
 [requires]
 python_version = "3.6"
+
+[pipenv]
+allow_prereleases = true
diff --git a/airflow_kubernetes_job_operator/kube_api/queries.py b/airflow_kubernetes_job_operator/kube_api/queries.py
index 46033cd..89f1ad8 100755
--- a/airflow_kubernetes_job_operator/kube_api/queries.py
+++ b/airflow_kubernetes_job_operator/kube_api/queries.py
@@ -34,7 +34,14 @@ class LogLine:
     autodetect_kuberentes_log_level: bool = True
     detect_kubernetes_log_level: Callable = None
 
-    def __init__(self, pod_name: str, namespace: str, message: str, timestamp: datetime = None):
+    def __init__(
+        self,
+        pod_name: str,
+        container_name: str,
+        namespace: str,
+        message: str,
+        timestamp: datetime = None,
+    ):
         """GetPodLogs log line generated info object.
 
         Args:
@@ -47,6 +54,7 @@ def __init__(self, pod_name: str, namespace: str, message: str, timestamp: datetime = None):
         self.pod_name = pod_name
         self.namespace = namespace
         self.message = message
+        self.container_name = container_name
         self.timestamp = timestamp or datetime.now()
 
     def log(self, logger: Logger = kube_logger):
@@ -64,8 +72,14 @@ def __str__(self):
         return self.message
 
     def __repr__(self):
-        timestamp = f"[{self.timestamp}]" if self.show_kubernetes_log_timestamps else ""
-        return timestamp + f"[{self.namespace}/pods/{self.pod_name}]: {self.message}"
+        header_parts = [
+            f"{self.timestamp}" if self.show_kubernetes_log_timestamps else None,
+            f"{self.namespace}/pods/{self.pod_name}",
+            self.container_name,
+        ]
+
+        header = "".join([f"[{p}]" for p in header_parts if p is not None])
+        return f"{header}: {self.message}"
 
 
 class GetPodLogs(KubeApiRestQuery):
@@ -76,6 +90,8 @@ def __init__(
         self,
         since: datetime = None,
         follow: bool = False,
         timeout: int = None,
+        container: str = None,
+        add_container_name_to_log: bool = None,
     ):
         """Returns the pod logs for a pod. Can follow the pod logs in real time.
@@ -91,6 +107,7 @@ def __init__(
         """
         assert not_empty_string(name), ValueError("name must be a non empty string")
         assert not_empty_string(namespace), ValueError("namespace must be a non empty string")
+        assert container is None or not_empty_string(container), ValueError("container must be a non empty string")
 
         kind: KubeResourceKind = KubeResourceKind.get_kind("Pod")
         super().__init__(
@@ -104,15 +121,22 @@ def __init__(
         self.name: str = name
         self.namespace: str = namespace
         self.since: datetime = since
+        self.container = container
         self.query_params = {
             "follow": follow,
             "pretty": False,
             "timestamps": True,
         }
+        if container is not None:
+            self.query_params["container"] = container
+
         self.since = since
         self._last_timestamp = None
         self._active_namespace = None
+        self.add_container_name_to_log = (
+            add_container_name_to_log if add_container_name_to_log is not None else container is not None
+        )
 
     def pre_request(self, client: "KubeApiRestClient"):
         super().pre_request(client)
@@ -158,7 +182,15 @@ def parse_data(self, message_line: str):
         message = message.replace("\r", "")
         lines = []
         for message_line in message.split("\n"):
-            lines.append(LogLine(self.name, self.namespace, message_line, timestamp))
+            lines.append(
+                LogLine(
+                    pod_name=self.name,
+                    namespace=self.namespace,
+                    message=message_line,
+                    timestamp=timestamp,
+                    container_name=self.container if self.add_container_name_to_log else None,
+                )
+            )
         return lines
 
     def emit_data(self, data):
@@ -271,7 +303,7 @@ def __init__(
         )
 
     def parse_data(self, data):
-        """ Override data parse """
+        """Override data parse"""
         rslt = json.loads(data)
         prased = {}
         for grp in rslt.get("groups", []):
diff --git a/airflow_kubernetes_job_operator/kube_api/watchers.py b/airflow_kubernetes_job_operator/kube_api/watchers.py
index 9c9f501..1512a98 100755
--- a/airflow_kubernetes_job_operator/kube_api/watchers.py
+++ b/airflow_kubernetes_job_operator/kube_api/watchers.py
@@ -197,14 +197,25 @@ def emit_log(self, data):
         self.emit(self.pod_log_event_name, data)
 
     @thread_synchronized
-    def _create_pod_log_reader(self, uid: str, name: str, namespace: str, follow=True):
+    def _create_pod_log_reader(
+        self,
+        logger_id: str,
+        name: str,
+        namespace: str,
+        container: str = None,
+        follow=True,
+        is_single=False,
+    ):
         read_logs = GetPodLogs(
             name=name,
             namespace=namespace,
             since=self.pod_log_since,
             follow=follow,
+            container=container,
+            add_container_name_to_log=False if is_single else True,
         )
-        self._executing_pod_loggers[uid] = read_logs
+
+        self._executing_pod_loggers[logger_id] = read_logs
         return read_logs
 
     def process_data_state(self, data: dict, client: KubeApiRestClient):
@@ -226,42 +237,62 @@ def process_data_state(self, data: dict, client: KubeApiRestClient):
             if state.deleted:
                 del self._object_states[uid]
 
-        if self.watch_pod_logs and kind == "pod" and uid not in self._executing_pod_loggers:
+        if self.watch_pod_logs and kind == "pod":
             name = data["metadata"]["name"]
             namesoace = data["metadata"]["namespace"]
             pod_status = data["status"]["phase"]
+
             if pod_status != "Pending":
-                osw = self._object_states.get(uid)
-                read_logs = self._create_pod_log_reader(
-                    uid=uid,
-                    name=name,
-                    namespace=namesoace,
-                )
+                containers = data["spec"]["containers"]
+                is_single = len(containers) < 2
+                for container in containers:
+                    if not isinstance(container, dict):
+                        continue
 
-                osw.emit(self.pod_logs_reader_started_event_name)
+                    container_name = container.get("name", None)
 
-                def handle_error(sender, *args):
-                    # Don't throw error if not running.
-                    if not self.is_running:
-                        return
+                    assert isinstance(container_name, str) and len(container_name.strip()) > 0, KubeApiException(
+                        "Invalid container name when reading logs"
+                    )
 
-                    if len(args) == 0:
-                        self.emit_error(KubeApiException("Unknown error from sender", sender))
-                    else:
-                        self.emit_error(args[0])
+                    logger_id = f"{uid}/{container_name}"
 
-                # binding only relevant events.
-                read_logs.on(read_logs.data_event_name, lambda line: self.emit_log(line))
-                read_logs.on(read_logs.error_event_name, handle_error)
-                client.query_async(read_logs)
+                    if logger_id in self._executing_pod_loggers:
+                        continue
+
+                    osw = self._object_states.get(uid)
+                    read_logs = self._create_pod_log_reader(
+                        logger_id=logger_id,
+                        name=name,
+                        namespace=namesoace,
+                        container=container.get("name", None),
+                        is_single=is_single,
+                    )
+
+                    osw.emit(self.pod_logs_reader_started_event_name, container=container_name)
+
+                    def handle_error(sender, *args):
+                        # Don't throw error if not running.
+                        if not self.is_running:
+                            return
+
+                        if len(args) == 0:
+                            self.emit_error(KubeApiException("Unknown error from sender", sender))
+                        else:
+                            self.emit_error(args[0])
+
+                    # binding only relevant events.
+                    read_logs.on(read_logs.data_event_name, lambda line: self.emit_log(line))
+                    read_logs.on(read_logs.error_event_name, handle_error)
+                    client.query_async(read_logs)
 
     def _stop_all_loggers(
         self,
         timeout: float = None,
         throw_error_if_not_running: bool = None,
     ):
-        for q in list(self._executing_pod_loggers.values()):
-            q.stop(timeout=timeout, throw_error_if_not_running=throw_error_if_not_running)
+        for pod_logger in list(self._executing_pod_loggers.values()):
+            pod_logger.stop(timeout=timeout, throw_error_if_not_running=throw_error_if_not_running)
 
     def stop(
         self,
@@ -294,7 +325,8 @@ def log_event(self, logger: Logger, ev: Event):
             line.log(logger)
         elif ev.name == self.pod_logs_reader_started_event_name:
             osw: NamespaceWatchQueryResourceState = ev.sender
-            logger.info(f"[{osw.namespace}/{osw.kind_name.lower()}s/{osw.name}] Reading logs")
+            container_name = ev.kwargs.get("container", "[unknown container name]")
+            logger.info(f"[{osw.namespace}/{osw.kind_name.lower()}s/{osw.name}] Reading logs from {container_name}")
 
     def pipe_to_logger(self, logger: Logger = kube_logger, allowed_event_names=None) -> int:
         allowed_event_names = set(
diff --git a/tests/dags/templates/test_multi_container_pod.yaml b/tests/dags/templates/test_multi_container_pod.yaml
new file mode 100644
index 0000000..4248b92
--- /dev/null
+++ b/tests/dags/templates/test_multi_container_pod.yaml
@@ -0,0 +1,41 @@
+apiVersion: v1
+kind: Pod
+metadata:
+  name: 'multi-container-test'
+  labels:
+    app: 'multi-container-test'
+spec:
+  restartPolicy: Never
+  containers:
+    - name: container1
+      image: 'alpine:latest'
+      command:
+        - sh
+        - -c
+        - |
+          echo starting sleep...
+          sleep 10
+          echo end
+      resources:
+        limits:
+          cpu: 200m
+          memory: 500Mi
+        requests:
+          cpu: 100m
+          memory: 200Mi
+    - name: container2
+      image: 'alpine:latest'
+      command:
+        - sh
+        - -c
+        - |
+          echo starting sleep...
+          sleep 10
+          echo end
+      resources:
+        limits:
+          cpu: 200m
+          memory: 500Mi
+        requests:
+          cpu: 100m
+          memory: 200Mi
diff --git a/tests/dags/test_double_log.py b/tests/dags/test_double_log.py
index ad4c388..1358cea 100755
--- a/tests/dags/test_double_log.py
+++ b/tests/dags/test_double_log.py
@@ -1,11 +1,11 @@
 import os
-from utils import default_args, resolve_file
+from utils import default_args, name_from_file
 from airflow import DAG
 from airflow_kubernetes_job_operator.kubernetes_job_operator import KubernetesJobOperator
 
 
 dag = DAG(
-    "kub-job-op-custom",
+    name_from_file(__file__),
     default_args=default_args,
     description="Test base job operator",
     schedule_interval=None,
diff --git a/tests/dags/test_job_operator.py b/tests/dags/test_job_operator.py
index b5a159b..e783d24 100755
--- a/tests/dags/test_job_operator.py
+++ b/tests/dags/test_job_operator.py
@@ -1,10 +1,10 @@
-from utils import default_args
+from utils import default_args, name_from_file
 from datetime import timedelta
 from airflow import DAG
 from airflow_kubernetes_job_operator.kubernetes_job_operator import KubernetesJobOperator
 
 dag = DAG(
-    "kub-job-op",
+    name_from_file(__file__),
     default_args=default_args,
     description="Test base job operator",
     schedule_interval=None,
diff --git a/tests/dags/test_job_operator_config_file.py b/tests/dags/test_job_operator_config_file.py
deleted file mode 100755
index 838c2a2..0000000
--- a/tests/dags/test_job_operator_config_file.py
+++ /dev/null
@@ -1,20 +0,0 @@
-# TODO: finish this test last
-# import os
-# from airflow import DAG
-# from airflow_kubernetes_job_operator.kubernetes_job_operator import KubernetesJobOperator
-# from airflow.utils.dates import days_ago
-
-# default_args = {"owner": "tester", "start_date": days_ago(2), "retries": 0}
-# dag = DAG(
-#     "job-operator-config-file", default_args=default_args, description="Test base job operator", schedule_interval=None,
-# )
-
-# job_task = KubernetesJobOperator(
-#     task_id="test-job",
-#     dag=dag,
-#     image="ubuntu",
-#     in_cluster=False,
-#     cluster_context="docker-desktop",
-#     config_file=os.path.expanduser("~/.kube/config_special"),
-#     command=["bash", "-c", 'echo "all ok"'],
-# )
diff --git a/tests/dags/test_job_operator_custom.py b/tests/dags/test_job_operator_custom.py
deleted file mode 100755
index ae58efd..0000000
--- a/tests/dags/test_job_operator_custom.py
+++ /dev/null
@@ -1,39 +0,0 @@
-import os
-from utils import default_args, resolve_file
-from airflow import DAG
-from airflow_kubernetes_job_operator.kubernetes_job_operator import KubernetesJobOperator
-
-
-dag = DAG(
-    "kub-job-op-custom",
-    default_args=default_args,
-    description="Test base job operator",
-    schedule_interval=None,
-    catchup=False,
-)
-
-basepath = os.path.dirname(__file__)
-template_path = "templates"
-
-envs = {
-    "PASS_ARG": "a test",
-}
-
-KubernetesJobOperator(
-    task_id="test-job-custom-success",
-    body_filepath=resolve_file("./.local/test_custom.success.yaml"),
-    envs=envs,
-    dag=dag,
-)
-
-KubernetesJobOperator(
-    task_id="test-job-custom-fail",
-    body_filepath=resolve_file("./.local/test_custom.fail.yaml"),
-    envs=envs,
-    dag=dag,
-)
-
-
-if __name__ == "__main__":
-    dag.clear(reset_dag_runs=True)
-    dag.run()
diff --git a/tests/dags/test_job_operator_jinja.py b/tests/dags/test_job_operator_jinja.py
index 6fb582e..9e6e58d 100755
--- a/tests/dags/test_job_operator_jinja.py
+++ b/tests/dags/test_job_operator_jinja.py
@@ -1,14 +1,13 @@
-from utils import default_args
+from utils import default_args, name_from_file
 from datetime import timedelta
 from airflow import DAG
 from airflow_kubernetes_job_operator import (
     KubernetesJobOperator,
     JobRunnerDeletePolicy,
-    KubernetesLegacyJobOperator,
 )
 
 dag = DAG(
-    "kub-job-op-test-jinja",
+    name_from_file(__file__),
     default_args=default_args,
     description="Test base job operator",
     schedule_interval=None,
@@ -40,31 +39,6 @@
     jinja_job_args={"test": "lama"},
 )
 
-# bash_script = """
-# #/usr/bin/env bash
-# echo "Legacy start for taskid {{ti.task_id}} {{job.test}}"
-# cur_count=0
-# while true; do
-#     cur_count=$((cur_count + 1))
-#     if [ "$cur_count" -ge "$TIC_COUNT" ]; then
-#         break
-#     fi
-#     date
-#     sleep 1
-# done
-
-# echo "Complete"
-# """
-# KubernetesLegacyJobOperator(
-#     task_id="legacy-test-job-success",
-#     image="{{default_image}}",
-#     cmds=["bash", "-c", bash_script],
-#     dag=dag,
-#     is_delete_operator_pod=True,
-#     env_vars=envs,
-#     delete_policy=default_delete_policy,
-# )
-
 if __name__ == "__main__":
     dag.clear(reset_dag_runs=True)
     dag.run()
diff --git a/tests/dags/test_job_operator_long_jobs.py b/tests/dags/test_job_operator_long_jobs.py
index 1654918..b89733b 100755
--- a/tests/dags/test_job_operator_long_jobs.py
+++ b/tests/dags/test_job_operator_long_jobs.py
@@ -1,10 +1,10 @@
 from datetime import timedelta
-from utils import resolve_file, default_args
+from utils import default_args, name_from_file
 from airflow import DAG
 from airflow_kubernetes_job_operator.kubernetes_job_operator import KubernetesJobOperator
 
 dag = DAG(
-    "kub-job-op-long",
+    name_from_file(__file__),
    default_args=default_args,
     description="Test base job operator",
     schedule_interval=None,
@@ -19,7 +19,7 @@
 
 KubernetesJobOperator(
     task_id="test-long-job-success",
-    body_filepath=resolve_file("./templates/test_long_job.yaml"),
+    body_filepath="./templates/test_long_job.yaml",
     envs={
         "PASS_ARG": "a long test",
         "TIC_COUNT": str(total_time_seconds),
diff --git a/tests/dags/test_job_operator_with_service.py b/tests/dags/test_job_operator_with_service.py
index 24f3859..f29e4a5 100755
--- a/tests/dags/test_job_operator_with_service.py
+++ b/tests/dags/test_job_operator_with_service.py
@@ -1,5 +1,4 @@
-from utils import default_args
-from datetime import timedelta
+from utils import default_args, name_from_file
 from airflow import DAG
 from airflow_kubernetes_job_operator import (
     KubernetesJobOperator,
@@ -7,7 +6,7 @@
 )
 
 dag = DAG(
-    "kub-job-op-test-jinja",
+    name_from_file(__file__),
     default_args=default_args,
     description="Test base job operator",
     schedule_interval=None,
diff --git a/tests/dags/test_legacy_job_operator.py b/tests/dags/test_legacy_job_operator.py
index 4d0c6a8..1c633c4 100755
--- a/tests/dags/test_legacy_job_operator.py
+++ b/tests/dags/test_legacy_job_operator.py
@@ -1,9 +1,9 @@
-from utils import default_args
+from utils import default_args, name_from_file
 from airflow import DAG
 from airflow_kubernetes_job_operator.kubernetes_legacy_job_operator import KubernetesLegacyJobOperator
 
 dag = DAG(
-    "kub-job-op-legacy",
+    name_from_file(__file__),
     default_args=default_args,
     description="Test base job operator",
     schedule_interval=None,
diff --git a/tests/dags/test_multi_container_pod.py b/tests/dags/test_multi_container_pod.py
new file mode 100755
index 0000000..575bd20
--- /dev/null
+++ b/tests/dags/test_multi_container_pod.py
@@ -0,0 +1,29 @@
+from utils import default_args, name_from_file
+from datetime import timedelta
+from airflow import DAG
+from airflow_kubernetes_job_operator.kubernetes_job_operator import KubernetesJobOperator
+
+dag = DAG(
+    name_from_file(__file__),
+    default_args=default_args,
+    description="Test base job operator",
+    schedule_interval=None,
+    catchup=False,
+)
+
+namespace = None
+default_delete_policy = "IfSucceeded"
+
+with dag:
+    KubernetesJobOperator(
+        task_id="two-containers",
+        namespace=namespace,
+        body_filepath="./templates/test_multi_container_pod.yaml",
+        dag=dag,
+        delete_policy=default_delete_policy,
+    )
+
+
+if __name__ == "__main__":
+    dag.clear(reset_dag_runs=True)
+    dag.run()
diff --git a/tests/dags/utils.py b/tests/dags/utils.py
index 8216fd5..9724338 100755
--- a/tests/dags/utils.py
+++ b/tests/dags/utils.py
@@ -1,70 +1,8 @@
 import os
-from airflow_kubernetes_job_operator.kube_api import KubeApiConfiguration, KubeResourceKind
-
-# from airflow_kubernetes_job_operator.config import DEFAULT_EXECUTION_OBJECT_PATHS
-# from airflow_kubernetes_job_operator.utils import resolve_relative_path
-
-# resolve_relative_path(DEFAULT_EXECUTION_OBJECT_PATHS.values()[0])
-
-# import airflow.configuration
-import warnings
-import logging
-import sys
+import re
 
-KubeApiConfiguration.add_kube_config_search_location("~/composer_kube_config")  # second
-KubeApiConfiguration.add_kube_config_search_location("~/gcs/dags/config/hcjobs-kubeconfig.yaml")  # first
-KubeApiConfiguration.set_default_namespace("cdm-hcjobs")
-
-KubeApiConfiguration.register_kind(
-    name="HCJob",
-    api_version="hc.dto.cbsinteractive.com/v1alpha1",
-    parse_kind_state=KubeResourceKind.parse_state_job,
-)
-
-
-logging.basicConfig(level=logging.INFO)
-warnings.filterwarnings("ignore", category=DeprecationWarning)
-
-
-def print_default_kube_configuration():
-    try:
-        config = KubeApiConfiguration.load_kubernetes_configuration()
-        assert config is not None
-
-        print_version = str(sys.version).replace("\n", " ")
-        logging.info(
-            f"""
-            -----------------------------------------------------------------------
-            Context: {KubeApiConfiguration.get_active_context_info(config)}
-            home directory: {os.path.expanduser('~')}
-            Config host: {config.host}
-            Config filepath: {config.filepath}
-            Default namespace: {KubeApiConfiguration.get_default_namespace(config)}
-            Executing dags in python version: {print_version}
-            -----------------------------------------------------------------------
-            """
-        )
-    except Exception as ex:
-        logging.error(
-            """
-            -----------------------------------------------------------------------
-            Failed to retrive config, kube config could not be loaded.
-            ----------------------------------------------------------------------
-            """,
-            ex,
-        )
-
-
-print_default_kube_configuration()
 
 default_args = {"owner": "tester", "start_date": "1/1/2020", "retries": 0}
-DAGS_PATH = os.path.dirname(__file__)
-
 
-def resolve_file(fpath: str):
-    if fpath.startswith("."):
-        if fpath.startswith("./"):
-            fpath = os.path.join(DAGS_PATH, fpath[2:])
-        else:
-            fpath = os.path.join(DAGS_PATH, fpath)
-    return os.path.abspath(fpath)
+def name_from_file(fpath):
+    return re.sub(r"[^a-zA-Z0-9-]", "-", os.path.splitext(os.path.basename(fpath))[0])
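
Usage sketch for the per-container log reading introduced above: a minimal example of streaming one container's logs with the new `container` and `add_container_name_to_log` arguments. The GetPodLogs arguments, `data_event_name`, `on`, and `query_async` are taken from this diff; the `KubeApiRestClient` import path and its argument-free construction are assumptions and may differ in the actual package.

    from airflow_kubernetes_job_operator.kube_api import KubeApiRestClient  # assumed import path
    from airflow_kubernetes_job_operator.kube_api.queries import GetPodLogs

    # Follow logs for a single container of the multi-container test pod.
    # With add_container_name_to_log=True, LogLine.__repr__ prefixes each
    # line with the container name, e.g. [default/pods/multi-container-test][container1]: ...
    client = KubeApiRestClient()  # assumed default construction
    read_logs = GetPodLogs(
        name="multi-container-test",
        namespace="default",
        follow=True,
        container="container1",
        add_container_name_to_log=True,
    )
    read_logs.on(read_logs.data_event_name, lambda line: print(repr(line)))
    client.query_async(read_logs)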