diff --git a/kubeluigi/__init__.py b/kubeluigi/__init__.py
index 6af5d94..fa22023 100644
--- a/kubeluigi/__init__.py
+++ b/kubeluigi/__init__.py
@@ -91,10 +91,11 @@ def build_job_definition(self) -> V1Job:
         )
         return job
 
-    def onpodstarted(self, pod):
-        logger.info(
-            f"Tail the Pod logs using: kubectl logs -f -n {pod.namespace} {pod.name}"
-        )
+    def onpodstarted(self, pods):
+        for pod in pods:
+            logger.info(
+                f"Tail the Pod logs using: kubectl logs -f -n {pod.namespace} {pod.name}"
+            )
 
     def as_yaml(self):
         job = self.build_job_definition()
@@ -110,15 +111,19 @@ def run(self):
             run_and_track_job(self.kubernetes_client, job, self.onpodstarted)
         except FailedJob as e:
             logger.exception(
-                f"Luigi's job has failed running: {e.job.metadata.name}, {e.pod.status.message}"
+                f"Luigi's job has failed running: {e.job.metadata.name}"
             )
+            for pod in e.pods:
+                logger.exception(
+                    f"Failed pod {pod.metadata.name}: {pod.status.message}"
+                )
             raise
         except Exception:
             logger.exception(f"Luigi has failed to run: {job}, starting cleaning")
             raise
-        finally:
+        else:
             clean_job_resources(self.kubernetes_client, job)
 
     def output(self):
         """
         An output target is necessary for checking job completion unless
diff --git a/kubeluigi/k8s.py b/kubeluigi/k8s.py
index f6cb47c..8e07d47 100644
--- a/kubeluigi/k8s.py
+++ b/kubeluigi/k8s.py
@@ -29,9 +29,9 @@
 
 
 class FailedJob(Exception):
-    def __init__(self, job, pod, message):
+    def __init__(self, job, pods, message):
         self.job = job
-        self.pod = pod
+        self.pods = pods
         self.message = message
         super().__init__(self.message)
 
@@ -160,15 +160,54 @@ def get_job_pods(job) -> List[V1Pod]:
     ).items
 
 
+def reduce_job_state(pods: List[V1Pod]):
+    pod_states = []
+    error_message = "Unknown Reason of Failure"
+    job_state = "Mixed"
+
+    for pod in pods:
+
+        pod_state = pod.status.phase
+
+        # Boil down all container states into one pod state.
+        for status in pod.status.container_statuses or []:
+            if status.state.waiting and status.state.waiting.reason == "InvalidImageName":
+                pod_state = "Failed"
+                error_message = "Invalid Image"
+
+            if status.state.terminated and status.state.terminated.reason == "Error":
+                pod_state = "Failed"
+
+        pod_states.append(pod_state)
+
+    # Boil down all pod states into one job state.
+
+    # If all states are the same, set that as the job state
+    if len(set(pod_states)) == 1:
+        job_state = pod_states[0]
+
+    # If one is Failed, then the job is Failed
+    if "Failed" in pod_states:
+        job_state = "Failed"
+
+    return job_state, error_message
+
+
 def job_phase_stream(job):
-    previous_phase = {}
+    previous_job_state = None
+
     while True:
+        sleep(DEFAULT_POLL_INTERVAL)
         pods = get_job_pods(job)
 
-        for pod in pods:
-            if previous_phase.get(pod.metadata.name, None) != pod.status.phase:
-                yield pod.status.phase, pod
-                previous_phase[pod.metadata.name] = pod.status.phase
+        job_state, error_message = reduce_job_state(pods)
+
+        # Only yield job state changes
+        if job_state != previous_job_state:
+            yield job_state, pods, error_message
+
+        # Update state tracker
+        previous_job_state = job_state
 
 
 def are_all_pods_successful(job):
@@ -184,18 +223,18 @@ def run_and_track_job(
     """
     logger.debug(f"Submitting job: {job.metadata.name}")
     job = kick_off_job(k8s_client, job)
-    for phase, pod in job_phase_stream(job):
-        logger.debug(f"Task {job.metadata.name} state is {phase}")
+    for state, pods, error_message in job_phase_stream(job):
+        logger.debug(f"Task {job.metadata.name} state is {state}")
 
         # ToDo: Check if we handle : Scale up but not enough resources
         # Warning: Running onpodstarted is not guaranteed to execute all times..
-        if phase == "Running":
-            onpodstarted(pod)
+        if state == "Running":
+            onpodstarted(pods)
 
