Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flyteadmin digest comparison should rely on database semantics #6058

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 22 additions & 16 deletions flyteadmin/pkg/manager/impl/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,19 +88,7 @@
logger.Errorf(ctx, "failed to compute task digest with err %v", err)
return nil, err
}
// See if a task exists and confirm whether it's an identical task or one that with a separate definition.
existingTaskModel, err := util.GetTaskModel(ctx, t.db, request.GetSpec().GetTemplate().GetId())
if err == nil {
if bytes.Equal(taskDigest, existingTaskModel.Digest) {
return nil, errors.NewTaskExistsIdenticalStructureError(ctx, request)
}
existingTask, transformerErr := transformers.FromTaskModel(*existingTaskModel)
if transformerErr != nil {
logger.Errorf(ctx, "failed to transform task from task model")
return nil, transformerErr
}
return nil, errors.NewTaskExistsDifferentStructureError(ctx, request, existingTask.GetClosure().GetCompiledTask(), compiledTask)
}
// Create Task in DB
taskModel, err := transformers.CreateTaskModel(finalizedRequest, &admin.TaskClosure{
CompiledTask: compiledTask,
CreatedAt: createdAt,
Expand All @@ -110,7 +98,6 @@
"Failed to transform task model [%+v] with err: %v", finalizedRequest, err)
return nil, err
}

