[Newbie] Supporting Yunikorn and Kueue #5915

Open · wants to merge 20 commits into base: master
187 changes: 187 additions & 0 deletions rfc/system/5575-supporting-yunikorn-and-kueue.md
# [Newbie] Supporting Yunikorn and Kueue

**Authors:**

- @yuteng

## 1 Executive Summary

Provide Kubernetes (k8s) resource management, gang scheduling and preemption for Flyte applications through third-party software, namely Apache Yunikorn and Kueue.
> **Reviewer (Member):** Could you please explain what preemption means here compared to what preemption means in the context of spot instances on e.g. AWS or GCP?


## 2 Motivation

Flyte supports multi-tenancy and various Kubernetes plugins.
Some Kubernetes plugins may encounter resource wastage when jobs partially start without performing any meaningful work.
A solution to this issue is gang scheduling, which guarantees that all worker pods derived from a CRD are scheduled simultaneously.
Both Kueue and Apache Yunikorn support this mechanism.
Additionally, Yunikorn can map tenants and organizations to hierarchical queues to define resource quotas.
Based on this setting, access control lists can be configured to grant access to users and groups.

## 3 Proposed Implementation

Kueue

```yaml
queueconfig:
  scheduler: yunikorn
  jobs:
    - type: "ray"
      gangscheduling: "placeholderTimeoutInSeconds=60 gangSchedulingStyle=hard"
      allow-preemption: false
    - type: "spark"
      gangscheduling: "placeholderTimeoutInSeconds=30 gangSchedulingStyle=hard"
      allow-preemption: true
```
> **Reviewer (Member):** Is this list complete or an example? I.e. will this also work for plugins like kubeflow pytorch, tf, mpi or dask, ...?

> **Author:** This is an example. Admins can set a default gang-scheduling configuration per CRD in the Flyte k8s plugins.
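
As a hedged sketch of what such per-plugin defaults could look like, the `jobs` list could be extended with other plugin types; the `pytorch` and `dask` entries below are illustrative and not part of the RFC.

```yaml
queueconfig:
  scheduler: yunikorn
  jobs:
    # hypothetical entries for additional Flyte k8s plugins
    - type: "pytorch"
      gangscheduling: "placeholderTimeoutInSeconds=60 gangSchedulingStyle=soft"
      allow-preemption: true
    - type: "dask"
      gangscheduling: "placeholderTimeoutInSeconds=30 gangSchedulingStyle=hard"
      allow-preemption: false
```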


`root.organization1.ray` is the queue of a Ray job submitted by user1, who belongs to organization1.
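
For context, a minimal sketch of the Yunikorn hierarchical queue configuration this naming implies; the ACL entry is illustrative and quotas are omitted:

```yaml
partitions:
  - name: default
    queues:
      - name: root
        queues:
          - name: organization1
            submitacl: "user1 org1-group"   # illustrative users/groups allowed to submit
            queues:
              - name: ray
              - name: spark
              - name: default
```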

In Kueue, a ResourceFlavor allocates resources based on node labels, which means category-based resource allocation by an organization label is possible.
> **Reviewer (@fg91, Nov 5, 2024):** Could you please explain how the resource flavor will be determined? Is there a way to automatically derive this from the task decorator args `@task(resources=..., accelerator=...)`?
>
> It would be really nice if tasks that need e.g. an A100 GPU were automatically not in the same queue as tasks that need 2 x T4 GPUs. We're using kubeflow pytorch jobs with scheduler plugins' gang scheduling and have observed jobs being starved, even though the cluster had resources for them, because higher-priority jobs requesting different GPU types couldn't be scheduled.

> **Author:** No, the Kueue CRDs need to be created first. A ClusterQueue defines a resource quota whose properties are defined by ResourceFlavors. I think creating ResourceFlavors to categorize resources under a ClusterQueue is a workable solution:

```yaml
apiVersion: kueue.x-k8s.io/v1beta1
kind: ResourceFlavor
metadata:
  name: "spot-t4"
spec:
  nodeLabels:
    cloud.google.com/gke-accelerator: nvidia-tesla-t4
  nodeTaints:
  - effect: NoSchedule
    key: cloud.google.com/gke-accelerator
    value: "true"
  tolerations:
  - key: "spot-taint"
    operator: "Exists"
    effect: "NoSchedule"
---
apiVersion: kueue.x-k8s.io/v1beta1
kind: ClusterQueue
metadata:
  name: "cluster-queue"
spec:
  namespaceSelector: {} # match all.
  resourceGroups:
  - coveredResources: ["cpu", "memory", "nvidia.com/gpu"]
    flavors:
    - name: "spot-t4"
      resources:
      - name: "cpu"
        nominalQuota: 9
      - name: "memory"
        nominalQuota: 36Gi
      - name: "nvidia.com/gpu"
        nominalQuota: 50
    - name: "spot-a100"
      resources:
      - name: "cpu"
        nominalQuota: 18
      - name: "memory"
        nominalQuota: 72Gi
      - name: "nvidia.com/gpu"
        nominalQuota: 100
```

On the other hand, Kueue preemption requires a Kueue WorkloadPriorityClass and patching the job with a label.
When the plugin receives the preemption label, it should patch it onto all pods belonging to the same job.
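
A minimal sketch of such a WorkloadPriorityClass; the name and value are illustrative:

```yaml
apiVersion: kueue.x-k8s.io/v1beta1
kind: WorkloadPriorityClass
metadata:
  name: high-priority
value: 10000
description: "Workloads that may preempt lower-priority workloads"
```

The job (and its pods) would then carry the label `kueue.x-k8s.io/priority-class: high-priority` so Kueue can apply the corresponding preemption policy.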

Thus, a ClusterQueue that includes multiple resource flavors represents the total accessible resources of an organization.
> **Reviewer (Member):** I don't understand this sentence tbh, could you please explain/expand?

| clusterQueue | localQueue |
| --- | --- |
| <organization name> | ray, spark, default |

A tenant can submit organization-specific tasks to queues such as organization.ray, organization.spark and organization.default, making it possible to track which job types are submittable.
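
A hedged sketch of the LocalQueue objects implied by this mapping; the namespace and names are illustrative:

```yaml
apiVersion: kueue.x-k8s.io/v1beta1
kind: LocalQueue
metadata:
  namespace: organization1     # tenant namespace (illustrative)
  name: ray                    # one LocalQueue per submittable job type
spec:
  clusterQueue: organization1  # the organization's ClusterQueue
```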

A scheduling plugin implements the functions `SetSchedulerName`, `CreateLabels` and `CreateGroupLabels` to create labels and set the `schedulerName`.
`CreateLabels` patches necessary labels, such as `queuename`, `user-info` and `applicationID`, onto jobs.
`CreateGroupLabels` creates `pod-group` and `task-group` labels based on the incoming CRD if needed.
`SetSchedulerName` sets the `schedulerName` field in the `podTemplate`.

> **Reviewer (Member):** Why not have a single interface and two implementations of the same interface for yunikorn and kueue?
>
> **Reviewer (Member):** I would prefer in `func (e *PluginManager) launchResource(` to not call Kueue- or Yunikorn-specific code, see snippet below, but just a general interface whose implementation depends on the propeller config.
>
> **Author:** Agree, I updated the document.

```go
type SchedulablePlugin interface {
	CreateLabels(taskCtx pluginsCore.TaskExecutionMetadata, o client.Object, cfg *config.K8sPluginConfig)
	CreateGroupLabels(ctx context.Context, object client.Object, taskTmpl *core.TaskTemplate)
	GetGroupLabels() (labels, annotations map[string]string)
	SetSchedulerName(object client.Object)
}

type YunikornSchedulablePlugin struct {
	jobs        map[string]string
	Labels      map[string]string
	Annotations map[string]string
}

func (yk *YunikornSchedulablePlugin) GetGroupLabels() (labels, annotations map[string]string) {
	return yk.Labels, yk.Annotations
}

func (yk *YunikornSchedulablePlugin) CreateLabels(taskCtx pluginsCore.TaskExecutionMetadata, o client.Object, cfg *config.K8sPluginConfig) {
	// Set the queue name based on the job type and the flyteidl.Identifier fields, including "ResourceType", "Org" and "Name".
	// 1. Clear yk.Labels and yk.Annotations.
	// 2. Add yunikorn.apache.org/user.info = <organization>.<Name>
	// 3. Add yunikorn.apache.org/app-id = <ResourceType>-<uuid>
	// 4. Add yunikorn.apache.org/queue = <organization>.<jobType>
}

func (yk *YunikornSchedulablePlugin) CreateGroupLabels(ctx context.Context, object client.Object, taskTmpl *core.TaskTemplate) {
	// 1. Add yunikorn.apache.org/task-group-name = yk.CreateTaskgroupName(ResourceType)
	// 2. Add yunikorn.apache.org/task-groups = yk.CreateTaskgroup(object)
	// 3. Add yunikorn.apache.org/schedulingPolicyParameters = yk.jobs[ResourceType]
	// 4. Add yunikorn.apache.org/allow-preemption = true/false
}

type KueueSchedulablePlugin struct {
	jobs        map[string]string
	Labels      map[string]string
	Annotations map[string]string
}

func (k *KueueSchedulablePlugin) GetGroupLabels() (labels, annotations map[string]string) {
	return k.Labels, k.Annotations
}

func (k *KueueSchedulablePlugin) CreateLabels(taskCtx pluginsCore.TaskExecutionMetadata, o client.Object, cfg *config.K8sPluginConfig) {
	// Set the queue name based on the job type and the flyteidl.Identifier field "Org".
	// 1. Clear k.Labels and k.Annotations.
	// 2. Add kueue.x-k8s.io/queue-name = <organization>.<jobtype>
	// 3. Update k.Labels and k.Annotations.
}

```

> **Reviewer (Member):** Do any additional k8s resources have to be created for the queues or does a queue exist as soon as a pod has an annotation with a new queue name?
>
> **Author (@0yukali0, Nov 6, 2024):** Yes, when adopting Kueue, Kueue CRDs describe the quota of a queue. When adopting Yunikorn, on the other hand, queues are configured through the [Yunikorn queue configuration](https://yunikorn.apache.org/docs/user_guide/queue_config).

```go
func (k *KueueSchedulablePlugin) CreateGroupLabels(ctx context.Context, object client.Object, taskTmpl *core.TaskTemplate) {
	// Add the labels "kueue.x-k8s.io/pod-group-name" and "kueue.x-k8s.io/pod-group-total-count" for Spark and Dask.
	// Skip if the object is a Ray CRD or a Kubeflow CRD, since these are supported by Kueue natively.
	// Update k.Labels and k.Annotations.
}
```
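
For reference, Yunikorn's gang-scheduling annotations take a JSON list of task groups; a minimal sketch of what `CreateGroupLabels` might emit for a Ray worker group follows, with names and sizes purely illustrative:

```yaml
metadata:
  annotations:
    yunikorn.apache.org/task-group-name: "ray-worker"
    yunikorn.apache.org/task-groups: |
      [{
        "name": "ray-worker",
        "minMember": 3,
        "minResource": {"cpu": "1", "memory": "2Gi"}
      }]
    yunikorn.apache.org/schedulingPolicyParameters: "placeholderTimeoutInSeconds=60 gangSchedulingStyle=hard"
```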

When a job comes in, the following things happen:
1. `SetSchedulerName` sets the `schedulerName` field to the specific scheduler name.
2. `CreateLabels` creates basic labels based on the scheduler.
3. `CreateGroupLabels` creates `kueue.x-k8s.io/pod-group-name` or `yunikorn.apache.org/task-groups` according to the results calculated from the CRD.
4. The labels and annotations from `CreateLabels` and `CreateGroupLabels` are merged into the CRD.

```go
type PluginManager struct {
	id              string
	plugin          k8s.Plugin
	resourceToWatch runtime.Object
	kubeClient      pluginsCore.KubeClient
	metrics         PluginMetrics
	// Per namespace-resource
	backOffController    *backoff.Controller
	resourceLevelMonitor *ResourceLevelMonitor
	eventWatcher         EventWatcher
}

func (e *PluginManager) launchResource(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) (pluginsCore.Transition, error) {
```

> **Reviewer (@fg91, Nov 5, 2024):** Maybe `addObjectMetadata`, which is called by `launchResource`, would be a better place to inject the required metadata. Or do we need to inject something other than labels/annotations?

```go
	o, err := e.plugin.BuildResource(ctx, k8sTaskCtx)
	if err != nil {
		return pluginsCore.UnknownTransition, err
	}
```

> **Reviewer (Member):** Would be nice to not have Kueue specific code here but a general interface, see this comment.

```go
	if p, ok := e.plugin.(k8s.SchedulablePlugin); ok {
		p.SetSchedulerName(o)
	}
}

func (e *PluginManager) addObjectMetadata(taskCtx pluginsCore.TaskExecutionMetadata, o client.Object, cfg *config.K8sPluginConfig) {
	var schedulerLabels, schedulerAnnotations map[string]string
	if p, ok := e.plugin.(k8s.SchedulablePlugin); ok {
		p.SetSchedulerName(o)
		p.CreateLabels(taskCtx, o, cfg)
		p.CreateGroupLabels(taskCtx, o)
		schedulerLabels, schedulerAnnotations = p.GetGroupLabels()
	}
	o.SetNamespace(taskCtx.GetNamespace())
	o.SetAnnotations(pluginsUtils.UnionMaps(cfg.DefaultAnnotations, o.GetAnnotations(), pluginsUtils.CopyMap(taskCtx.GetAnnotations()), schedulerAnnotations))
	o.SetLabels(pluginsUtils.UnionMaps(cfg.DefaultLabels, o.GetLabels(), pluginsUtils.CopyMap(taskCtx.GetLabels()), schedulerLabels))
	o.SetName(taskCtx.GetTaskExecutionID().GetGeneratedName())
}
```
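
To illustrate step 4 above, a hedged sketch of the metadata a Ray CRD might carry after `addObjectMetadata` runs with the Yunikorn plugin; which keys land in labels versus annotations, and all values, are illustrative:

```yaml
metadata:
  namespace: organization1-development
  labels:
    yunikorn.apache.org/user.info: organization1.user1
    yunikorn.apache.org/app-id: ray-7f3c9b2e
    yunikorn.apache.org/queue: organization1.ray
  annotations:
    yunikorn.apache.org/schedulingPolicyParameters: "placeholderTimeoutInSeconds=60 gangSchedulingStyle=hard"
    yunikorn.apache.org/allow-preemption: "false"
```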

## 4 Metrics & Dashboards

1. The Yunikorn scheduler adds applications to a specific queue based on their user info and queue name, for any application type.
2. Yunikorn and Kueue provide gang scheduling through annotations for Ray and Spark.
3. Preemption behavior aligns with the user-defined configuration in Yunikorn.

## 5 Drawbacks

This approach doesn't offer a way to maintain consistency between the actual resource quotas of groups and the configuration in the scheduler.

## 6 Alternatives

## 7 Potential Impact and Dependencies

Flyte supports the Spark, Ray and Kubeflow CRDs, including PyTorchJobs and TFJobs.
The Spark and Ray operators already support Yunikorn gang scheduling, since task-group calculation has been implemented in those operators.
Supporting the Kubeflow CRDs requires implementing task-group calculation at the pod level in Flyte or in Kubeflow.
On the other hand, Kueue currently doesn't support the Spark CRD.

| Operator | Yunikorn | Kueue |
| --- | --- | --- |
| Spark | ✓ | ✗ |
| Ray | ✓ | ✓ |
| Kubeflow | ✗ | ✓ |

> **Reviewer (Member):** From what I understand, one only needs to add labels/annotations on the worker pods. Can't we do this purely from flyte by modifying the pod template spec of the respective CRD? What do the operators have to do in addition to that?
>
> **Author:** Yes, the current approach fetches the pod templates from the CRDs and patches labels onto them. If the operators implement a mechanism to patch group labels for their CRDs to support gang scheduling in the future, we can start removing the code that generates group labels to reduce the maintenance overhead.

## 8 Unresolved questions

## 9 Conclusion

Yunikorn and Kueue support gang scheduling, allowing all necessary pods to run simultaneously when the required resources are available.
Yunikorn provides preemption, calculating the priority of applications from their priority class and the priority score of the queue they are submitted to, in order to run high-priority or emergency applications immediately.
Yunikorn's hierarchical queues also include guaranteed-resource settings and ACLs.
> **Reviewer (@fg91, Nov 5, 2024):** Nit: Could you please run the doc through a spelling checker? Thank you 🙇
>
> **Author:** Yes, I ran `make spellcheck` in the latest commit :)
