From 4f8d1406bddf497c235a37683ea2ec4dc4e67946 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 17 Dec 2024 13:10:47 -0800 Subject: [PATCH 1/3] improve error message in the agent plugin Signed-off-by: Kevin Su --- .../go/tasks/plugins/webapi/agent/config.go | 4 ++-- .../go/tasks/plugins/webapi/agent/plugin.go | 23 ++++++++++--------- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/flyteplugins/go/tasks/plugins/webapi/agent/config.go b/flyteplugins/go/tasks/plugins/webapi/agent/config.go index f26499f320..f32bc8178a 100644 --- a/flyteplugins/go/tasks/plugins/webapi/agent/config.go +++ b/flyteplugins/go/tasks/plugins/webapi/agent/config.go @@ -27,7 +27,7 @@ var ( Size: 500000, ResyncInterval: config.Duration{Duration: 30 * time.Second}, Workers: 10, - MaxSystemFailures: 5, + MaxSystemFailures: 0, }, ResourceMeta: nil, }, @@ -42,7 +42,7 @@ var ( DefaultAgent: Deployment{ Endpoint: "", Insecure: true, - DefaultTimeout: config.Duration{Duration: 10 * time.Second}, + DefaultTimeout: config.Duration{Duration: 2 * time.Second}, }, // AsyncPlugin should be registered to at least one task type. // Reference: https://github.com/flyteorg/flyte/blob/master/flyteplugins/go/tasks/pluginmachinery/registry.go#L27 diff --git a/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go b/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go index 77a27b6699..3015edaf2e 100644 --- a/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go +++ b/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go @@ -80,11 +80,11 @@ func (p *Plugin) Create(ctx context.Context, taskCtx webapi.TaskExecutionContext webapi.Resource, error) { taskTemplate, err := taskCtx.TaskReader().Read(ctx) if err != nil { - return nil, nil, err + return nil, nil, fmt.Errorf("failed to read task template with error: %v", err) } inputs, err := taskCtx.InputReader().Get(ctx) if err != nil { - return nil, nil, err + return nil, nil, fmt.Errorf("failed to read inputs with error: %v", err) } var argTemplate []string @@ -98,7 +98,7 @@ func (p *Plugin) Create(ctx context.Context, taskCtx webapi.TaskExecutionContext argTemplate = taskTemplate.GetContainer().GetArgs() modifiedArgs, err := template.Render(ctx, taskTemplate.GetContainer().GetArgs(), templateParameters) if err != nil { - return nil, nil, err + return nil, nil, fmt.Errorf("failed to render args with error: %v", err) } taskTemplate.GetContainer().Args = modifiedArgs defer func() { @@ -135,7 +135,7 @@ func (p *Plugin) Create(ctx context.Context, taskCtx webapi.TaskExecutionContext request := &admin.CreateTaskRequest{Inputs: inputs, Template: taskTemplate, OutputPrefix: outputPrefix, TaskExecutionMetadata: &taskExecutionMetadata} res, err := client.CreateTask(finalCtx, request) if err != nil { - return nil, nil, err + return nil, nil, fmt.Errorf("failed to create task from agent with %v", err) } return ResourceMetaWrapper{ @@ -153,7 +153,8 @@ func (p *Plugin) ExecuteTaskSync( ) (webapi.ResourceMeta, webapi.Resource, error) { stream, err := client.ExecuteTaskSync(ctx) if err != nil { - return nil, nil, err + logger.Errorf(ctx, "failed to execute task from agent with %v", err) + return nil, nil, fmt.Errorf("failed to execute task from agent with: %v", err) } headerProto := &admin.ExecuteTaskSyncRequest{ @@ -185,8 +186,8 @@ func (p *Plugin) ExecuteTaskSync( in, err := stream.Recv() if err != nil { - logger.Errorf(ctx, "failed to receive from server %s", err.Error()) - return nil, nil, err + logger.Errorf(ctx, "failed to receive stream from server %s", err.Error()) + return nil, nil, fmt.Errorf("failed to receive stream from server %w", err) } if in.GetHeader() == nil { return nil, nil, fmt.Errorf("expected header in the response, but got none") @@ -202,7 +203,7 @@ func (p *Plugin) ExecuteTaskSync( LogLinks: resource.GetLogLinks(), CustomInfo: resource.GetCustomInfo(), AgentError: resource.GetAgentError(), - }, err + }, fmt.Errorf("failed to execute task from agent with %v", err) } func (p *Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest webapi.Resource, err error) { @@ -223,7 +224,7 @@ func (p *Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest web } res, err := client.GetTask(finalCtx, request) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to get task from agent with %v", err) } return ResourceWrapper{ @@ -256,7 +257,7 @@ func (p *Plugin) Delete(ctx context.Context, taskCtx webapi.DeleteContext) error ResourceMeta: metadata.AgentResourceMeta, } _, err = client.DeleteTask(finalCtx, request) - return err + return fmt.Errorf("failed to delete task from agent with %v", err) } func (p *Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase core.PhaseInfo, err error) { @@ -307,7 +308,7 @@ func (p *Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phas err = writeOutput(ctx, taskCtx, resource.Outputs) if err != nil { logger.Errorf(ctx, "failed to write output with err %s", err.Error()) - return core.PhaseInfoUndefined, err + return core.PhaseInfoUndefined, fmt.Errorf("failed to write output with err %s", err.Error()) } return core.PhaseInfoSuccess(taskInfo), nil } From 466f35e966627a4e733afb16152c1179894e1380 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 17 Dec 2024 13:11:34 -0800 Subject: [PATCH 2/3] nit Signed-off-by: Kevin Su --- flyteplugins/go/tasks/plugins/webapi/agent/config.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flyteplugins/go/tasks/plugins/webapi/agent/config.go b/flyteplugins/go/tasks/plugins/webapi/agent/config.go index f32bc8178a..f26499f320 100644 --- a/flyteplugins/go/tasks/plugins/webapi/agent/config.go +++ b/flyteplugins/go/tasks/plugins/webapi/agent/config.go @@ -27,7 +27,7 @@ var ( Size: 500000, ResyncInterval: config.Duration{Duration: 30 * time.Second}, Workers: 10, - MaxSystemFailures: 0, + MaxSystemFailures: 5, }, ResourceMeta: nil, }, @@ -42,7 +42,7 @@ var ( DefaultAgent: Deployment{ Endpoint: "", Insecure: true, - DefaultTimeout: config.Duration{Duration: 2 * time.Second}, + DefaultTimeout: config.Duration{Duration: 10 * time.Second}, }, // AsyncPlugin should be registered to at least one task type. // Reference: https://github.com/flyteorg/flyte/blob/master/flyteplugins/go/tasks/pluginmachinery/registry.go#L27 From cabe46adfa57a58c7bdc8235e1a5ab8270429c6e Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 17 Dec 2024 14:35:24 -0800 Subject: [PATCH 3/3] fix test Signed-off-by: Kevin Su --- flyteplugins/go/tasks/plugins/webapi/agent/plugin.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go b/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go index 3015edaf2e..4f518ced55 100644 --- a/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go +++ b/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go @@ -154,7 +154,7 @@ func (p *Plugin) ExecuteTaskSync( stream, err := client.ExecuteTaskSync(ctx) if err != nil { logger.Errorf(ctx, "failed to execute task from agent with %v", err) - return nil, nil, fmt.Errorf("failed to execute task from agent with: %v", err) + return nil, nil, fmt.Errorf("failed to execute task from agent with %v", err) } headerProto := &admin.ExecuteTaskSyncRequest{ @@ -203,7 +203,7 @@ func (p *Plugin) ExecuteTaskSync( LogLinks: resource.GetLogLinks(), CustomInfo: resource.GetCustomInfo(), AgentError: resource.GetAgentError(), - }, fmt.Errorf("failed to execute task from agent with %v", err) + }, nil } func (p *Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest webapi.Resource, err error) {