Multi container pod logs #39

Merged · 3 commits · Apr 27, 2021
11 changes: 9 additions & 2 deletions Pipfile
@@ -4,11 +4,18 @@ url = "https://pypi.org/simple"
verify_ssl = true

[dev-packages]
black = "*"

[packages]
apache-airflow = "==1.10.14"
zthreading = "*"
SQLAlchemy = "==1.3.23"
Flask-SQLAlchemy = "==2.4.4"
apache-airflow = "==1.10.15"
airflow-db-logger = "==1.0.5"
kubernetes = "*"
zthreading = "*"

[requires]
python_version = "3.6"

[pipenv]
allow_prereleases = true
42 changes: 37 additions & 5 deletions airflow_kubernetes_job_operator/kube_api/queries.py
@@ -34,7 +34,14 @@ class LogLine:
    autodetect_kuberentes_log_level: bool = True
    detect_kubernetes_log_level: Callable = None

    def __init__(self, pod_name: str, namespace: str, message: str, timestamp: datetime = None):
    def __init__(
        self,
        pod_name: str,
        container_name: str,
        namespace: str,
        message: str,
        timestamp: datetime = None,
    ):
        """A log line info object generated by GetPodLogs.

        Args:
@@ -47,6 +54,7 @@ def __init__(self, pod_name: str, namespace: str, message: str, timestamp: datetime = None):
        self.pod_name = pod_name
        self.namespace = namespace
        self.message = message
        self.container_name = container_name
        self.timestamp = timestamp or datetime.now()

    def log(self, logger: Logger = kube_logger):
@@ -64,8 +72,14 @@ def __str__(self):
        return self.message

    def __repr__(self):
        timestamp = f"[{self.timestamp}]" if self.show_kubernetes_log_timestamps else ""
        return timestamp + f"[{self.namespace}/pods/{self.pod_name}]: {self.message}"
        header_parts = [
            f"{self.timestamp}" if self.show_kubernetes_log_timestamps else None,
            f"{self.namespace}/pods/{self.pod_name}",
            self.container_name,
        ]

        header = "".join([f"[{p}]" for p in header_parts if p is not None])
        return f"{header}: {self.message}"


class GetPodLogs(KubeApiRestQuery):
@@ -76,6 +90,8 @@ def __init__(
        since: datetime = None,
        follow: bool = False,
        timeout: int = None,
        container: str = None,
        add_container_name_to_log: bool = None,
    ):
        """Returns the logs for a pod; can follow the pod logs
        in real time.
@@ -91,6 +107,7 @@
        """
        assert not_empty_string(name), ValueError("name must be a non empty string")
        assert not_empty_string(namespace), ValueError("namespace must be a non empty string")
        assert container is None or not_empty_string(container), ValueError("container must be a non empty string")

        kind: KubeResourceKind = KubeResourceKind.get_kind("Pod")
        super().__init__(
@@ -104,15 +121,22 @@ def __init__(
        self.name: str = name
        self.namespace: str = namespace
        self.since: datetime = since
        self.container = container
        self.query_params = {
            "follow": follow,
            "pretty": False,
            "timestamps": True,
        }

        if container is not None:
            self.query_params["container"] = container

        self.since = since
        self._last_timestamp = None
        self._active_namespace = None
        self.add_container_name_to_log = (
            add_container_name_to_log if add_container_name_to_log is not None else container is not None
        )

    def pre_request(self, client: "KubeApiRestClient"):
        super().pre_request(client)
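
A hedged usage sketch of the extended query (pod and namespace names are hypothetical; the KubeApiRestClient wiring is omitted). When container is given it is forwarded as the container query parameter, and add_container_name_to_log defaults to tagging lines only in that case:

    from airflow_kubernetes_job_operator.kube_api.queries import GetPodLogs

    # Follow one named container; its name will be stamped on each log line.
    per_container = GetPodLogs(
        name="multi-container-test",
        namespace="airflow",
        follow=True,
        container="container1",
    )

    # Omitting container keeps the old single-stream, untagged behavior.
    default_stream = GetPodLogs(name="multi-container-test", namespace="airflow")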
@@ -158,7 +182,15 @@ def parse_data(self, message_line: str):
        message = message.replace("\r", "")
        lines = []
        for message_line in message.split("\n"):
            lines.append(LogLine(self.name, self.namespace, message_line, timestamp))
            lines.append(
                LogLine(
                    pod_name=self.name,
                    namespace=self.namespace,
                    message=message_line,
                    timestamp=timestamp,
                    container_name=self.container if self.add_container_name_to_log else None,
                )
            )
        return lines

    def emit_data(self, data):
@@ -271,7 +303,7 @@ def __init__(
        )

    def parse_data(self, data):
        """ Override data parse """
        """Override data parse"""
        rslt = json.loads(data)
        parsed = {}
        for grp in rslt.get("groups", []):
82 changes: 57 additions & 25 deletions airflow_kubernetes_job_operator/kube_api/watchers.py
@@ -197,14 +197,25 @@ def emit_log(self, data):
        self.emit(self.pod_log_event_name, data)

    @thread_synchronized
    def _create_pod_log_reader(self, uid: str, name: str, namespace: str, follow=True):
    def _create_pod_log_reader(
        self,
        logger_id: str,
        name: str,
        namespace: str,
        container: str = None,
        follow=True,
        is_single=False,
    ):
        read_logs = GetPodLogs(
            name=name,
            namespace=namespace,
            since=self.pod_log_since,
            follow=follow,
            container=container,
            add_container_name_to_log=False if is_single else True,
        )
        self._executing_pod_loggers[uid] = read_logs

        self._executing_pod_loggers[logger_id] = read_logs
        return read_logs

    def process_data_state(self, data: dict, client: KubeApiRestClient):
@@ -226,42 +237,62 @@ def process_data_state(self, data: dict, client: KubeApiRestClient):
        if state.deleted:
            del self._object_states[uid]

        if self.watch_pod_logs and kind == "pod" and uid not in self._executing_pod_loggers:
        if self.watch_pod_logs and kind == "pod":
            name = data["metadata"]["name"]
            namespace = data["metadata"]["namespace"]
            pod_status = data["status"]["phase"]

            if pod_status != "Pending":
                osw = self._object_states.get(uid)
                read_logs = self._create_pod_log_reader(
                    uid=uid,
                    name=name,
                    namespace=namespace,
                )
                containers = data["spec"]["containers"]
                is_single = len(containers) < 2
                for container in containers:
                    if not isinstance(container, dict):
                        continue

                osw.emit(self.pod_logs_reader_started_event_name)
                    container_name = container.get("name", None)

                def handle_error(sender, *args):
                    # Don't throw error if not running.
                    if not self.is_running:
                        return
                    assert isinstance(container_name, str) and len(container_name.strip()) > 0, KubeApiException(
                        "Invalid container name when reading logs"
                    )

                    if len(args) == 0:
                        self.emit_error(KubeApiException("Unknown error from sender", sender))
                    else:
                        self.emit_error(args[0])
                    logger_id = f"{uid}/{container_name}"

                # binding only relevant events.
                read_logs.on(read_logs.data_event_name, lambda line: self.emit_log(line))
                read_logs.on(read_logs.error_event_name, handle_error)
                client.query_async(read_logs)
                    if logger_id in self._executing_pod_loggers:
                        continue

                    osw = self._object_states.get(uid)
                    read_logs = self._create_pod_log_reader(
                        logger_id=logger_id,
                        name=name,
                        namespace=namespace,
                        container=container.get("name", None),
                        is_single=is_single,
                    )

                    osw.emit(self.pod_logs_reader_started_event_name, container=container_name)

                    def handle_error(sender, *args):
                        # Don't throw error if not running.
                        if not self.is_running:
                            return

                        if len(args) == 0:
                            self.emit_error(KubeApiException("Unknown error from sender", sender))
                        else:
                            self.emit_error(args[0])

                    # binding only relevant events.
                    read_logs.on(read_logs.data_event_name, lambda line: self.emit_log(line))
                    read_logs.on(read_logs.error_event_name, handle_error)
                    client.query_async(read_logs)
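
The keying change above is the heart of the fix: log readers are now tracked per pod uid and container rather than per pod. A toy sketch of that dedup logic (standalone illustration, not the library API):

    # Repeated pod events start at most one reader per (uid, container).
    _executing = {}

    def ensure_reader(uid: str, container_name: str) -> bool:
        logger_id = f"{uid}/{container_name}"
        if logger_id in _executing:
            return False  # this container's logs are already streaming
        _executing[logger_id] = f"GetPodLogs for {container_name}"  # placeholder
        return True

    assert ensure_reader("pod-uid-1", "container1")
    assert ensure_reader("pod-uid-1", "container2")
    assert not ensure_reader("pod-uid-1", "container1")  # deduped

Single-container pods take the is_single path, so their lines stay untagged and existing log output is unchanged.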

    def _stop_all_loggers(
        self,
        timeout: float = None,
        throw_error_if_not_running: bool = None,
    ):
        for q in list(self._executing_pod_loggers.values()):
            q.stop(timeout=timeout, throw_error_if_not_running=throw_error_if_not_running)
        for pod_logger in list(self._executing_pod_loggers.values()):
            pod_logger.stop(timeout=timeout, throw_error_if_not_running=throw_error_if_not_running)

    def stop(
        self,
@@ -294,7 +325,8 @@ def log_event(self, logger: Logger, ev: Event):
            line.log(logger)
        elif ev.name == self.pod_logs_reader_started_event_name:
            osw: NamespaceWatchQueryResourceState = ev.sender
            logger.info(f"[{osw.namespace}/{osw.kind_name.lower()}s/{osw.name}] Reading logs")
            container_name = ev.kwargs.get("container", "[unknown container name]")
            logger.info(f"[{osw.namespace}/{osw.kind_name.lower()}s/{osw.name}] Reading logs from {container_name}")

    def pipe_to_logger(self, logger: Logger = kube_logger, allowed_event_names=None) -> int:
        allowed_event_names = set(
41 changes: 41 additions & 0 deletions tests/dags/templates/test_multi_container_pod.yaml
@@ -0,0 +1,41 @@
apiVersion: v1
kind: Pod
metadata:
  name: 'multi-container-test'
  labels:
    app: 'multi-container-test'
spec:
  restartPolicy: Never
  containers:
    - name: container1
      image: 'alpine:latest'
      command:
        - sh
        - -c
        - |
          echo starting sleep...
          sleep 10
          echo end
      resources:
        limits:
          cpu: 200m
          memory: 500Mi
        requests:
          cpu: 100m
          memory: 200Mi
    - name: container2
      image: 'alpine:latest'
      command:
        - sh
        - -c
        - |
          echo starting sleep...
          sleep 10
          echo end
      resources:
        limits:
          cpu: 200m
          memory: 500Mi
        requests:
          cpu: 100m
          memory: 200Mi
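
Given the watcher changes above, the logs of both containers should be tagged and interleaved. A sketch of the expected operator log output for this pod (namespace, ordering, and timestamps are illustrative only):

    [airflow/pods/multi-container-test] Reading logs from container1
    [airflow/pods/multi-container-test] Reading logs from container2
    [airflow/pods/multi-container-test][container1]: starting sleep...
    [airflow/pods/multi-container-test][container2]: starting sleep...
    [airflow/pods/multi-container-test][container1]: end
    [airflow/pods/multi-container-test][container2]: end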
4 changes: 2 additions & 2 deletions tests/dags/test_double_log.py
@@ -1,11 +1,11 @@
import os
from utils import default_args, resolve_file
from utils import default_args, name_from_file
from airflow import DAG
from airflow_kubernetes_job_operator.kubernetes_job_operator import KubernetesJobOperator


dag = DAG(
"kub-job-op-custom",
name_from_file(__file__),
default_args=default_args,
description="Test base job operator",
schedule_interval=None,
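
name_from_file is imported from the tests' utils module, which is outside this diff. A plausible sketch, assuming it derives a unique DAG id from the dag file's basename (the implementation here is hypothetical):

    import os

    def name_from_file(filepath: str) -> str:
        # Hypothetical: ".../test_double_log.py" -> "test-double-log".
        return os.path.splitext(os.path.basename(filepath))[0].replace("_", "-")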
4 changes: 2 additions & 2 deletions tests/dags/test_job_operator.py
@@ -1,10 +1,10 @@
from utils import default_args
from utils import default_args, name_from_file
from datetime import timedelta
from airflow import DAG
from airflow_kubernetes_job_operator.kubernetes_job_operator import KubernetesJobOperator

dag = DAG(
"kub-job-op",
name_from_file(__file__),
default_args=default_args,
description="Test base job operator",
schedule_interval=None,
20 changes: 0 additions & 20 deletions tests/dags/test_job_operator_config_file.py

This file was deleted.

39 changes: 0 additions & 39 deletions tests/dags/test_job_operator_custom.py

This file was deleted.
