From 2288e7f594f443158b9e4d9558c31ca52a1ffa46 Mon Sep 17 00:00:00 2001
From: David Przybilla
Date: Mon, 28 Jun 2021 14:25:33 +0900
Subject: [PATCH 1/3] waiting for cluster scale up

---
Review notes (text between the "---" marker and the first "diff --git" line
is commentary and is ignored by git am):

 * __is_scaling_in_progress: the pattern captured a single digit per group,
   so a message such as "0/12 nodes are available" never matched and the
   Unschedulable condition fell through to the assertion. Both groups now
   use \d+.
 * Added the missing spaces in "'reason' not in" and before the line
   continuation backslashes.
 * test_cluster_is_scaling: inside a test class body, obj.__name is mangled
   with the *test* class name, so the private method has to be reached as
   _KubernetesJobTask__is_scaling_in_progress (same style as the existing
   _KubernetesJobTask__track_job call).
 * The first test message now uses a two-digit node count so the regex fix
   is covered; the single-digit path is still covered by the "1/1" case.
 * NOTE(review): "current_nodes <= target_nodes" holds for every "N/M"
   message with N <= M -- confirm whether a stricter check was intended.
 * NOTE(review): line structure and whitespace-only lines were restored from
   the hunk headers; the index blob ids below are the ones originally
   submitted and do not describe the edited content.

 luigi/contrib/kubernetes.py     | 23 +++++++++++++++++++++++
 test/contrib/kubernetes_test.py | 27 +++++++++++++++++++++++++++
 2 files changed, 50 insertions(+)