descriptionModel, err := transformers.CreateDescriptionEntityModel(request.GetSpec().GetDescription(), request.GetId())
if err != nil {
logger.Errorf(ctx,
Expand All @@ -122,8 +109,27 @@
}
err = t.db.TaskRepo().Create(ctx, taskModel, descriptionModel)
if err != nil {
logger.Debugf(ctx, "Failed to create task model with id [%+v] with err %v", request.GetId(), err)
return nil, err
// See if an identical task already exists by checking the error code
flyteErr, ok := err.(errors.FlyteAdminError)
if !ok || flyteErr.Code() != codes.AlreadyExists {
logger.Errorf(ctx, "Failed to create task model with id [%+v] with err %v", request.GetId(), err)
return nil, err
}
// An identical task already exists. Fetch the existing task to verify if it has a different digest
existingTaskModel, fetchErr := util.GetTaskModel(ctx, t.db, request.GetSpec().GetTemplate().GetId())
if fetchErr != nil {
logger.Errorf(ctx, "Failed to fetch existing task model for id [%+v] with err %v", request.GetId(), fetchErr)
return nil, fetchErr
}

Check warning on line 123 in flyteadmin/pkg/manager/impl/task_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/task_manager.go#L121-L123

Added lines #L121 - L123 were not covered by tests
if bytes.Equal(taskDigest, existingTaskModel.Digest) {
return nil, errors.NewTaskExistsIdenticalStructureError(ctx, request)
}
existingTask, transformerErr := transformers.FromTaskModel(*existingTaskModel)
if transformerErr != nil {
logger.Errorf(ctx, "Failed to transform task from task model for id [%+v]", request.GetId())
return nil, transformerErr
}

Check warning on line 131 in flyteadmin/pkg/manager/impl/task_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/task_manager.go#L129-L131

Added lines #L129 - L131 were not covered by tests
return nil, errors.NewTaskExistsDifferentStructureError(ctx, request, existingTask.GetClosure().GetCompiledTask(), compiledTask)
}
t.metrics.ClosureSizeBytes.Observe(float64(len(taskModel.Closure)))
if finalizedRequest.GetSpec().GetTemplate().GetMetadata() != nil {
Expand Down
61 changes: 61 additions & 0 deletions flyteadmin/pkg/manager/impl/task_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,67 @@ func TestCreateTask(t *testing.T) {
assert.NotNil(t, response)
}

func TestCreateTask_DuplicateTaskRegistration(t *testing.T) {
mockRepository := getMockTaskRepository()
mockRepository.TaskRepo().(*repositoryMocks.MockTaskRepo).SetGetCallback(
func(input interfaces.Identifier) (models.Task, error) {
return models.Task{
TaskKey: models.TaskKey{
Project: taskIdentifier.GetProject(),
Domain: taskIdentifier.GetDomain(),
Name: taskIdentifier.GetName(),
Version: taskIdentifier.GetVersion(),
},
Digest: []byte{
0xbf, 0x79, 0x61, 0x1c, 0xf5, 0xc1, 0xfb, 0x4c, 0xf8, 0xf4, 0xc4, 0x53, 0x5f, 0x8f, 0x73, 0xe2, 0x26, 0x5a,
0x18, 0x4a, 0xb7, 0x66, 0x98, 0x3c, 0xab, 0x2, 0x6c, 0x9, 0x9b, 0x90, 0xec, 0x8f},
}, nil
})
mockRepository.TaskRepo().(*repositoryMocks.MockTaskRepo).SetCreateCallback(func(input models.Task, descriptionEntity *models.DescriptionEntity) error {
return adminErrors.NewFlyteAdminErrorf(codes.AlreadyExists, "task already exists")
})
taskManager := NewTaskManager(mockRepository, getMockConfigForTaskTest(), getMockTaskCompiler(),
mockScope.NewTestScope())
request := testutils.GetValidTaskRequest()
_, err := taskManager.CreateTask(context.Background(), request)
assert.Error(t, err)
flyteErr, ok := err.(adminErrors.FlyteAdminError)
assert.True(t, ok, "Error should be of type FlyteAdminError")
assert.Equal(t, codes.AlreadyExists, flyteErr.Code(), "Error code should be AlreadyExists")
assert.Contains(t, flyteErr.Error(), "task with identical structure already exists")
differentTemplate := &core.TaskTemplate{
Id: &core.Identifier{
ResourceType: core.ResourceType_TASK,
Project: "project",
Domain: "domain",
Name: "name",
Version: "version",
},
Type: "type",
Metadata: &core.TaskMetadata{
Runtime: &core.RuntimeMetadata{
Version: "runtime version 2",
},
},
Interface: &core.TypedInterface{},
Target: &core.TaskTemplate_Container{
Container: &core.Container{
Image: "image",
Command: []string{
"command",
},
},
},
}
request.Spec.Template = differentTemplate
_, err = taskManager.CreateTask(context.Background(), request)
assert.Error(t, err)
flyteErr, ok = err.(adminErrors.FlyteAdminError)
assert.True(t, ok, "Error should be of type FlyteAdminError")
assert.Equal(t, codes.InvalidArgument, flyteErr.Code(), "Error code should be InvalidArgument")
assert.Contains(t, flyteErr.Error(), "name task with different structure already exists.")
}

func TestCreateTask_ValidationError(t *testing.T) {
mockRepository := getMockTaskRepository()
taskManager := NewTaskManager(mockRepository, getMockConfigForTaskTest(), getMockTaskCompiler(),
Expand Down
4 changes: 2 additions & 2 deletions flyteadmin/pkg/repositories/gormimpl/task_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ func (r *TaskRepo) Create(ctx context.Context, input models.Task, descriptionEnt
}
return nil
}
tx := r.db.WithContext(ctx).Omit("id").Create(descriptionEntity)
tx := r.db.WithContext(ctx).Omit("id").Create(&input)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this change is technically not necessary since this is all wrapped in a transaction, and if any insert fails then the whole transaction should be rolled back.

if tx.Error != nil {
return r.errorTransformer.ToFlyteAdminError(tx.Error)
}

tx = r.db.WithContext(ctx).Omit("id").Create(&input)
tx = r.db.WithContext(ctx).Omit("id").Create(descriptionEntity)
if tx.Error != nil {
return r.errorTransformer.ToFlyteAdminError(tx.Error)
}
Expand Down
Loading