-        if phase == "Failed":
-            raise FailedJob(job, pod, "Failed pod in Job")
+        if state == "Failed":
+            raise FailedJob(job, pods, error_message)
 
-        if phase == "Succeeded" and are_all_pods_successful(job):
+        if state == "Succeeded" and are_all_pods_successful(job):
             return
 
 
diff --git a/test/kubernetes_helpers_test.py b/test/kubernetes_helpers_test.py
index cf7cb55..a16cc83 100644
--- a/test/kubernetes_helpers_test.py
+++ b/test/kubernetes_helpers_test.py
@@ -10,6 +10,7 @@
     get_container_with_volume_mounts,
     attach_volume_to_spec,
     job_phase_stream,
+    reduce_job_state,
 )
 from kubernetes.client import V1Pod, V1PodCondition
 
@@ -143,15 +144,15 @@ def test_job_phase_stream(mocked_get_job_pods):
     pod.status = MagicMock()
     pod.status.phase = "Running"
     mocked_get_job_pods.return_value = [pod]
-    phase, pod_with_changed_state = next(job_phase_stream(job))
+    phase, pods_with_changed_state, error_message = next(job_phase_stream(job))
     assert phase == "Running"
-    assert pod.metadata.name == pod_with_changed_state.metadata.name
+    assert pod.metadata.name == pods_with_changed_state[0].metadata.name
 
     pod.status.phase = "Succeeded"
     mocked_get_job_pods.return_value = [pod]
-    phase, pod_with_changed_state = next(job_phase_stream(job))
+    phase, pods_with_changed_state, error_message = next(job_phase_stream(job))
     assert phase == "Succeeded"
-    assert pod.metadata.name == pod_with_changed_state.metadata.name
+    assert pod.metadata.name == pods_with_changed_state[0].metadata.name
 
 
 @patch("kubeluigi.k8s.get_job_pods")
@@ -199,3 +200,27 @@ def test_kick_off_job():
     client.create_namespaced_job.assert_called_with(
         body=job, namespace=job.metadata.namespace
     )
+
+
+@patch("kubeluigi.k8s.get_job_pods")
+def test_reduce_job_state(mocked_get_job_pods):
+    labels = {"l1": "label1"}
+
+    pod1 = pod_spec_from_dict("name_of_pod", dummy_pod_spec, labels=labels)
+    pod1.status = MagicMock()
+    pod1.status.phase = "Running"
+
+    pod2 = pod_spec_from_dict("name_of_pod", dummy_pod_spec, labels=labels)
+    pod2.status = MagicMock()
+    pod2.status.phase = "Failed"
+
+    job_state, error_message = reduce_job_state([pod1, pod2])
+    assert job_state == "Failed"
+
+    pod2.status.phase = "Pending"
+    job_state, error_message = reduce_job_state([pod1, pod2])
+    assert job_state == "Mixed"
+
+    pod2.status.phase = "Running"
+    job_state, error_message = reduce_job_state([pod1, pod2])
+    assert job_state == "Running"
diff --git a/test/test_kubernetes_job_task.py b/test/test_kubernetes_job_task.py
index 5ea69f6..f6a36d9 100644
--- a/test/test_kubernetes_job_task.py
+++ b/test/test_kubernetes_job_task.py
@@ -63,37 +63,6 @@ def test_job_definition_as_yaml():
     assert yaml_as_dict["spec"]["template"]["spec"]["volumes"] == []
 
 
-@patch("kubeluigi.run_and_track_job")
-@patch("kubeluigi.clean_job_resources")
-@patch.object(KubernetesJobTask, "build_job_definition")
-@patch.object(KubernetesJobTask, "_init_kubernetes")
-def test_failing_task_clean_resources(
-    mocked_init_kubernetes,
-    mocked_build_job_definition,
-    mocked_clean_job_resources,
-    mocked_run_and_track_job,
-):
-    """
-    testing k8s resources are cleaned when running job fails.
-    """
-    task = DummyTask()
-    task._init_task_metadata()
-    task.kubernetes_client = MagicMock()
-    task._KubernetesJobTask__logger = MagicMock()
-
-    class DummyException(Exception):
-        pass
-
-    e = DummyException()
-    mocked_run_and_track_job.side_effect = e
-    with pytest.raises(DummyException):
-        task.run()
-
-    mocked_build_job_definition.assert_called_once()
-    mocked_clean_job_resources.assert_called_once()
-    mocked_clean_job_resources.assert_called_once()
-
-
 def test_name_not_implemented():
     task = KubernetesJobTask()