diff --git a/kubeluigi/k8s.py b/kubeluigi/k8s.py index 33d35fd..8e07d47 100644 --- a/kubeluigi/k8s.py +++ b/kubeluigi/k8s.py @@ -160,46 +160,52 @@ def get_job_pods(job) -> List[V1Pod]: ).items +def reduce_job_state(pods: List[V1Pod]): + pod_states = [] + error_message = "Unknown Reason of Failure" + job_state = "Mixed" + + for pod in pods: + + pod_state = pod.status.phase + + # Boil down all container states into one pod state. + for status in pod.status.container_statuses: + if status.state.waiting and status.state.waiting.reason == "InvalidImageName": + pod_state = "Failed" + error_message = "Invalid Image" + + if status.state.terminated and status.state.terminated.reason == 'Error': + pod_state = "Failed" + + pod_states.append(pod_state) + + # Boil down all pod states into one job state. + + # If all states are the same, set that as the job state + if len(set(pod_states)) == 1: + job_state = pod_states[0] + + # If one is Failed, then the job is Failed + if "Failed" in pod_states: + job_state = "Failed" + + return job_state, error_message + + def job_phase_stream(job): previous_job_state = None - error_message = "Unknown Reason of Failure" while True: - + sleep(DEFAULT_POLL_INTERVAL) pods = get_job_pods(job) - - pod_states = [] - - for pod in pods: - - pod_state = pod.status.phase - - # Boil down all container states into one pod state. - for status in pod.status.container_statuses: - if status.state.waiting and status.state.waiting.reason == "InvalidImageName": - pod_state = "Failed" - error_message = "Invalid Image" - - if status.state.terminated and status.state.terminated.reason == 'Error': - pod_state = "Failed" - - pod_states.append(pod_state) - - # Boil down all pod states into one job state. - - # If all states are the same, set that as the job state - if len(set(pod_states)) == 1: - job_state = pod_states[0] - - # If one is Failed, then the job is Failed - if "Failed" in pod_states: - job_state = "Failed" + job_state, error_message = reduce_job_state(pods) # Only yield job state changes if job_state != previous_job_state: yield job_state, pods, error_message - + # Update state tracker previous_job_state = job_state diff --git a/test/kubernetes_helpers_test.py b/test/kubernetes_helpers_test.py index b936926..a16cc83 100644 --- a/test/kubernetes_helpers_test.py +++ b/test/kubernetes_helpers_test.py @@ -10,6 +10,7 @@ get_container_with_volume_mounts, attach_volume_to_spec, job_phase_stream, + reduce_job_state ) from kubernetes.client import V1Pod, V1PodCondition @@ -199,3 +200,27 @@ def test_kick_off_job(): client.create_namespaced_job.assert_called_with( body=job, namespace=job.metadata.namespace ) + + +@patch("kubeluigi.k8s.get_job_pods") +def test_reduce_job_state(mocked_get_job_pods): + labels = {"l1": "label1"} + + pod1 = pod_spec_from_dict("name_of_pod", dummy_pod_spec, labels=labels) + pod1.status = MagicMock() + pod1.status.phase = "Running" + + pod2 = pod_spec_from_dict("name_of_pod", dummy_pod_spec, labels=labels) + pod2.status = MagicMock() + pod2.status.phase = "Failed" + + job_state, error_message = reduce_job_state([pod1, pod2]) + assert job_state == "Failed" + + pod2.status.phase = "Pending" + job_state, error_message = reduce_job_state([pod1, pod2]) + assert job_state == "Mixed" + + pod2.status.phase = "Running" + job_state, error_message = reduce_job_state([pod1, pod2]) + assert job_state == "Running"