Skip to content

Commit

Permalink
isolated function and added unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
stathischaritos committed Jun 24, 2022
1 parent a9d9d2b commit d291fdd
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 30 deletions.
66 changes: 36 additions & 30 deletions kubeluigi/k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,46 +160,52 @@ def get_job_pods(job) -> List[V1Pod]:
).items


def reduce_job_state(pods: List[V1Pod]):
pod_states = []
error_message = "Unknown Reason of Failure"
job_state = "Mixed"

for pod in pods:

pod_state = pod.status.phase

# Boil down all container states into one pod state.
for status in pod.status.container_statuses:
if status.state.waiting and status.state.waiting.reason == "InvalidImageName":
pod_state = "Failed"
error_message = "Invalid Image"

if status.state.terminated and status.state.terminated.reason == 'Error':
pod_state = "Failed"

pod_states.append(pod_state)

# Boil down all pod states into one job state.

# If all states are the same, set that as the job state
if len(set(pod_states)) == 1:
job_state = pod_states[0]

# If one is Failed, then the job is Failed
if "Failed" in pod_states:
job_state = "Failed"

return job_state, error_message


def job_phase_stream(job):
previous_job_state = None
error_message = "Unknown Reason of Failure"

while True:

sleep(DEFAULT_POLL_INTERVAL)
pods = get_job_pods(job)

pod_states = []

for pod in pods:

pod_state = pod.status.phase

# Boil down all container states into one pod state.
for status in pod.status.container_statuses:
if status.state.waiting and status.state.waiting.reason == "InvalidImageName":
pod_state = "Failed"
error_message = "Invalid Image"

if status.state.terminated and status.state.terminated.reason == 'Error':
pod_state = "Failed"

pod_states.append(pod_state)

# Boil down all pod states into one job state.

# If all states are the same, set that as the job state
if len(set(pod_states)) == 1:
job_state = pod_states[0]

# If one is Failed, then the job is Failed
if "Failed" in pod_states:
job_state = "Failed"
job_state, error_message = reduce_job_state(pods)

# Only yield job state changes
if job_state != previous_job_state:
yield job_state, pods, error_message

# Update state tracker
previous_job_state = job_state

Expand Down
25 changes: 25 additions & 0 deletions test/kubernetes_helpers_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
get_container_with_volume_mounts,
attach_volume_to_spec,
job_phase_stream,
reduce_job_state
)

from kubernetes.client import V1Pod, V1PodCondition
Expand Down Expand Up @@ -199,3 +200,27 @@ def test_kick_off_job():
client.create_namespaced_job.assert_called_with(
body=job, namespace=job.metadata.namespace
)


@patch("kubeluigi.k8s.get_job_pods")
def test_reduce_job_state(mocked_get_job_pods):
labels = {"l1": "label1"}

pod1 = pod_spec_from_dict("name_of_pod", dummy_pod_spec, labels=labels)
pod1.status = MagicMock()
pod1.status.phase = "Running"

pod2 = pod_spec_from_dict("name_of_pod", dummy_pod_spec, labels=labels)
pod2.status = MagicMock()
pod2.status.phase = "Failed"

job_state, error_message = reduce_job_state([pod1, pod2])
assert job_state == "Failed"

pod2.status.phase = "Pending"
job_state, error_message = reduce_job_state([pod1, pod2])
assert job_state == "Mixed"

pod2.status.phase = "Running"
job_state, error_message = reduce_job_state([pod1, pod2])
assert job_state == "Running"

0 comments on commit d291fdd

Please sign in to comment.