diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py index 5b9e57ec01743..68081b5a67075 100644 --- a/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/airflow/providers/cncf/kubernetes/operators/pod.py @@ -790,9 +790,7 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any: since_time=last_log_time, ) - if pod_log_status.running: - self.log.info("Container still running; deferring again.") - self.invoke_defer_method(pod_log_status.last_log_time) + self.invoke_defer_method(pod_log_status.last_log_time) else: self.invoke_defer_method() diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py index be8279bcabd9a..a4ccb4b44b4bf 100644 --- a/tests/providers/cncf/kubernetes/operators/test_pod.py +++ b/tests/providers/cncf/kubernetes/operators/test_pod.py @@ -1780,6 +1780,18 @@ def test_process_duplicate_label_pods__pod_removed_if_delete_pod( process_pod_deletion_mock.assert_called_once_with(pod_1) assert result.metadata.name == pod_2.metadata.name + @patch(POD_MANAGER_CLASS.format("fetch_container_logs")) + @patch(KUB_OP_PATH.format("invoke_defer_method")) + def test_defere_call_one_more_time_after_error(self, invoke_defer_method, fetch_container_logs): + fetch_container_logs.return_value = PodLoggingStatus(False, None) + op = KubernetesPodOperator(task_id="test_task", name="test-pod", get_logs=True) + + op.trigger_reentry( + create_context(op), event={"name": TEST_NAME, "namespace": TEST_NAMESPACE, "status": "running"} + ) + + invoke_defer_method.assert_called_with(None) + class TestSuppress: def test__suppress(self, caplog):