From 54aa165a22633a5e5ff5e814db67c3e9994753fa Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Wed, 27 Nov 2024 23:36:49 +0800 Subject: [PATCH] add tests from Yi-Cheng Signed-off-by: Future-Outlier --- .../transformers/node_execution.go | 1 + .../transformers/node_execution_test.go | 5 + .../pkg/controller/nodes/task/handler.go | 96 +++++++++++++++---- 3 files changed, 83 insertions(+), 19 deletions(-) diff --git a/flyteadmin/pkg/repositories/transformers/node_execution.go b/flyteadmin/pkg/repositories/transformers/node_execution.go index 107e9efb70..1f17f0f131 100644 --- a/flyteadmin/pkg/repositories/transformers/node_execution.go +++ b/flyteadmin/pkg/repositories/transformers/node_execution.go @@ -42,6 +42,7 @@ func addNodeRunningState(request *admin.NodeExecutionEventRequest, nodeExecution "failed to marshal occurredAt into a timestamp proto with error: %v", err) } closure.StartedAt = startedAtProto + closure.DeckUri = request.GetEvent().GetDeckUri() return nil } diff --git a/flyteadmin/pkg/repositories/transformers/node_execution_test.go b/flyteadmin/pkg/repositories/transformers/node_execution_test.go index e37d312612..11dfd7256e 100644 --- a/flyteadmin/pkg/repositories/transformers/node_execution_test.go +++ b/flyteadmin/pkg/repositories/transformers/node_execution_test.go @@ -51,6 +51,7 @@ var childExecutionID = &core.WorkflowExecutionIdentifier{ const dynamicWorkflowClosureRef = "s3://bucket/admin/metadata/workflow" const testInputURI = "fake://bucket/inputs.pb" +const DeckURI = "fake://bucket/deck.html" var testInputs = &core.LiteralMap{ Literals: map[string]*core.Literal{ @@ -65,6 +66,7 @@ func TestAddRunningState(t *testing.T) { Event: &event.NodeExecutionEvent{ Phase: core.NodeExecution_RUNNING, OccurredAt: startedAtProto, + DeckUri: DeckURI, }, } nodeExecutionModel := models.NodeExecution{} @@ -73,6 +75,7 @@ func TestAddRunningState(t *testing.T) { assert.Nil(t, err) assert.Equal(t, startedAt, *nodeExecutionModel.StartedAt) assert.True(t, proto.Equal(startedAtProto, closure.GetStartedAt())) + assert.Equal(t, DeckURI, closure.GetDeckUri()) } func TestAddTerminalState_OutputURI(t *testing.T) { @@ -84,6 +87,7 @@ func TestAddTerminalState_OutputURI(t *testing.T) { OutputUri: outputURI, }, OccurredAt: occurredAtProto, + DeckUri: DeckURI, }, } startedAt := occurredAt.Add(-time.Minute) @@ -99,6 +103,7 @@ func TestAddTerminalState_OutputURI(t *testing.T) { assert.Nil(t, err) assert.EqualValues(t, outputURI, closure.GetOutputUri()) assert.Equal(t, time.Minute, nodeExecutionModel.Duration) + assert.Equal(t, DeckURI, closure.GetDeckUri()) } func TestAddTerminalState_OutputData(t *testing.T) { diff --git a/flytepropeller/pkg/controller/nodes/task/handler.go b/flytepropeller/pkg/controller/nodes/task/handler.go index 000d6bd7e7..f291189f9f 100644 --- a/flytepropeller/pkg/controller/nodes/task/handler.go +++ b/flytepropeller/pkg/controller/nodes/task/handler.go @@ -71,10 +71,43 @@ func getPluginMetricKey(pluginID, taskType string) string { return taskType + "_" + pluginID } -func (p *pluginRequestedTransition) CacheHit(outputPath storage.DataReference, deckPath *storage.DataReference, entry catalog.Entry) { +func (p *pluginRequestedTransition) AddDeckURI(ctx context.Context, tCtx *taskExecutionContext) { + var deckURI *storage.DataReference + deckURIValue := tCtx.ow.GetDeckPath() + deckURI = &deckURIValue + + if p.execInfo.OutputInfo == nil { + p.execInfo.OutputInfo = &handler.OutputInfo{} + } + + p.execInfo.OutputInfo.DeckURI = deckURI +} + +// RemoveNonexistentDeckURI removes the deck URI from the plugin execution info if the URI does not exist in remote storage. +func (p *pluginRequestedTransition) RemoveNonexistentDeckURI(ctx context.Context, tCtx *taskExecutionContext) error { + reader := tCtx.ow.GetReader() + if reader == nil && p.execInfo.OutputInfo != nil { + p.execInfo.OutputInfo.DeckURI = nil + return nil + } + + exists, err := reader.DeckExists(ctx) + if err != nil { + logger.Errorf(ctx, "Failed to check deck file existence. Error: %v", err) + return regErrors.Wrapf(err, "failed to check existence of deck file") + } + + if !exists && p.execInfo.OutputInfo != nil { + p.execInfo.OutputInfo.DeckURI = nil + } + + return nil +} + +func (p *pluginRequestedTransition) CacheHit(outputPath storage.DataReference, entry catalog.Entry) { p.ttype = handler.TransitionTypeEphemeral p.pInfo = pluginCore.PhaseInfoSuccess(nil) - p.ObserveSuccess(outputPath, deckPath, &event.TaskNodeMetadata{CacheStatus: entry.GetStatus().GetCacheStatus(), CatalogKey: entry.GetStatus().GetMetadata()}) + p.ObserveSuccess(outputPath, &event.TaskNodeMetadata{CacheStatus: entry.GetStatus().GetCacheStatus(), CatalogKey: entry.GetStatus().GetMetadata()}) } func (p *pluginRequestedTransition) PopulateCacheInfo(entry catalog.Entry) { @@ -144,10 +177,13 @@ func (p *pluginRequestedTransition) FinalTaskEvent(input ToTaskExecutionEventInp return ToTaskExecutionEvent(input) } -func (p *pluginRequestedTransition) ObserveSuccess(outputPath storage.DataReference, deckPath *storage.DataReference, taskMetadata *event.TaskNodeMetadata) { - p.execInfo.OutputInfo = &handler.OutputInfo{ - OutputURI: outputPath, - DeckURI: deckPath, +func (p *pluginRequestedTransition) ObserveSuccess(outputPath storage.DataReference, taskMetadata *event.TaskNodeMetadata) { + if p.execInfo.OutputInfo == nil { + p.execInfo.OutputInfo = &handler.OutputInfo{ + OutputURI: outputPath, + } + } else { + p.execInfo.OutputInfo.OutputURI = outputPath } p.execInfo.TaskNodeInfo = &handler.TaskNodeInfo{ @@ -171,7 +207,7 @@ func (p *pluginRequestedTransition) FinalTransition(ctx context.Context) (handle } logger.Debugf(ctx, "Task still running") - return handler.DoTransition(p.ttype, handler.PhaseInfoRunning(nil)), nil + return handler.DoTransition(p.ttype, handler.PhaseInfoRunning(&p.execInfo)), nil } // The plugin interface available especially for testing. @@ -464,8 +500,19 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta } } + // Regardless of the observed phase, we always add the DeckUri to support real-time deck functionality. + // The deck should be accessible even if the task is still running or has failed. + // It's possible that the deck URI may not exist in remote storage yet or will never exist. + // So, it is console's responsibility to handle the case when the deck URI actually does not exist. + pluginTrns.AddDeckURI(ctx, tCtx) + switch pluginTrns.pInfo.Phase() { case pluginCore.PhaseSuccess: + // This is to prevent the console from potentially checking the deck URI that does not exist if in final phase(PhaseSuccess). + err = pluginTrns.RemoveNonexistentDeckURI(ctx, tCtx) + if err != nil { + return pluginTrns, err + } // ------------------------------------- // TODO: @kumare create Issue# Remove the code after we use closures to handle dynamic nodes // This code only exists to support Dynamic tasks. Eventually dynamic tasks will use closure nodes to execute @@ -501,25 +548,36 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(), }) } else { - var deckURI *storage.DataReference - if tCtx.ow.GetReader() != nil { - exists, err := tCtx.ow.GetReader().DeckExists(ctx) - if err != nil { - logger.Errorf(ctx, "Failed to check deck file existence. Error: %v", err) - return pluginTrns, regErrors.Wrapf(err, "failed to check existence of deck file") - } else if exists { - deckURIValue := tCtx.ow.GetDeckPath() - deckURI = &deckURIValue - } - } - pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(), deckURI, + pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(), &event.TaskNodeMetadata{ CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(), }) + + ////var deckURI *storage.DataReference + //if tCtx.ow.GetReader() != nil { + // exists, err := tCtx.ow.GetReader().DeckExists(ctx) + // if err != nil { + // logger.Errorf(ctx, "Failed to check deck file existence. Error: %v", err) + // return pluginTrns, regErrors.Wrapf(err, "failed to check existence of deck file") + // } else if exists { + // deckURIValue := tCtx.ow.GetDeckPath() + // deckURI = &deckURIValue + // } + //} + //pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(), + // &event.TaskNodeMetadata{ + // CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(), + // }) } case pluginCore.PhaseRetryableFailure: fallthrough case pluginCore.PhasePermanentFailure: + // This is to prevent the console from potentially checking the deck URI that does not exist if in final + // phase(PhaseFailure). + err = pluginTrns.RemoveNonexistentDeckURI(ctx, tCtx) + if err != nil { + return pluginTrns, err + } pluginTrns.ObservedFailure( &event.TaskNodeMetadata{ CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(),