Skip to content

Commit

Permalink
Merge pull request #13 from optimizely/sc-log-levels
Browse files Browse the repository at this point in the history
Change logger config and print pod logs link
  • Loading branch information
stathischaritos authored Apr 11, 2022
2 parents 9dd9792 + 1893480 commit a9a84bd
Show file tree
Hide file tree
Showing 10 changed files with 530 additions and 543 deletions.
8 changes: 7 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
.mypy_cache/
.cache
__pycache__
__pycache__
venv
build
dist
.vscode
.eggs
kubeluigi.egg-info/
33 changes: 33 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,39 @@ class Task(KubernetesJobTask):

```

### Volumes

You can use volumes in the pods that run your tasks:

Simple Ephemeral volume example:

```python
with_ephemeral_volume200 = EphemeralVolume(size_in_gb=200)
class MyTask:
    def __init__(self, ...):
...
# you can find this volume in your containers under `/mnt/data/`
self.volumes = [with_ephemeral_volume200]
```

By leveraging volumes backed by cloud storage, you can read and write data as if it existed locally. For example, by mounting CSI drives, your tasks can read inputs from and write outputs to `/mnt/my_s3_bucket/`. This avoids complicated setups in which tasks have to know cloud specifics to read inputs and write outputs.

We provide a base class for Azure Blob Storage; this presupposes that you have installed the Azure Blob CSI driver in your AKS cluster.

```python
with_azure_blob_volume = AzureBlobStorageVolume(storage_account=AZ_STORAGE_ACCOUNT,
storage_container=AZ_CONTAINER)
class MyTask:
    def __init__(self, ...):
...
# you can find this volume in your containers under `/mnt/{AZ_STORAGE_ACCOUNT}/{AZ_CONTAINER}`
# you can use this convention to have your containers inputs and outputs params
# read data from this mount point
self.volumes = [with_azure_blob_volume]

```


## Logs

Kubeluigi's task logs include Job, Task, and Pod identifiers:
Expand Down
78 changes: 39 additions & 39 deletions kubeluigi/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from datetime import datetime
import logging
import uuid
from typing import List
import yaml

from kubeluigi.k8s import (
Expand All @@ -9,40 +8,33 @@
pod_spec_from_dict,
run_and_track_job,
kubernetes_client,
attach_volume_to_spec
attach_volume_to_spec,
FailedJob,
)

from kubeluigi.volumes import AttachableVolume

from kubernetes.client import ApiClient
from kubernetes.client.models.v1_job import V1Job
from kubernetes.client.models.v1_pod_spec import V1PodSpec

logger = logging.getLogger("luigi-interface")
logger = logging.getLogger(__name__)


class KubernetesJobTask:

volumes: List[AttachableVolume] = []

def _init_task_metadata(self):
self.job_uuid = str(uuid.uuid4().hex)
now = datetime.utcnow()
self.uu_name = "%s-%s-%s" % (
self.name,
now.strftime("%Y%m%d%H%M%S"),
self.job_uuid[:16],
)

self.uu_name = self.name

def _init_kubernetes(self):
self.__logger = logger
self.kubernetes_client = kubernetes_client()

@property
def restart_policy(self):
return "Never"

@property
def delete_on_success(self):
"""
Delete the Kubernetes workload if the job has ended successfully.
"""
return True

@property
def backoff_limit(self):
"""
Expand All @@ -51,18 +43,11 @@ def backoff_limit(self):
"""
return 6

@property
def delete_on_success(self):
"""
Delete the Kubernetes workload if the job has ended successfully.
"""
return True

@property
def name(self):
"""
A name for this job. This task will automatically append a UUID to the
name before to submit to Kubernetes.
A name for this job. This needs to be unique otherwise it will fail if another job
with the same name is running.
"""
raise NotImplementedError("subclass must define name")

Expand All @@ -89,39 +74,48 @@ def spec_schema(self):
"""
raise NotImplementedError("subclass must define spec_schema")

def build_job_definition(self):
def build_job_definition(self) -> V1Job:
self._init_task_metadata()
schema = self.spec_schema()
schema_with_volumes = self._attach_volumes_to_spec(schema)
pod_template_spec = pod_spec_from_dict(
self.uu_name, schema_with_volumes, self.restart_policy, self.labels
self.uu_name, schema_with_volumes, self.labels, self.restart_policy
)

job = job_definition(
job_name=self.uu_name,
job_uuid=self.job_uuid,
backoff_limit=self.backoff_limit,
pod_template_spec=pod_template_spec,
labels=self.labels,
namespace=self.namespace,
)
return job

def onpodstarted(self, pod):
logger.info(
f"Tail the Pod logs using: kubectl logs -f -n {pod.namespace} {pod.name}"
)

def as_yaml(self):
job = self.build_job_definition()
job_dict = ApiClient().sanitize_for_serialization(job)
str_yaml = yaml.safe_dump(job_dict, default_flow_style=False, sort_keys=False)
return str_yaml

def run(self):
self._init_kubernetes()
job = self.build_job_definition()
self.__logger.info("Submitting Kubernetes Job: " + self.uu_name)
logger.debug("Submitting Kubernetes Job: " + self.uu_name)
try:
run_and_track_job(self.kubernetes_client, job)
except Exception as e:
logger.exception("Luigi has failed to submit the job, starting cleaning")
raise e
run_and_track_job(self.kubernetes_client, job, self.onpodstarted)
except FailedJob as e:
logger.exception(
f"Luigi's job has failed running: {e.job.metadata.name}, {e.pod.status.message}"
)
raise
except Exception:
logger.exception(f"Luigi has failed to run: {job}, starting cleaning")
raise
finally:
clean_job_resources(self.kubernetes_client, job)

Expand All @@ -138,7 +132,13 @@ def _attach_volumes_to_spec(self, spec_schema):
"""
overrides the spec_schema of a task to attach a volume
"""
if 'volumes' not in spec_schema and hasattr(self, 'volumes'):
if "volumes" not in spec_schema and hasattr(self, "volumes"):
for volume in self.volumes:
spec_schema = attach_volume_to_spec(spec_schema, volume)
return spec_schema

def add_volume(self, volume):
"""
adds a volume to the task
"""
return self.volumes.append(volume)
Loading

0 comments on commit a9a84bd

Please sign in to comment.