diff --git a/luigi/contrib/kubernetes.py b/luigi/contrib/kubernetes.py
index 82dc769580..685a8a2ba6 100644
--- a/luigi/contrib/kubernetes.py
+++ b/luigi/contrib/kubernetes.py
@@ -33,6 +33,7 @@ Written and maintained by Marco Capuccini (@mcapuccini).
 """
 
 import logging
+import re
 import time
 import uuid
 from datetime import datetime
@@ -219,6 +220,23 @@ def pod_creation_wait_interal(self):
         """Delay for initial pod creation for just submitted job in seconds"""
         return self.__DEFAULT_POD_CREATION_INTERVAL
 
+    def __is_scaling_in_progress(self, condition):
+        """
+        Return True if an Unschedulable condition's message starts with "N/M nodes are available" and N <= M
+        """
+        if 'reason' not in condition or \
+                'Unschedulable' not in condition['reason'] or \
+                'message' not in condition:
+            return False
+        
+        match = re.match(r'(\d+)\/(\d+) nodes are available.*', condition['message'])
+        if match:
+            current_nodes = int(match.group(1))
+            target_nodes = int(match.group(2))
+            if current_nodes <= target_nodes:
+                return True
+        return False
+
     def __track_job(self):
         """Poll job status while active"""
         while not self.__verify_job_has_started():
@@ -308,6 +326,11 @@ def __verify_job_has_started(self):
                 if 'message' in cond:
                     if cond['reason'] == 'ContainersNotReady':
                         return False
+                    if cond['reason'] == 'Unschedulable':
+                        if self.__is_scaling_in_progress(cond):
+                            # wait if cluster is scaling up...
+                            self.__logger.debug("Kubernetes Cluster is scaling up " + cond['message'])
+                            return False
                     assert cond['status'] != 'False', \
                         "[ERROR] %s - %s" % (cond['reason'], cond['message'])
         return True
diff --git a/test/contrib/kubernetes_test.py b/test/contrib/kubernetes_test.py
index 246ca777d6..89e95b4a1f 100644
--- a/test/contrib/kubernetes_test.py
+++ b/test/contrib/kubernetes_test.py
@@ -110,3 +110,30 @@ def test_output(self, mock_signal, mock_job_status):
         kubernetes_job._KubernetesJobTask__track_job()
         # Make sure successful job signals
         self.assertTrue(mock_signal.called)
+        
+    def test_cluster_is_scaling(self):
+        kubernetes_job = KubernetesJobTask()
+        condition = {
+            "reason": "Unschedulable",
+            "message": "0/12 nodes are available: 12 Insufficient cpu, 12 Insufficient memory."
+        }
+        assert kubernetes_job._KubernetesJobTask__is_scaling_in_progress(condition)
+
+        condition = {
+            "reason": "ContainersNotReady",
+            "message": "0/1 nodes are available: 1 Insufficient cpu, 1 Insufficient memory."
+        }
+        assert kubernetes_job._KubernetesJobTask__is_scaling_in_progress(condition) is False
+
+        condition = {
+            "reason": "Unschedulable",
+            "message": "1/1 nodes are available: 1 Insufficient cpu, 1 Insufficient memory."
+        }
+        assert kubernetes_job._KubernetesJobTask__is_scaling_in_progress(condition) is True
+
+        condition = {
+            "reason": "Unschedulable",
+            "message": "other message"
+        }
+        assert kubernetes_job._KubernetesJobTask__is_scaling_in_progress(condition) is False
+

From 887c7e08ee5f4ae4c2f208e8241fe9711abcfbe6 Mon Sep 17 00:00:00 2001
From: David Przybilla
Date: Mon, 28 Jun 2021 15:41:59 +0900
Subject: [PATCH 2/3] more tests

---
Review notes (ignored by git am):

 * mock.patch.object target "KubernetesJobTask__get_pods" was missing the
   leading underscore of the mangled name and would raise AttributeError
   when the patch is applied; it is now "_KubernetesJobTask__get_pods".
 * "_KubernetesJobTask____verify_job_has_started" had four underscores; the
   mangled name is "_KubernetesJobTask__verify_job_has_started".
 * The final assertion is now assertFalse: with the branch added in 1/3, an
   Unschedulable condition whose message matches "N/M nodes are available"
   makes __verify_job_has_started return False (keep waiting).
 * NOTE(review): the mocked pods are plain dicts. The part of
   __verify_job_has_started that fetches and unpacks pods is not visible in
   this series -- confirm the mock matches the shape it reads, and whether
   patching __get_job_status is needed for this test at all.

 luigi/contrib/kubernetes.py     |  2 +-
 test/contrib/kubernetes_test.py | 29 ++++++++++++++++++++++++++++-
 2 files changed, 29 insertions(+), 2 deletions(-)

diff --git a/luigi/contrib/kubernetes.py b/luigi/contrib/kubernetes.py
index 685a8a2ba6..b34f8b3313 100644
--- a/luigi/contrib/kubernetes.py
+++ b/luigi/contrib/kubernetes.py
@@ -228,7 +228,7 @@ def __is_scaling_in_progress(self, condition):
                 'Unschedulable' not in condition['reason'] or \
                 'message' not in condition:
             return False
-        
+
         match = re.match(r'(\d+)\/(\d+) nodes are available.*', condition['message'])
         if match:
             current_nodes = int(match.group(1))
diff --git a/test/contrib/kubernetes_test.py b/test/contrib/kubernetes_test.py
index 89e95b4a1f..5a55113e93 100644
--- a/test/contrib/kubernetes_test.py
+++ b/test/contrib/kubernetes_test.py
@@ -110,7 +110,7 @@ def test_output(self, mock_signal, mock_job_status):
         kubernetes_job._KubernetesJobTask__track_job()
         # Make sure successful job signals
         self.assertTrue(mock_signal.called)
-        
+    
     def test_cluster_is_scaling(self):
         kubernetes_job = KubernetesJobTask()
         condition = {
@@ -137,3 +137,30 @@ def test_cluster_is_scaling(self):
         }
         assert kubernetes_job._KubernetesJobTask__is_scaling_in_progress(condition) is False
 
+        condition = {
+            "message": "other message"
+        }
+        assert kubernetes_job._KubernetesJobTask__is_scaling_in_progress(condition) is False
+
+    @mock.patch.object(KubernetesJobTask, "_KubernetesJobTask__get_job_status")
+    @mock.patch.object(KubernetesJobTask, "_KubernetesJobTask__get_pods")
+    def test_output_when_scaling(self, mock_get_pods, mock_job_status):
+        # mock a pod that is Unschedulable while the cluster is scaling up
+        cond1 = {
+            "reason": "Unschedulable",
+            "message": "1/1 nodes are available: 1 Insufficient cpu, 1 Insufficient memory."
+        }
+        mock_job_status.return_value = "succeeded"
+        mock_get_pods.return_value = [
+            {
+                'conditions': [
+                    cond1
+                ]
+            }
+        ]
+        # create a kubernetes job
+        kubernetes_job = KubernetesJobTask()
+        # set logger and uu_name due to logging in __verify_job_has_started()
+        kubernetes_job._KubernetesJobTask__logger = logger
+        kubernetes_job.uu_name = "test"
+        self.assertFalse(kubernetes_job._KubernetesJobTask__verify_job_has_started())

From ba99607702d350a3a1cdd12c95d4f85370b3518e Mon Sep 17 00:00:00 2001
From: David Przybilla
Date: Mon, 28 Jun 2021 16:58:18 +0900
Subject: [PATCH 3/3] fixing format

---
 test/contrib/kubernetes_test.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/test/contrib/kubernetes_test.py b/test/contrib/kubernetes_test.py
index 5a55113e93..869bf9b9e8 100644
--- a/test/contrib/kubernetes_test.py
+++ b/test/contrib/kubernetes_test.py
@@ -110,7 +110,7 @@ def test_output(self, mock_signal, mock_job_status):
         kubernetes_job._KubernetesJobTask__track_job()
         # Make sure successful job signals
         self.assertTrue(mock_signal.called)
-    
+
     def test_cluster_is_scaling(self):
         kubernetes_job = KubernetesJobTask()
         condition = {