-
Notifications
You must be signed in to change notification settings - Fork 63
Resource manager overhaul #552
base: master
Are you sure you want to change the base?
Changes from 17 commits
3e6c058
65de610
76b3bd1
f85ba5c
7797a16
4a7f977
afd2812
2c68564
447b4b3
9d9f023
c226e3e
a2b117f
6efb3b4
118fcf2
06370fd
ebecb89
bd23905
93ebadd
f6fa4ca
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,8 +10,6 @@ import ( | |
|
||
"github.com/flyteorg/flyteadmin/plugins" | ||
|
||
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s" | ||
|
||
"github.com/flyteorg/flyteadmin/auth" | ||
|
||
"github.com/flyteorg/flyteadmin/pkg/manager/impl/resources" | ||
|
@@ -184,12 +182,49 @@ func (m *ExecutionManager) addPluginOverrides(ctx context.Context, executionID * | |
return nil, nil | ||
} | ||
|
||
// TODO: Delete this code usage after the flyte v0.17.0 release | ||
// defaults should be a coalesce of task defaults, and platform defaults. | ||
// task limits should be the limits from the task, coalesced with the defaults from step one | ||
// then both should be limited by any platform limits. | ||
// anything 0 or empty is not set. | ||
// if both requests and limits end up empty, return nil. if one is empty, return nil for it | ||
func (m *ExecutionManager) getResources(ctx context.Context, taskResources *core.Resources, platformResources workflowengineInterfaces.TaskResources) *core.Resources { | ||
|
||
// requests: coalesce(task request, platform default) | ||
// limits: coalesce(task limits, task requests) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IMHO this only makes sense for non-compressible resources like memory and storage. |
||
// check that defaults and limits are both below platform limit | ||
var requestSet runtimeInterfaces.TaskResourceSet | ||
var limitSet runtimeInterfaces.TaskResourceSet | ||
if taskResources != nil && taskResources.GetRequests() != nil { | ||
requestSet = util.GetTaskResourcesAndCoalesce(ctx, taskResources.GetRequests(), platformResources.Defaults) | ||
} else { | ||
requestSet = platformResources.Defaults | ||
} | ||
if taskResources != nil && taskResources.GetLimits() != nil { | ||
limitSet = util.GetTaskResourcesAndCoalesce(ctx, taskResources.GetLimits(), requestSet) | ||
} else { | ||
limitSet = requestSet | ||
} | ||
adjustedRequestSet := util.ConstrainTaskResourceSet(ctx, requestSet, platformResources.Limits) | ||
adjustedLimitSet := util.ConstrainTaskResourceSet(ctx, limitSet, platformResources.Limits) | ||
|
||
// convert the sets back to core.Resources | ||
requestEntries := util.ConvertTaskResourceSetToCoreResources(adjustedRequestSet) | ||
limitEntries := util.ConvertTaskResourceSetToCoreResources(adjustedLimitSet) | ||
if len(requestEntries) == 0 && len(limitEntries) == 0 { | ||
return nil | ||
} | ||
res := core.Resources{} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the code this replaces (current lines 210-213) we init the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i think we solve this with more testing. looked through the code and i don't think it's used. Also ran locally with no admin config, no matchable resources, and no task resources, and things seem to be running almost correctly. The pod is still being created with
but the fly crd has node resources
and has execution config task resources:
which is correct because these are non-nullable. and resources doesn't show up at all in the task template part of the task definition. if downstream code can't handle a nil for some reason it should handle it there i feel. |
||
if len(requestEntries) > 0 { | ||
res.Requests = requestEntries | ||
} | ||
if len(limitEntries) > 0 { | ||
res.Limits = limitEntries | ||
} | ||
|
||
return &res | ||
} | ||
|
||
// Assumes input contains a compiled task with a valid container resource execConfig. | ||
// | ||
// Note: The system will assign a system-default value for request but for limit it will deduce it from the request | ||
// itself => Limit := Min([Some-Multiplier X Request], System-Max). For now we are using a multiplier of 1. In | ||
// general we recommend the users to set limits close to requests for more predictability in the system. | ||
func (m *ExecutionManager) setCompiledTaskDefaults(ctx context.Context, task *core.CompiledTask, | ||
platformTaskResources workflowengineInterfaces.TaskResources) { | ||
|
||
|
@@ -203,100 +238,8 @@ func (m *ExecutionManager) setCompiledTaskDefaults(ctx context.Context, task *co | |
logger.Debugf(ctx, "Not setting default resources for task [%+v], no container resources found to check", task) | ||
return | ||
} | ||
task.Template.GetContainer().Resources = m.getResources(ctx, task.Template.GetContainer().Resources, platformTaskResources) | ||
|
||
if task.Template.GetContainer().Resources == nil { | ||
// In case of no resources on the container, create empty requests and limits | ||
// so the container will still have resources configure properly | ||
task.Template.GetContainer().Resources = &core.Resources{ | ||
Requests: []*core.Resources_ResourceEntry{}, | ||
Limits: []*core.Resources_ResourceEntry{}, | ||
} | ||
} | ||
|
||
var finalizedResourceRequests = make([]*core.Resources_ResourceEntry, 0) | ||
var finalizedResourceLimits = make([]*core.Resources_ResourceEntry, 0) | ||
|
||
// The IDL representation for container-type tasks represents resources as a list with string quantities. | ||
// In order to easily reason about them we convert them to a set where we can O(1) fetch specific resources (e.g. CPU) | ||
// and represent them as comparable quantities rather than strings. | ||
taskResourceRequirements := util.GetCompleteTaskResourceRequirements(ctx, task.Template.Id, task) | ||
|
||
cpu := flytek8s.AdjustOrDefaultResource(taskResourceRequirements.Defaults.CPU, taskResourceRequirements.Limits.CPU, | ||
platformTaskResources.Defaults.CPU, platformTaskResources.Limits.CPU) | ||
finalizedResourceRequests = append(finalizedResourceRequests, &core.Resources_ResourceEntry{ | ||
Name: core.Resources_CPU, | ||
Value: cpu.Request.String(), | ||
}) | ||
finalizedResourceLimits = append(finalizedResourceLimits, &core.Resources_ResourceEntry{ | ||
Name: core.Resources_CPU, | ||
Value: cpu.Limit.String(), | ||
}) | ||
|
||
memory := flytek8s.AdjustOrDefaultResource(taskResourceRequirements.Defaults.Memory, taskResourceRequirements.Limits.Memory, | ||
platformTaskResources.Defaults.Memory, platformTaskResources.Limits.Memory) | ||
finalizedResourceRequests = append(finalizedResourceRequests, &core.Resources_ResourceEntry{ | ||
Name: core.Resources_MEMORY, | ||
Value: memory.Request.String(), | ||
}) | ||
finalizedResourceLimits = append(finalizedResourceLimits, &core.Resources_ResourceEntry{ | ||
Name: core.Resources_MEMORY, | ||
Value: memory.Limit.String(), | ||
}) | ||
|
||
// Only assign ephemeral storage when it is either requested or limited in the task definition, or a platform | ||
// default exists. | ||
if !taskResourceRequirements.Defaults.EphemeralStorage.IsZero() || | ||
!taskResourceRequirements.Limits.EphemeralStorage.IsZero() || | ||
!platformTaskResources.Defaults.EphemeralStorage.IsZero() { | ||
Comment on lines
-248
to
-250
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we have to check for cases where a user manually sets a resource limit to 0 to override the default limits? For example - flyteadmin configurations default limit to 5G Memory, if a user wants a task with unbounded Memory can they set There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah not sure. So as of this PR, "0", is not treated specially until the very end. It's like any other number. It's at |
||
ephemeralStorage := flytek8s.AdjustOrDefaultResource(taskResourceRequirements.Defaults.EphemeralStorage, taskResourceRequirements.Limits.EphemeralStorage, | ||
platformTaskResources.Defaults.EphemeralStorage, platformTaskResources.Limits.EphemeralStorage) | ||
finalizedResourceRequests = append(finalizedResourceRequests, &core.Resources_ResourceEntry{ | ||
Name: core.Resources_EPHEMERAL_STORAGE, | ||
Value: ephemeralStorage.Request.String(), | ||
}) | ||
finalizedResourceLimits = append(finalizedResourceLimits, &core.Resources_ResourceEntry{ | ||
Name: core.Resources_EPHEMERAL_STORAGE, | ||
Value: ephemeralStorage.Limit.String(), | ||
}) | ||
} | ||
|
||
// Only assign storage when it is either requested or limited in the task definition, or a platform | ||
// default exists. | ||
if !taskResourceRequirements.Defaults.Storage.IsZero() || | ||
!taskResourceRequirements.Limits.Storage.IsZero() || | ||
!platformTaskResources.Defaults.Storage.IsZero() { | ||
storageResource := flytek8s.AdjustOrDefaultResource(taskResourceRequirements.Defaults.Storage, taskResourceRequirements.Limits.Storage, | ||
platformTaskResources.Defaults.Storage, platformTaskResources.Limits.Storage) | ||
finalizedResourceRequests = append(finalizedResourceRequests, &core.Resources_ResourceEntry{ | ||
Name: core.Resources_STORAGE, | ||
Value: storageResource.Request.String(), | ||
}) | ||
finalizedResourceLimits = append(finalizedResourceLimits, &core.Resources_ResourceEntry{ | ||
Name: core.Resources_STORAGE, | ||
Value: storageResource.Limit.String(), | ||
}) | ||
} | ||
|
||
// Only assign gpu when it is either requested or limited in the task definition, or a platform default exists. | ||
if !taskResourceRequirements.Defaults.GPU.IsZero() || | ||
!taskResourceRequirements.Limits.GPU.IsZero() || | ||
!platformTaskResources.Defaults.GPU.IsZero() { | ||
gpu := flytek8s.AdjustOrDefaultResource(taskResourceRequirements.Defaults.GPU, taskResourceRequirements.Limits.GPU, | ||
platformTaskResources.Defaults.GPU, platformTaskResources.Limits.GPU) | ||
finalizedResourceRequests = append(finalizedResourceRequests, &core.Resources_ResourceEntry{ | ||
Name: core.Resources_GPU, | ||
Value: gpu.Request.String(), | ||
}) | ||
finalizedResourceLimits = append(finalizedResourceLimits, &core.Resources_ResourceEntry{ | ||
Name: core.Resources_GPU, | ||
Value: gpu.Limit.String(), | ||
}) | ||
} | ||
|
||
task.Template.GetContainer().Resources = &core.Resources{ | ||
Requests: finalizedResourceRequests, | ||
Limits: finalizedResourceLimits, | ||
} | ||
} | ||
|
||
// Fetches inherited execution metadata including the parent node execution db model id and the source execution model id | ||
|
@@ -1623,7 +1566,7 @@ func NewExecutionManager(db repositoryInterfaces.Repository, pluginRegistry *plu | |
"size in bytes of serialized execution outputs"), | ||
} | ||
|
||
resourceManager := resources.NewResourceManager(db, config.ApplicationConfiguration()) | ||
resourceManager := resources.NewResourceManager(db, config) | ||
return &ExecutionManager{ | ||
db: db, | ||
config: config, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,7 +9,6 @@ import ( | |
"github.com/flyteorg/flyteadmin/plugins" | ||
|
||
"google.golang.org/grpc/status" | ||
|
||
"google.golang.org/protobuf/types/known/timestamppb" | ||
|
||
"github.com/benbjohnson/clock" | ||
|
@@ -87,12 +86,12 @@ var testCluster = "C1" | |
var outputURI = "output uri" | ||
|
||
var resourceDefaults = runtimeInterfaces.TaskResourceSet{ | ||
CPU: resource.MustParse("200m"), | ||
Memory: resource.MustParse("200Gi"), | ||
CPU: testutils.GetPtr(resource.MustParse("200m")), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. rather than adding a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is K8s code unf... don't want to wade through getting a change merged through that process. I can try though. |
||
Memory: testutils.GetPtr(resource.MustParse("200Gi")), | ||
} | ||
var resourceLimits = runtimeInterfaces.TaskResourceSet{ | ||
CPU: resource.MustParse("300m"), | ||
Memory: resource.MustParse("500Gi"), | ||
CPU: testutils.GetPtr(resource.MustParse("300m")), | ||
Memory: testutils.GetPtr(resource.MustParse("500Gi")), | ||
} | ||
|
||
func getLegacySpec() *admin.ExecutionSpec { | ||
|
@@ -3754,16 +3753,16 @@ func TestSetDefaults(t *testing.T) { | |
execManager := NewExecutionManager(repositoryMocks.NewMockRepository(), r, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{}) | ||
execManager.(*ExecutionManager).setCompiledTaskDefaults(context.Background(), task, workflowengineInterfaces.TaskResources{ | ||
Defaults: runtimeInterfaces.TaskResourceSet{ | ||
CPU: resource.MustParse("200m"), | ||
GPU: resource.MustParse("4"), | ||
Memory: resource.MustParse("200Gi"), | ||
EphemeralStorage: resource.MustParse("500Mi"), | ||
CPU: testutils.GetPtr(resource.MustParse("200m")), | ||
GPU: testutils.GetPtr(resource.MustParse("4")), | ||
Memory: testutils.GetPtr(resource.MustParse("200Gi")), | ||
EphemeralStorage: testutils.GetPtr(resource.MustParse("500Mi")), | ||
}, | ||
Limits: runtimeInterfaces.TaskResourceSet{ | ||
CPU: resource.MustParse("300m"), | ||
GPU: resource.MustParse("8"), | ||
Memory: resource.MustParse("500Gi"), | ||
EphemeralStorage: resource.MustParse("501Mi"), | ||
CPU: testutils.GetPtr(resource.MustParse("300m")), | ||
GPU: testutils.GetPtr(resource.MustParse("8")), | ||
Memory: testutils.GetPtr(resource.MustParse("500Gi")), | ||
EphemeralStorage: testutils.GetPtr(resource.MustParse("501Mi")), | ||
}, | ||
}) | ||
assert.True(t, proto.Equal( | ||
|
@@ -3839,15 +3838,15 @@ func TestSetDefaults_MissingRequests_ExistingRequestsPreserved(t *testing.T) { | |
execManager := NewExecutionManager(repositoryMocks.NewMockRepository(), r, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{}) | ||
execManager.(*ExecutionManager).setCompiledTaskDefaults(context.Background(), task, workflowengineInterfaces.TaskResources{ | ||
Defaults: runtimeInterfaces.TaskResourceSet{ | ||
CPU: resource.MustParse("200m"), | ||
GPU: resource.MustParse("4"), | ||
Memory: resource.MustParse("200Gi"), | ||
CPU: testutils.GetPtr(resource.MustParse("200m")), | ||
GPU: testutils.GetPtr(resource.MustParse("4")), | ||
Memory: testutils.GetPtr(resource.MustParse("200Gi")), | ||
}, | ||
Limits: runtimeInterfaces.TaskResourceSet{ | ||
CPU: resource.MustParse("300m"), | ||
GPU: resource.MustParse("8"), | ||
CPU: testutils.GetPtr(resource.MustParse("300m")), | ||
GPU: testutils.GetPtr(resource.MustParse("8")), | ||
// Because only the limit is set, this resource should not be injected. | ||
EphemeralStorage: resource.MustParse("100"), | ||
EphemeralStorage: testutils.GetPtr(resource.MustParse("100")), | ||
}, | ||
}) | ||
assert.True(t, proto.Equal( | ||
|
@@ -3888,10 +3887,10 @@ func TestSetDefaults_MissingRequests_ExistingRequestsPreserved(t *testing.T) { | |
|
||
func TestSetDefaults_OptionalRequiredResources(t *testing.T) { | ||
taskConfigLimits := runtimeInterfaces.TaskResourceSet{ | ||
CPU: resource.MustParse("300m"), | ||
GPU: resource.MustParse("1"), | ||
Memory: resource.MustParse("500Gi"), | ||
EphemeralStorage: resource.MustParse("501Mi"), | ||
CPU: testutils.GetPtr(resource.MustParse("300m")), | ||
GPU: testutils.GetPtr(resource.MustParse("1")), | ||
Memory: testutils.GetPtr(resource.MustParse("500Gi")), | ||
EphemeralStorage: testutils.GetPtr(resource.MustParse("501Mi")), | ||
} | ||
|
||
task := &core.CompiledTask{ | ||
|
@@ -3917,8 +3916,8 @@ func TestSetDefaults_OptionalRequiredResources(t *testing.T) { | |
execManager := NewExecutionManager(repositoryMocks.NewMockRepository(), r, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{}) | ||
execManager.(*ExecutionManager).setCompiledTaskDefaults(context.Background(), task, workflowengineInterfaces.TaskResources{ | ||
Defaults: runtimeInterfaces.TaskResourceSet{ | ||
CPU: resource.MustParse("200m"), | ||
Memory: resource.MustParse("200Gi"), | ||
CPU: testutils.GetPtr(resource.MustParse("200m")), | ||
Memory: testutils.GetPtr(resource.MustParse("200Gi")), | ||
}, | ||
Limits: taskConfigLimits, | ||
}) | ||
|
@@ -3957,9 +3956,9 @@ func TestSetDefaults_OptionalRequiredResources(t *testing.T) { | |
execManager.(*ExecutionManager).setCompiledTaskDefaults(context.Background(), task, workflowengineInterfaces.TaskResources{ | ||
Limits: taskConfigLimits, | ||
Defaults: runtimeInterfaces.TaskResourceSet{ | ||
CPU: resource.MustParse("200m"), | ||
Memory: resource.MustParse("200Gi"), | ||
EphemeralStorage: resource.MustParse("1"), | ||
CPU: testutils.GetPtr(resource.MustParse("200m")), | ||
Memory: testutils.GetPtr(resource.MustParse("200Gi")), | ||
EphemeralStorage: testutils.GetPtr(resource.MustParse("1")), | ||
}, | ||
}) | ||
assert.True(t, proto.Equal( | ||
|
@@ -3999,6 +3998,7 @@ func TestSetDefaults_OptionalRequiredResources(t *testing.T) { | |
}) | ||
|
||
} | ||
|
||
func TestCreateSingleTaskExecution(t *testing.T) { | ||
repository := getMockRepositoryForExecTest() | ||
var getCalledCount = 0 | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logic seems sound to me, but would love to see this simplified. For example, can we convert to a similar type and use a function like: