diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index f7630bcca..692aa6643 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -69,9 +69,9 @@ var ( ) var ( - errUnsupportedOperation = fmt.Errorf("unsupported operation") - errInvalidServerResponse = fmt.Errorf("invalid server response") - errInvalidWorkflowOperation = fmt.Errorf("invalid WithStartOperation") + errUnsupportedOperation = fmt.Errorf("unsupported operation") + errInvalidServerResponse = fmt.Errorf("invalid server response") + errInvalidWithStartWorkflowOperation = fmt.Errorf("invalid WithStartWorkflowOperation") ) const ( @@ -1189,6 +1189,13 @@ func (wc *WorkflowClient) UpdateWithStartWorkflow( return nil, err } + if updateOptions.RunID != "" { + return nil, errors.New("invalid UpdateWorkflowOptions: RunID cannot be set for UpdateWithStartWorkflow because the workflow might not be running") + } + if updateOptions.FirstExecutionRunID != "" { + return nil, errors.New("invalid UpdateWorkflowOptions: FirstExecutionRunID cannot be set for UpdateWithStartWorkflow because the workflow might not be running") + } + updateInput, err := createUpdateWorkflowInput(updateOptions) if err != nil { return nil, err @@ -1721,19 +1728,12 @@ func (w *workflowClientInterceptor) UpdateWithStartWorkflow( updateInput *ClientUpdateWorkflowInput, startOperation *WithStartWorkflowOperation, ) (WorkflowUpdateHandle, error) { - if updateInput.RunID != "" { - return nil, errors.New("RunID and StartWorkflowInput cannot both be set") - } - if updateInput.FirstExecutionRunID != "" { - return nil, errors.New("FirstExecutionRunID and StartWorkflowInput cannot both be set") - } - // Create start request if err := startOperation.markExecuted(); err != nil { - return nil, fmt.Errorf("%w: %w", errInvalidWorkflowOperation, err) + return nil, fmt.Errorf("%w: %w", errInvalidWithStartWorkflowOperation, err) } if startOperation.err != nil { - return nil, fmt.Errorf("%w: %w", errInvalidWorkflowOperation, startOperation.err) + return nil, fmt.Errorf("%w: %w", errInvalidWithStartWorkflowOperation, startOperation.err) } startReq, err := w.createStartWorkflowRequest(ctx, startOperation.input) if err != nil { @@ -1743,7 +1743,7 @@ func (w *workflowClientInterceptor) UpdateWithStartWorkflow( // Create update request updateReq, err := w.createUpdateWorkflowRequest(ctx, updateInput) if err != nil { - return nil, fmt.Errorf("%w: %w", errInvalidWorkflowOperation, err) + return nil, fmt.Errorf("%w: %w", errInvalidWithStartWorkflowOperation, err) } if updateReq.WorkflowExecution.WorkflowId == "" { updateReq.WorkflowExecution.WorkflowId = startReq.WorkflowId @@ -1782,7 +1782,7 @@ func (w *workflowClientInterceptor) UpdateWithStartWorkflow( } handle, err := w.updateHandleFromResponse(ctx, updateReq.WaitPolicy.LifecycleStage, updateResp) if err != nil { - return nil, fmt.Errorf("%w: %w", errInvalidWorkflowOperation, err) + return nil, fmt.Errorf("%w: %w", errInvalidWithStartWorkflowOperation, err) } return handle, nil } @@ -1857,7 +1857,7 @@ func (w *workflowClientInterceptor) updateWithStartWorkflow( } case *workflowservice.ExecuteMultiOperationRequest_Operation_UpdateWorkflow: if !errors.As(opErr, &abortedErr) { - startErr = fmt.Errorf("%w: %w", errInvalidWorkflowOperation, opErr) + startErr = fmt.Errorf("%w: %w", errInvalidWithStartWorkflowOperation, opErr) } default: // this would only happen if a case statement for a newly added operation is missing above diff --git a/test/integration_test.go b/test/integration_test.go index 4a3fcad9b..6a8890551 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -3956,231 +3956,240 @@ func (ts *IntegrationTestSuite) TestUpdateSettingHandlerInHandler() { ts.NoError(run.Get(ctx, nil)) } -func (ts *IntegrationTestSuite) TestExecuteWorkflowWithUpdate() { +func (ts *IntegrationTestSuite) TestUpdateWithStartWorkflow() { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - startOptionsWithOperation := func(op client.WithStartWorkflowOperation) client.StartWorkflowOptions { - startOptions := ts.startWorkflowOptions("test-update-with-start-" + uuid.New()) - startOptions.EnableEagerStart = false // not allowed to use with update-with-start - startOptions.WithStartOperation = op - return startOptions + startWorkflowOptions := func() client.StartWorkflowOptions { + opts := ts.startWorkflowOptions("test-update-with-start-" + uuid.New()) + opts.EnableEagerStart = false // not allowed to use with update-with-start + opts.WorkflowIDConflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL // required for update-with-start + return opts } ts.Run("sends update-with-start (no running workflow)", func() { - updateOp := client.NewUpdateWithStartWorkflowOperation( - client.UpdateWorkflowOptions{ - UpdateName: "update", - Args: []any{1}, - WaitForStage: client.WorkflowUpdateStageAccepted, - }) - - startOptions := startOptionsWithOperation(updateOp) - run, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) + println(ctx) + startOp, err := ts.client.NewWithStartWorkflowOperation( + startWorkflowOptions(), ts.workflows.UpdateEntityWorkflow, + ) + ts.NoError(err) + updHandle, err := ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWorkflowOptions{ + UpdateName: "update", + Args: []any{1}, + WaitForStage: client.WorkflowUpdateStageAccepted, + }, startOp) ts.NoError(err) + println(updHandle) + var updateResult int - updHandle, err := updateOp.Get(ctx) - ts.NoError(err) ts.NoError(updHandle.Get(ctx, &updateResult)) ts.Equal(1, updateResult) + run, err := startOp.Get(ctx) + ts.NoError(err) var workflowResult int ts.NoError(run.Get(ctx, &workflowResult)) ts.Equal(1, workflowResult) }) ts.Run("sends update-with-start (already running workflow)", func() { - startOptions := startOptionsWithOperation(nil) + startOptions := startWorkflowOptions() run1, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) ts.NoError(err) - updateOp := client.NewUpdateWithStartWorkflowOperation( - client.UpdateWorkflowOptions{ - UpdateName: "update", - Args: []any{1}, - WaitForStage: client.WorkflowUpdateStageCompleted, - }) - - startOptions.WithStartOperation = updateOp startOptions.WorkflowIDConflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING - run2, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) + startOp, err := ts.client.NewWithStartWorkflowOperation(startOptions, ts.workflows.UpdateEntityWorkflow) + ts.NoError(err) + + updHandle, err := ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWorkflowOptions{ + UpdateName: "update", + Args: []any{1}, + WaitForStage: client.WorkflowUpdateStageCompleted, + }, startOp) + ts.NoError(err) + + run2, err := startOp.Get(ctx) ts.NoError(err) ts.Equal(run1.GetRunID(), run2.GetRunID()) var updateResult int - updHandle, err := updateOp.Get(ctx) - ts.NoError(err) ts.NoError(updHandle.Get(ctx, &updateResult)) ts.Equal(1, updateResult) }) ts.Run("sends update-with-start but update is rejected", func() { - updateOp := client.NewUpdateWithStartWorkflowOperation( - client.UpdateWorkflowOptions{ - UpdateName: "update", - Args: []any{-1}, // rejected update payload - WaitForStage: client.WorkflowUpdateStageCompleted, - }) + startOp, err := ts.client.NewWithStartWorkflowOperation(startWorkflowOptions(), ts.workflows.UpdateEntityWorkflow) + ts.NoError(err) - startOptions := startOptionsWithOperation(updateOp) - run, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) + updHandle, err := ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWorkflowOptions{ + UpdateName: "update", + Args: []any{-1}, // rejected update payload + WaitForStage: client.WorkflowUpdateStageCompleted, + }, startOp) + ts.NoError(err) + + run, err := startOp.Get(ctx) ts.NoError(err) ts.NotNil(run) var updateResult int - updHandle, err := updateOp.Get(ctx) - ts.NoError(err) err = updHandle.Get(ctx, &updateResult) ts.ErrorContains(err, "addend must be non-negative") }) - ts.Run("receives update result in separate goroutines", func() { - updateOp := client.NewUpdateWithStartWorkflowOperation( + ts.Run("receives results in separate goroutines", func() { + + startOp, err := ts.client.NewWithStartWorkflowOperation(startWorkflowOptions(), ts.workflows.UpdateEntityWorkflow) + ts.NoError(err) + + done1 := make(chan struct{}) + defer func() { <-done1 }() + go func() { + run, err := startOp.Get(ctx) + ts.NoError(err) + ts.NotNil(run) + done1 <- struct{}{} + }() + + updHandle, err := ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWorkflowOptions{ UpdateName: "update", Args: []any{1}, WaitForStage: client.WorkflowUpdateStageAccepted, - }) + }, startOp) + ts.NoError(err) - done := make(chan struct{}) - defer func() { <-done }() + done2 := make(chan struct{}) + defer func() { <-done2 }() go func() { var updateResult int - updHandle, err := updateOp.Get(ctx) - ts.NoError(err) ts.NoError(updHandle.Get(ctx, &updateResult)) ts.Equal(1, updateResult) - done <- struct{}{} + done2 <- struct{}{} }() - startOptions := startOptionsWithOperation(updateOp) - _, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) - ts.NoError(err) - var updateResult int - updHandle, err := updateOp.Get(ctx) - ts.NoError(err) ts.NoError(updHandle.Get(ctx, &updateResult)) ts.Equal(1, updateResult) }) ts.Run("fails when start request is invalid", func() { - updateOp := client.NewUpdateWithStartWorkflowOperation( - client.UpdateWorkflowOptions{ - UpdateName: "update", - WaitForStage: client.WorkflowUpdateStageCompleted, - }) + updateOptions := client.UpdateWorkflowOptions{ + UpdateName: "update", + WaitForStage: client.WorkflowUpdateStageCompleted, + } + startOptions := startWorkflowOptions() - startOptions := startOptionsWithOperation(updateOp) startOptions.CronSchedule = "invalid!" - _, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) + startOp, err := ts.client.NewWithStartWorkflowOperation(startOptions, ts.workflows.UpdateEntityWorkflow) + ts.NoError(err) + _, err = ts.client.UpdateWithStartWorkflow(ctx, updateOptions, startOp) ts.Error(err) + + startOptions.WorkflowIDConflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_UNSPECIFIED + _, err = ts.client.NewWithStartWorkflowOperation(startOptions, ts.workflows.UpdateEntityWorkflow) + ts.ErrorContains(err, "WorkflowIDConflictPolicy must be set") }) ts.Run("fails when update operation is invalid", func() { - updateOp := client.NewUpdateWithStartWorkflowOperation( + startOptions := startWorkflowOptions() + + startOp, err := ts.client.NewWithStartWorkflowOperation(startOptions, ts.workflows.UpdateEntityWorkflow) + ts.NoError(err) + + _, err = ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWorkflowOptions{ // invalid - }) + }, startOp) + ts.ErrorContains(err, "WaitForStage must be specified") - startOptions := startOptionsWithOperation(updateOp) - _, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) - ts.ErrorContains(err, "invalid WithStartOperation: WaitForStage must be specified") - - updateOp = client.NewUpdateWithStartWorkflowOperation( + _, err = ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWorkflowOptions{ RunID: "invalid", WaitForStage: client.WorkflowUpdateStageCompleted, - }) + }, startOp) + ts.ErrorContains(err, "invalid UpdateWorkflowOptions: RunID cannot be set for UpdateWithStartWorkflow because the workflow might not be running") - startOptions = startOptionsWithOperation(updateOp) - _, err = ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) - ts.ErrorContains(err, "invalid WithStartOperation: RunID cannot be set because the workflow might not be running") - - updateOp = client.NewUpdateWithStartWorkflowOperation( + _, err = ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWorkflowOptions{ FirstExecutionRunID: "invalid", WaitForStage: client.WorkflowUpdateStageCompleted, - }) - - startOptions = startOptionsWithOperation(updateOp) - _, err = ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) - ts.ErrorContains(err, "invalid WithStartOperation: FirstExecutionRunID cannot be set because the workflow might not be running") + }, startOp) + ts.ErrorContains(err, "invalid UpdateWorkflowOptions: FirstExecutionRunID cannot be set for UpdateWithStartWorkflow because the workflow might not be running") - updateOp = client.NewUpdateWithStartWorkflowOperation( + _, err = ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWorkflowOptions{ UpdateName: "", // invalid WaitForStage: client.WorkflowUpdateStageCompleted, - }) - - startOptions = startOptionsWithOperation(updateOp) - _, err = ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) - ts.ErrorContains(err, "invalid WithStartOperation: ") // omitting server message intentionally + }, startOp) + ts.ErrorContains(err, "invalid WithStartWorkflowOperation: ") // omitting server message intentionally - updateOp = client.NewUpdateWithStartWorkflowOperation( + _, err = ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWorkflowOptions{ WorkflowID: "different", // does not match Start's UpdateName: "update", WaitForStage: client.WorkflowUpdateStageCompleted, - }) - - startOptions = startOptionsWithOperation(updateOp) - _, err = ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) - ts.ErrorContains(err, "invalid WithStartOperation: ") // omitting server message intentionally + }, startOp) + ts.ErrorContains(err, "invalid WithStartWorkflowOperation: ") // omitting server message intentionally }) ts.Run("fails when workflow is already running", func() { - startOptions := startOptionsWithOperation(nil) + startOptions := startWorkflowOptions() _, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) ts.NoError(err) + startOp, err := ts.client.NewWithStartWorkflowOperation(startOptions, ts.workflows.UpdateEntityWorkflow) + ts.NoError(err) - updateOp := client.NewUpdateWithStartWorkflowOperation( + _, err = ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWorkflowOptions{ UpdateName: "update", Args: []any{1}, WaitForStage: client.WorkflowUpdateStageCompleted, - }) + }, startOp) - startOptions.WithStartOperation = updateOp // NOTE that WorkflowExecutionErrorWhenAlreadyStarted (defaults to false) has no impact - _, err = ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) ts.ErrorContains(err, "Workflow execution is already running") }) ts.Run("fails when executed twice", func() { - updateOp := client.NewUpdateWithStartWorkflowOperation( - client.UpdateWorkflowOptions{ - UpdateName: "update", - Args: []any{1}, - WaitForStage: client.WorkflowUpdateStageCompleted, - }) + startOp, err := ts.client.NewWithStartWorkflowOperation(startWorkflowOptions(), ts.workflows.UpdateEntityWorkflow) + ts.NoError(err) - startOptions := startOptionsWithOperation(updateOp) - _, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) + updateOptions := client.UpdateWorkflowOptions{ + UpdateName: "update", + Args: []any{1}, + WaitForStage: client.WorkflowUpdateStageCompleted, + } + _, err = ts.client.UpdateWithStartWorkflow(ctx, updateOptions, startOp) ts.NoError(err) - _, err = ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) - ts.ErrorContains(err, "invalid WithStartOperation: was already executed") + _, err = ts.client.UpdateWithStartWorkflow(ctx, updateOptions, startOp) + ts.ErrorContains(err, "invalid WithStartWorkflowOperation: was already executed") }) ts.Run("propagates context", func() { - updateOp := client.NewUpdateWithStartWorkflowOperation( - client.UpdateWorkflowOptions{ - UpdateName: "update", - Args: []any{1}, - WaitForStage: client.WorkflowUpdateStageCompleted, - }) + startOp, err := ts.client.NewWithStartWorkflowOperation(startWorkflowOptions(), ts.workflows.ContextPropagator, true) + ts.NoError(err) - var propagatedValues []string ctx := context.Background() // Propagate values using different context propagators. ctx = context.WithValue(ctx, contextKey(testContextKey1), "propagatedValue1") ctx = context.WithValue(ctx, contextKey(testContextKey2), "propagatedValue2") ctx = context.WithValue(ctx, contextKey(testContextKey3), "non-propagatedValue") - startOptions := startOptionsWithOperation(updateOp) - err := ts.executeWorkflowWithContextAndOption(ctx, startOptions, ts.workflows.ContextPropagator, &propagatedValues, true) + + _, err = ts.client.UpdateWithStartWorkflow(ctx, + client.UpdateWorkflowOptions{ + UpdateName: "update", + Args: []any{1}, + WaitForStage: client.WorkflowUpdateStageCompleted, + }, startOp) + ts.NoError(err) + + var propagatedValues []string + run, err := startOp.Get(ctx) ts.NoError(err) + ts.NoError(run.Get(ctx, &propagatedValues)) // One copy from workflow and one copy from activity * 2 for child workflow ts.EqualValues([]string{