diff --git a/luigi/contrib/kubernetes.py b/luigi/contrib/kubernetes.py index 82dc769580..b34f8b3313 100644 --- a/luigi/contrib/kubernetes.py +++ b/luigi/contrib/kubernetes.py @@ -33,6 +33,7 @@ Written and maintained by Marco Capuccini (@mcapuccini). """ import logging +import re import time import uuid from datetime import datetime @@ -219,6 +220,23 @@ def pod_creation_wait_interal(self): """Delay for initial pod creation for just submitted job in seconds""" return self.__DEFAULT_POD_CREATION_INTERVAL + def __is_scaling_in_progress(self, condition): + """ + returns true if cluster is currently going through a scale up + """ + if 'reason'not in condition or \ + 'Unschedulable' not in condition['reason'] or\ + 'message' not in condition: + return False + + match = re.match(r'(\d)\/(\d) nodes are available.*', condition['message']) + if match: + current_nodes = int(match.group(1)) + target_nodes = int(match.group(2)) + if current_nodes <= target_nodes: + return True + return False + def __track_job(self): """Poll job status while active""" while not self.__verify_job_has_started(): @@ -308,6 +326,11 @@ def __verify_job_has_started(self): if 'message' in cond: if cond['reason'] == 'ContainersNotReady': return False + if cond['reason'] == 'Unschedulable': + if self.__is_scaling_in_progress(cond): + # wait if cluster is scaling up... + self.__logger.debug("Kubernetes Cluster is scaling up " + cond['message']) + return False assert cond['status'] != 'False', \ "[ERROR] %s - %s" % (cond['reason'], cond['message']) return True diff --git a/test/contrib/kubernetes_test.py b/test/contrib/kubernetes_test.py index 246ca777d6..869bf9b9e8 100644 --- a/test/contrib/kubernetes_test.py +++ b/test/contrib/kubernetes_test.py @@ -110,3 +110,57 @@ def test_output(self, mock_signal, mock_job_status): kubernetes_job._KubernetesJobTask__track_job() # Make sure successful job signals self.assertTrue(mock_signal.called) + + def test_cluster_is_scaling(self): + kubernetes_job = KubernetesJobTask() + condition = { + "reason": "Unschedulable", + "message": "0/1 nodes are available: 1 Insufficient cpu, 1 Insufficient memory." + } + assert kubernetes_job.__is_scaling_in_progress(condition) + + condition = { + "reason": "ContainersNotReady", + "message": "0/1 nodes are available: 1 Insufficient cpu, 1 Insufficient memory." + } + assert kubernetes_job.__is_scaling_in_progress(condition) is False + + condition = { + "reason": "Unschedulable", + "message": "1/1 nodes are available: 1 Insufficient cpu, 1 Insufficient memory." + } + assert kubernetes_job.__is_scaling_in_progress(condition) is True + + condition = { + "reason": "Unschedulable", + "message": "other message" + } + assert kubernetes_job.__is_scaling_in_progress(condition) is False + + condition = { + "message": "other message" + } + assert kubernetes_job.__is_scaling_in_progress(condition) is False + + @mock.patch.object(KubernetesJobTask, "_KubernetesJobTask__get_job_status") + @mock.patch.object(KubernetesJobTask, "KubernetesJobTask__get_pods") + def test_output_when_scaling(self, mock_get_pods, mock_job_status): + # mock that the job succeeded + cond1 = { + "reason": "Unschedulable", + "message": "1/1 nodes are available: 1 Insufficient cpu, 1 Insufficient memory." + } + mock_job_status.return_value = "succeeded" + mock_get_pods.return_value = [ + { + 'conditions': [ + cond1 + ] + } + ] + # create a kubernetes job + kubernetes_job = KubernetesJobTask() + # set logger and uu_name due to logging in __track_job() + kubernetes_job._KubernetesJobTask__logger = logger + kubernetes_job.uu_name = "test" + self.assertTrue(kubernetes_job._KubernetesJobTask____verify_job_has_started())