Skip to content

Commit

Permalink
Flyteadmin digest comparison should rely on database semantics (#6058)
Browse files Browse the repository at this point in the history
* to TaskManager CreateTask in transactional way

Signed-off-by: Alex Wu <[email protected]>

* amend TaskRepo Create method to create task before description to prevent TaskManager CreateTask method Task not found isue

Signed-off-by: Alex Wu <[email protected]>

* add unit test

Signed-off-by: Alex Wu <[email protected]>

* fix lint error

Signed-off-by: Alex Wu <[email protected]>

---------

Signed-off-by: Alex Wu <[email protected]>
  • Loading branch information
popojk authored Dec 27, 2024
1 parent 4ce86a8 commit 61838b4
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 18 deletions.
38 changes: 22 additions & 16 deletions flyteadmin/pkg/manager/impl/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,19 +88,7 @@ func (t *TaskManager) CreateTask(
logger.Errorf(ctx, "failed to compute task digest with err %v", err)
return nil, err
}
// See if a task exists and confirm whether it's an identical task or one that with a separate definition.
existingTaskModel, err := util.GetTaskModel(ctx, t.db, request.GetSpec().GetTemplate().GetId())
if err == nil {
if bytes.Equal(taskDigest, existingTaskModel.Digest) {
return nil, errors.NewTaskExistsIdenticalStructureError(ctx, request)
}
existingTask, transformerErr := transformers.FromTaskModel(*existingTaskModel)
if transformerErr != nil {
logger.Errorf(ctx, "failed to transform task from task model")
return nil, transformerErr
}
return nil, errors.NewTaskExistsDifferentStructureError(ctx, request, existingTask.GetClosure().GetCompiledTask(), compiledTask)
}
// Create Task in DB
taskModel, err := transformers.CreateTaskModel(finalizedRequest, &admin.TaskClosure{
CompiledTask: compiledTask,
CreatedAt: createdAt,
Expand All @@ -110,7 +98,6 @@ func (t *TaskManager) CreateTask(
"Failed to transform task model [%+v] with err: %v", finalizedRequest, err)
return nil, err
}

descriptionModel, err := transformers.CreateDescriptionEntityModel(request.GetSpec().GetDescription(), request.GetId())
if err != nil {
logger.Errorf(ctx,
Expand All @@ -122,8 +109,27 @@ func (t *TaskManager) CreateTask(
}
err = t.db.TaskRepo().Create(ctx, taskModel, descriptionModel)
if err != nil {
logger.Debugf(ctx, "Failed to create task model with id [%+v] with err %v", request.GetId(), err)
return nil, err
// See if an identical task already exists by checking the error code
flyteErr, ok := err.(errors.FlyteAdminError)
if !ok || flyteErr.Code() != codes.AlreadyExists {
logger.Errorf(ctx, "Failed to create task model with id [%+v] with err %v", request.GetId(), err)
return nil, err
}
// An identical task already exists. Fetch the existing task to verify if it has a different digest
existingTaskModel, fetchErr := util.GetTaskModel(ctx, t.db, request.GetSpec().GetTemplate().GetId())
if fetchErr != nil {
logger.Errorf(ctx, "Failed to fetch existing task model for id [%+v] with err %v", request.GetId(), fetchErr)
return nil, fetchErr
}
if bytes.Equal(taskDigest, existingTaskModel.Digest) {
return nil, errors.NewTaskExistsIdenticalStructureError(ctx, request)
}
existingTask, transformerErr := transformers.FromTaskModel(*existingTaskModel)
if transformerErr != nil {
logger.Errorf(ctx, "Failed to transform task from task model for id [%+v]", request.GetId())
return nil, transformerErr
}
return nil, errors.NewTaskExistsDifferentStructureError(ctx, request, existingTask.GetClosure().GetCompiledTask(), compiledTask)
}
t.metrics.ClosureSizeBytes.Observe(float64(len(taskModel.Closure)))
if finalizedRequest.GetSpec().GetTemplate().GetMetadata() != nil {
Expand Down
61 changes: 61 additions & 0 deletions flyteadmin/pkg/manager/impl/task_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,67 @@ func TestCreateTask(t *testing.T) {
assert.NotNil(t, response)
}

func TestCreateTask_DuplicateTaskRegistration(t *testing.T) {
mockRepository := getMockTaskRepository()
mockRepository.TaskRepo().(*repositoryMocks.MockTaskRepo).SetGetCallback(
func(input interfaces.Identifier) (models.Task, error) {
return models.Task{
TaskKey: models.TaskKey{
Project: taskIdentifier.GetProject(),
Domain: taskIdentifier.GetDomain(),
Name: taskIdentifier.GetName(),
Version: taskIdentifier.GetVersion(),
},
Digest: []byte{
0xbf, 0x79, 0x61, 0x1c, 0xf5, 0xc1, 0xfb, 0x4c, 0xf8, 0xf4, 0xc4, 0x53, 0x5f, 0x8f, 0x73, 0xe2, 0x26, 0x5a,
0x18, 0x4a, 0xb7, 0x66, 0x98, 0x3c, 0xab, 0x2, 0x6c, 0x9, 0x9b, 0x90, 0xec, 0x8f},
}, nil
})
mockRepository.TaskRepo().(*repositoryMocks.MockTaskRepo).SetCreateCallback(func(input models.Task, descriptionEntity *models.DescriptionEntity) error {
return adminErrors.NewFlyteAdminErrorf(codes.AlreadyExists, "task already exists")
})
taskManager := NewTaskManager(mockRepository, getMockConfigForTaskTest(), getMockTaskCompiler(),
mockScope.NewTestScope())
request := testutils.GetValidTaskRequest()
_, err := taskManager.CreateTask(context.Background(), request)
assert.Error(t, err)
flyteErr, ok := err.(adminErrors.FlyteAdminError)
assert.True(t, ok, "Error should be of type FlyteAdminError")
assert.Equal(t, codes.AlreadyExists, flyteErr.Code(), "Error code should be AlreadyExists")
assert.Contains(t, flyteErr.Error(), "task with identical structure already exists")
differentTemplate := &core.TaskTemplate{
Id: &core.Identifier{
ResourceType: core.ResourceType_TASK,
Project: "project",
Domain: "domain",
Name: "name",
Version: "version",
},
Type: "type",
Metadata: &core.TaskMetadata{
Runtime: &core.RuntimeMetadata{
Version: "runtime version 2",
},
},
Interface: &core.TypedInterface{},
Target: &core.TaskTemplate_Container{
Container: &core.Container{
Image: "image",
Command: []string{
"command",
},
},
},
}
request.Spec.Template = differentTemplate
_, err = taskManager.CreateTask(context.Background(), request)
assert.Error(t, err)
flyteErr, ok = err.(adminErrors.FlyteAdminError)
assert.True(t, ok, "Error should be of type FlyteAdminError")
assert.Equal(t, codes.InvalidArgument, flyteErr.Code(), "Error code should be InvalidArgument")
assert.Contains(t, flyteErr.Error(), "name task with different structure already exists.")
}

func TestCreateTask_ValidationError(t *testing.T) {
mockRepository := getMockTaskRepository()
taskManager := NewTaskManager(mockRepository, getMockConfigForTaskTest(), getMockTaskCompiler(),
Expand Down
4 changes: 2 additions & 2 deletions flyteadmin/pkg/repositories/gormimpl/task_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ func (r *TaskRepo) Create(ctx context.Context, input models.Task, descriptionEnt
}
return nil
}
tx := r.db.WithContext(ctx).Omit("id").Create(descriptionEntity)
tx := r.db.WithContext(ctx).Omit("id").Create(&input)
if tx.Error != nil {
return r.errorTransformer.ToFlyteAdminError(tx.Error)
}

tx = r.db.WithContext(ctx).Omit("id").Create(&input)
tx = r.db.WithContext(ctx).Omit("id").Create(descriptionEntity)
if tx.Error != nil {
return r.errorTransformer.ToFlyteAdminError(tx.Error)
}
Expand Down

0 comments on commit 61838b4

Please sign in to comment.