Skip to content

Commit

Permalink
Merge pull request #4095 from flyteorg/monorepo--import-components-on…
Browse files Browse the repository at this point in the history
…e-last-time

[Monorepo] Import components one last time
  • Loading branch information
eapolinario authored Sep 29, 2023
2 parents 0f2b71f + ab518a2 commit b35cc95
Show file tree
Hide file tree
Showing 35 changed files with 3,408 additions and 241 deletions.
2 changes: 1 addition & 1 deletion flyteadmin/pkg/manager/impl/task_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (m *TaskExecutionManager) CreateTaskExecutionEvent(ctx context.Context, req
return nil, err
}

if request.Event.Phase == core.TaskExecution_RUNNING && request.Event.PhaseVersion == 0 {
if request.Event.Phase == core.TaskExecution_RUNNING && request.Event.PhaseVersion == 0 { // TODO: need to be careful about missing inc/decs
m.metrics.ActiveTaskExecutions.Inc()
} else if common.IsTaskExecutionTerminal(request.Event.Phase) && request.Event.PhaseVersion == 0 {
m.metrics.ActiveTaskExecutions.Dec()
Expand Down
4 changes: 4 additions & 0 deletions flyteadmin/pkg/manager/impl/testutils/mock_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,3 +300,7 @@ func GetWorkflowRequestInterfaceBytes() []byte {
bytes, _ := proto.Marshal(GetWorkflowRequest().Spec.Template.Interface)
return bytes
}

func GetWorkflowRequestInterface() *core.TypedInterface {
return GetWorkflowRequest().Spec.Template.Interface
}
7 changes: 7 additions & 0 deletions flyteadmin/pkg/manager/impl/workflow_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,13 @@ func TestListWorkflows(t *testing.T) {
assert.Equal(t, fmt.Sprintf("version %v", idx), workflow.Id.Version)
assert.True(t, proto.Equal(&admin.WorkflowClosure{
CreatedAt: testutils.MockCreatedAtProto,
CompiledWorkflow: &core.CompiledWorkflowClosure{
Primary: &core.CompiledWorkflow{
Template: &core.WorkflowTemplate{
Interface: &core.TypedInterface{},
},
},
},
}, workflow.Closure))
}
assert.Empty(t, workflowList.Token)
Expand Down
26 changes: 22 additions & 4 deletions flyteadmin/pkg/repositories/transformers/task_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,19 +153,27 @@ func CreateTaskExecutionModel(ctx context.Context, input CreateTaskExecutionMode
CreatedAt: input.Request.Event.OccurredAt,
Logs: input.Request.Event.Logs,
CustomInfo: input.Request.Event.CustomInfo,
Reason: input.Request.Event.Reason,
TaskType: input.Request.Event.TaskType,
Metadata: metadata,
EventVersion: input.Request.Event.EventVersion,
}

if len(input.Request.Event.Reason) > 0 {
if len(input.Request.Event.Reasons) > 0 {
for _, reason := range input.Request.Event.Reasons {
closure.Reasons = append(closure.Reasons, &admin.Reason{
OccurredAt: reason.OccurredAt,
Message: reason.Reason,
})
}
closure.Reason = input.Request.Event.Reasons[len(input.Request.Event.Reasons)-1].Reason
} else if len(input.Request.Event.Reason) > 0 {
closure.Reasons = []*admin.Reason{
&admin.Reason{
{
OccurredAt: input.Request.Event.OccurredAt,
Message: input.Request.Event.Reason,
},
}
closure.Reason = input.Request.Event.Reason
}

eventPhase := input.Request.Event.Phase
Expand Down Expand Up @@ -386,7 +394,17 @@ func UpdateTaskExecutionModel(ctx context.Context, request *admin.TaskExecutionE
}
taskExecutionClosure.UpdatedAt = reportedAt
taskExecutionClosure.Logs = mergeLogs(taskExecutionClosure.Logs, request.Event.Logs)
if len(request.Event.Reason) > 0 {
if len(request.Event.Reasons) > 0 {
for _, reason := range request.Event.Reasons {
taskExecutionClosure.Reasons = append(
taskExecutionClosure.Reasons,
&admin.Reason{
OccurredAt: reason.OccurredAt,
Message: reason.Reason,
})
}
taskExecutionClosure.Reason = request.Event.Reasons[len(request.Event.Reasons)-1].Reason
} else if len(request.Event.Reason) > 0 {
if taskExecutionClosure.Reason != request.Event.Reason {
// by tracking a time-series of reasons we increase the size of the TaskExecutionClosure in scenarios where
// a task reports a large number of unique reasons. if this size increase becomes problematic we this logic
Expand Down
Loading

0 comments on commit b35cc95

Please sign in to comment.