From 2827807e22bc244e8cb583298091e7cb917ed9cc Mon Sep 17 00:00:00 2001 From: Efstathios Charitos Date: Thu, 19 May 2022 12:03:35 +0100 Subject: [PATCH 1/5] improving the setting of job states --- kubeluigi/__init__.py | 13 +++++----- kubeluigi/k8s.py | 55 ++++++++++++++++++++++++++++++++++--------- 2 files changed, 51 insertions(+), 17 deletions(-) diff --git a/kubeluigi/__init__.py b/kubeluigi/__init__.py index 6af5d94..e71b895 100644 --- a/kubeluigi/__init__.py +++ b/kubeluigi/__init__.py @@ -91,10 +91,11 @@ def build_job_definition(self) -> V1Job: ) return job - def onpodstarted(self, pod): - logger.info( - f"Tail the Pod logs using: kubectl logs -f -n {pod.namespace} {pod.name}" - ) + def onpodstarted(self, pods): + for pod in pods: + logger.info( + f"Tail the Pod logs using: kubectl logs -f -n {pod.namespace} {pod.name}" + ) def as_yaml(self): job = self.build_job_definition() @@ -116,9 +117,9 @@ def run(self): except Exception: logger.exception(f"Luigi has failed to run: {job}, starting cleaning") raise - finally: + else: clean_job_resources(self.kubernetes_client, job) - + def output(self): """ An output target is necessary for checking job completion unless diff --git a/kubeluigi/k8s.py b/kubeluigi/k8s.py index f6cb47c..e0fd631 100644 --- a/kubeluigi/k8s.py +++ b/kubeluigi/k8s.py @@ -161,14 +161,47 @@ def get_job_pods(job) -> List[V1Pod]: def job_phase_stream(job): - previous_phase = {} + previous_job_state = None + while True: + sleep(DEFAULT_POLL_INTERVAL) pods = get_job_pods(job) + + pod_states = [] + for pod in pods: - if previous_phase.get(pod.metadata.name, None) != pod.status.phase: - yield pod.status.phase, pod - previous_phase[pod.metadata.name] = pod.status.phase + + pod_state = pod.status.phase + + # Boil down all container states into one pod state. + for status in pod.status.container_statuses: + print(status) + if status.state.waiting and status.state.waiting.reason == "InvalidImageName": + pod_state = "Failed" + + if status.state.terminated and status.state.terminated.reason == 'Error': + pod_state = "Failed" + + pod_states.append(pod_state) + + print(pod_states) + # Boil down all pod states into one job state. + + # If all states are the same, set that as the job state + if len(set(pod_states)) == 1: + job_state = pod_states[0] + + # If one is Failed, then the job is Failed + if "Failed" in pod_states: + job_state = "Failed" + + # Only yield job state changes + if job_state != previous_job_state: + yield job_state, pods + + # Update state tracker + previous_job_state = job_state def are_all_pods_successful(job): @@ -184,18 +217,18 @@ def run_and_track_job( """ logger.debug(f"Submitting job: {job.metadata.name}") job = kick_off_job(k8s_client, job) - for phase, pod in job_phase_stream(job): - logger.debug(f"Task {job.metadata.name} state is {phase}") + for state, pods in job_phase_stream(job): + logger.debug(f"Task {job.metadata.name} state is {state}") # ToDo: Check if we handle : Scale up but not enough resources # Warning: Running onpodstarted is not guaranteed to execute all times.. 
- if phase == "Running": - onpodstarted(pod) + if state == "Running": + onpodstarted(pods) - if phase == "Failed": - raise FailedJob(job, pod, "Failed pod in Job") + if state == "Failed": + raise FailedJob(job, pods, "Failed pod in Job") - if phase == "Succeeded" and are_all_pods_successful(job): + if state == "Succeeded" and are_all_pods_successful(job): return From eee5e72f4d59bfae96cfab3296b1887cae2dad90 Mon Sep 17 00:00:00 2001 From: Efstathios Charitos Date: Thu, 19 May 2022 12:55:39 +0100 Subject: [PATCH 2/5] los fixos --- kubeluigi/__init__.py | 6 +++++- kubeluigi/k8s.py | 18 +++++++++--------- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/kubeluigi/__init__.py b/kubeluigi/__init__.py index e71b895..fa22023 100644 --- a/kubeluigi/__init__.py +++ b/kubeluigi/__init__.py @@ -111,8 +111,12 @@ def run(self): run_and_track_job(self.kubernetes_client, job, self.onpodstarted) except FailedJob as e: logger.exception( - f"Luigi's job has failed running: {e.job.metadata.name}, {e.pod.status.message}" + f"Luigi's job has failed running: {e.job.metadata.name}" ) + for pod in e.pods: + logger.exception( + f"Luigi's job has failed running: {pod.status.message}" + ) raise except Exception: logger.exception(f"Luigi has failed to run: {job}, starting cleaning") diff --git a/kubeluigi/k8s.py b/kubeluigi/k8s.py index e0fd631..33d35fd 100644 --- a/kubeluigi/k8s.py +++ b/kubeluigi/k8s.py @@ -29,9 +29,9 @@ class FailedJob(Exception): - def __init__(self, job, pod, message): + def __init__(self, job, pods, message): self.job = job - self.pod = pod + self.pods = pods self.message = message super().__init__(self.message) @@ -162,6 +162,7 @@ def get_job_pods(job) -> List[V1Pod]: def job_phase_stream(job): previous_job_state = None + error_message = "Unknown Reason of Failure" while True: @@ -169,23 +170,22 @@ def job_phase_stream(job): pods = get_job_pods(job) pod_states = [] - + for pod in pods: pod_state = pod.status.phase # Boil down all container states into one pod state. for status in pod.status.container_statuses: - print(status) if status.state.waiting and status.state.waiting.reason == "InvalidImageName": pod_state = "Failed" + error_message = "Invalid Image" if status.state.terminated and status.state.terminated.reason == 'Error': pod_state = "Failed" pod_states.append(pod_state) - print(pod_states) # Boil down all pod states into one job state. 
# If all states are the same, set that as the job state @@ -195,10 +195,10 @@ def job_phase_stream(job): # If one is Failed, then the job is Failed if "Failed" in pod_states: job_state = "Failed" - + # Only yield job state changes if job_state != previous_job_state: - yield job_state, pods + yield job_state, pods, error_message # Update state tracker previous_job_state = job_state @@ -217,7 +217,7 @@ def run_and_track_job( """ logger.debug(f"Submitting job: {job.metadata.name}") job = kick_off_job(k8s_client, job) - for state, pods in job_phase_stream(job): + for state, pods, error_message in job_phase_stream(job): logger.debug(f"Task {job.metadata.name} state is {state}") # ToDo: Check if we handle : Scale up but not enough resources @@ -226,7 +226,7 @@ def run_and_track_job( onpodstarted(pods) if state == "Failed": - raise FailedJob(job, pods, "Failed pod in Job") + raise FailedJob(job, pods, error_message) if state == "Succeeded" and are_all_pods_successful(job): return From 994028ec6fb85be7d2558a0f4cced793acc86790 Mon Sep 17 00:00:00 2001 From: Efstathios Charitos Date: Fri, 24 Jun 2022 06:02:26 +0100 Subject: [PATCH 3/5] dont clean failing job --- test/test_kubernetes_job_task.py | 31 ------------------------------- 1 file changed, 31 deletions(-) diff --git a/test/test_kubernetes_job_task.py b/test/test_kubernetes_job_task.py index 5ea69f6..f6a36d9 100644 --- a/test/test_kubernetes_job_task.py +++ b/test/test_kubernetes_job_task.py @@ -63,37 +63,6 @@ def test_job_definition_as_yaml(): assert yaml_as_dict["spec"]["template"]["spec"]["volumes"] == [] -@patch("kubeluigi.run_and_track_job") -@patch("kubeluigi.clean_job_resources") -@patch.object(KubernetesJobTask, "build_job_definition") -@patch.object(KubernetesJobTask, "_init_kubernetes") -def test_failing_task_clean_resources( - mocked_init_kubernetes, - mocked_build_job_definition, - mocked_clean_job_resources, - mocked_run_and_track_job, -): - """ - testing k8s resources are cleaned when running job fails. 
- """ - task = DummyTask() - task._init_task_metadata() - task.kubernetes_client = MagicMock() - task._KubernetesJobTask__logger = MagicMock() - - class DummyException(Exception): - pass - - e = DummyException() - mocked_run_and_track_job.side_effect = e - with pytest.raises(DummyException): - task.run() - - mocked_build_job_definition.assert_called_once() - mocked_clean_job_resources.assert_called_once() - mocked_clean_job_resources.assert_called_once() - - def test_name_not_implemented(): task = KubernetesJobTask() From a9d9d2ba53fd7499226ee7adebd16253e092caf9 Mon Sep 17 00:00:00 2001 From: Efstathios Charitos Date: Fri, 24 Jun 2022 06:05:58 +0100 Subject: [PATCH 4/5] tests --- test/kubernetes_helpers_test.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/kubernetes_helpers_test.py b/test/kubernetes_helpers_test.py index cf7cb55..b936926 100644 --- a/test/kubernetes_helpers_test.py +++ b/test/kubernetes_helpers_test.py @@ -143,15 +143,15 @@ def test_job_phase_stream(mocked_get_job_pods): pod.status = MagicMock() pod.status.phase = "Running" mocked_get_job_pods.return_value = [pod] - phase, pod_with_changed_state = next(job_phase_stream(job)) + phase, pods_with_changed_state, error_message = next(job_phase_stream(job)) assert phase == "Running" - assert pod.metadata.name == pod_with_changed_state.metadata.name + assert pod.metadata.name == pods_with_changed_state[0].metadata.name pod.status.phase = "Succeeded" mocked_get_job_pods.return_value = [pod] - phase, pod_with_changed_state = next(job_phase_stream(job)) + phase, pods_with_changed_state, error_message = next(job_phase_stream(job)) assert phase == "Succeeded" - assert pod.metadata.name == pod_with_changed_state.metadata.name + assert pod.metadata.name == pods_with_changed_state[0].metadata.name @patch("kubeluigi.k8s.get_job_pods") From d291fdd4e74be2f49fa468e77b3e21a5987f6be0 Mon Sep 17 00:00:00 2001 From: Efstathios Charitos Date: Fri, 24 Jun 2022 13:00:26 +0100 Subject: [PATCH 5/5] isolated function and added unit test --- kubeluigi/k8s.py | 66 ++++++++++++++++++--------------- test/kubernetes_helpers_test.py | 25 +++++++++++++ 2 files changed, 61 insertions(+), 30 deletions(-) diff --git a/kubeluigi/k8s.py b/kubeluigi/k8s.py index 33d35fd..8e07d47 100644 --- a/kubeluigi/k8s.py +++ b/kubeluigi/k8s.py @@ -160,46 +160,52 @@ def get_job_pods(job) -> List[V1Pod]: ).items +def reduce_job_state(pods: List[V1Pod]): + pod_states = [] + error_message = "Unknown Reason of Failure" + job_state = "Mixed" + + for pod in pods: + + pod_state = pod.status.phase + + # Boil down all container states into one pod state. + for status in pod.status.container_statuses: + if status.state.waiting and status.state.waiting.reason == "InvalidImageName": + pod_state = "Failed" + error_message = "Invalid Image" + + if status.state.terminated and status.state.terminated.reason == 'Error': + pod_state = "Failed" + + pod_states.append(pod_state) + + # Boil down all pod states into one job state. 
+ + # If all states are the same, set that as the job state + if len(set(pod_states)) == 1: + job_state = pod_states[0] + + # If one is Failed, then the job is Failed + if "Failed" in pod_states: + job_state = "Failed" + + return job_state, error_message + + def job_phase_stream(job): previous_job_state = None - error_message = "Unknown Reason of Failure" while True: - + sleep(DEFAULT_POLL_INTERVAL) pods = get_job_pods(job) - - pod_states = [] - - for pod in pods: - - pod_state = pod.status.phase - - # Boil down all container states into one pod state. - for status in pod.status.container_statuses: - if status.state.waiting and status.state.waiting.reason == "InvalidImageName": - pod_state = "Failed" - error_message = "Invalid Image" - - if status.state.terminated and status.state.terminated.reason == 'Error': - pod_state = "Failed" - - pod_states.append(pod_state) - - # Boil down all pod states into one job state. - - # If all states are the same, set that as the job state - if len(set(pod_states)) == 1: - job_state = pod_states[0] - - # If one is Failed, then the job is Failed - if "Failed" in pod_states: - job_state = "Failed" + job_state, error_message = reduce_job_state(pods) # Only yield job state changes if job_state != previous_job_state: yield job_state, pods, error_message - + # Update state tracker previous_job_state = job_state diff --git a/test/kubernetes_helpers_test.py b/test/kubernetes_helpers_test.py index b936926..a16cc83 100644 --- a/test/kubernetes_helpers_test.py +++ b/test/kubernetes_helpers_test.py @@ -10,6 +10,7 @@ get_container_with_volume_mounts, attach_volume_to_spec, job_phase_stream, + reduce_job_state ) from kubernetes.client import V1Pod, V1PodCondition @@ -199,3 +200,27 @@ def test_kick_off_job(): client.create_namespaced_job.assert_called_with( body=job, namespace=job.metadata.namespace ) + + +@patch("kubeluigi.k8s.get_job_pods") +def test_reduce_job_state(mocked_get_job_pods): + labels = {"l1": "label1"} + + pod1 = pod_spec_from_dict("name_of_pod", dummy_pod_spec, labels=labels) + pod1.status = MagicMock() + pod1.status.phase = "Running" + + pod2 = pod_spec_from_dict("name_of_pod", dummy_pod_spec, labels=labels) + pod2.status = MagicMock() + pod2.status.phase = "Failed" + + job_state, error_message = reduce_job_state([pod1, pod2]) + assert job_state == "Failed" + + pod2.status.phase = "Pending" + job_state, error_message = reduce_job_state([pod1, pod2]) + assert job_state == "Mixed" + + pod2.status.phase = "Running" + job_state, error_message = reduce_job_state([pod1, pod2]) + assert job_state == "Running"
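
A minimal sketch, not part of the patch series above: the "Invalid Image" error path that reduce_job_state gains in these patches (waiting reason "InvalidImageName" producing error_message = "Invalid Image") is not exercised by the test added in PATCH 5/5. Assuming the reduce_job_state function introduced above, a follow-up test could cover it like this, faking the pod with MagicMock in the same spirit as the existing tests rather than touching a real cluster:

from unittest.mock import MagicMock

from kubeluigi.k8s import reduce_job_state


def test_reduce_job_state_invalid_image():
    # Fake a pod whose only container is stuck waiting on an unparsable image name.
    pod = MagicMock()
    pod.status.phase = "Pending"

    container_status = MagicMock()
    container_status.state.waiting.reason = "InvalidImageName"
    container_status.state.terminated = None
    pod.status.container_statuses = [container_status]

    # A single failed pod should mark the whole job as Failed with the "Invalid Image" reason.
    job_state, error_message = reduce_job_state([pod])
    assert job_state == "Failed"
    assert error_message == "Invalid Image"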