diff --git a/contrib/datadog/tracing/interceptor.go b/contrib/datadog/tracing/interceptor.go index 954c79598..4faadde58 100644 --- a/contrib/datadog/tracing/interceptor.go +++ b/contrib/datadog/tracing/interceptor.go @@ -143,6 +143,20 @@ func (t *tracerImpl) ContextWithSpan(ctx context.Context, span interceptor.Trace return tracer.ContextWithSpan(ctx, span.(*tracerSpan).Span) } +// SpanFromWorkflowContext extracts the DataDog Span object from the workflow context. +func SpanFromWorkflowContext(ctx workflow.Context) (ddtrace.Span, bool) { + val := ctx.Value(activeSpanContextKey) + if val == nil { + return tracer.SpanFromContext(nil) + } + + if span, ok := val.(*tracerSpan); ok { + return span.Span, true + } + + return tracer.SpanFromContext(nil) +} + func genSpanID(idempotencyKey string) uint64 { h := fnv.New64() // Write() always writes all bytes and never fails; the count and error result are for implementing io.Writer. diff --git a/contrib/datadog/tracing/interceptor_test.go b/contrib/datadog/tracing/interceptor_test.go index caeff15a5..1d7943a01 100644 --- a/contrib/datadog/tracing/interceptor_test.go +++ b/contrib/datadog/tracing/interceptor_test.go @@ -22,6 +22,7 @@ package tracing import ( + "errors" "strings" "testing" @@ -31,6 +32,9 @@ import ( "go.temporal.io/sdk/interceptor" "go.temporal.io/sdk/internal/interceptortest" + "go.temporal.io/sdk/testsuite" + "go.temporal.io/sdk/worker" + "go.temporal.io/sdk/workflow" ) type testTracer struct { @@ -133,7 +137,6 @@ func Test_OnFinishOption(t *testing.T) { Tracer: impl, mt: mt, } - interceptortest.RunTestWorkflowWithError(t, trc) spans := trc.FinishedSpans() @@ -141,3 +144,41 @@ func Test_OnFinishOption(t *testing.T) { require.Len(t, spans, 1) require.Equal(t, "temporal.RunWorkflow", spans[0].Name) } + +func setCustomSpanTagWorkflow(ctx workflow.Context) error { + span, ok := SpanFromWorkflowContext(ctx) + + if !ok { + return errors.New("Did not find span in workflow context") + } + + span.SetTag("testTag", "testValue") + return nil +} + +func Test_SpanFromWorkflowContext(t *testing.T) { + // Start the mock tracer. + mt := mocktracer.Start() + defer mt.Stop() + + var suite testsuite.WorkflowTestSuite + env := suite.NewTestWorkflowEnvironment() + env.RegisterWorkflow(setCustomSpanTagWorkflow) + + impl := NewTracer(TracerOptions{}) + testTracer := testTracer{ + Tracer: impl, + mt: mt, + } + + // Set tracer interceptor + env.SetWorkerOptions(worker.Options{ + Interceptors: []interceptor.WorkerInterceptor{interceptor.NewTracingInterceptor(testTracer)}, + }) + + env.ExecuteWorkflow(setCustomSpanTagWorkflow) + + require.True(t, env.IsWorkflowCompleted()) + testSpan := mt.FinishedSpans()[0] + require.Equal(t, "testValue", testSpan.Tag("testTag")) +} diff --git a/internal/activity.go b/internal/activity.go index d115b1181..a7789b5a1 100644 --- a/internal/activity.go +++ b/internal/activity.go @@ -149,6 +149,14 @@ type ( // build ID or not. See temporal.VersioningIntent. // WARNING: Worker versioning is currently experimental VersioningIntent VersioningIntent + + // Summary is a single-line summary for this activity that will appear in UI/CLI. This can be + // in single-line Temporal Markdown format. + // + // Optional: defaults to none/empty. + // + // NOTE: Experimental + Summary string } // LocalActivityOptions stores local activity specific parameters that will be stored inside of a context. diff --git a/internal/client.go b/internal/client.go index 867c1c3d9..99f9d390e 100644 --- a/internal/client.go +++ b/internal/client.go @@ -993,7 +993,7 @@ func NewServiceClient(workflowServiceClient workflowservice.WorkflowServiceClien workerInterceptors: workerInterceptors, excludeInternalFromRetry: options.ConnectionOptions.excludeInternalFromRetry, eagerDispatcher: &eagerWorkflowDispatcher{ - workersByTaskQueue: make(map[string][]eagerWorker), + workersByTaskQueue: make(map[string]map[eagerWorker]struct{}), }, } diff --git a/internal/error.go b/internal/error.go index a5e865ec0..412cb04ba 100644 --- a/internal/error.go +++ b/internal/error.go @@ -826,6 +826,41 @@ func (e *ChildWorkflowExecutionError) Unwrap() error { return e.cause } +// Namespace returns namespace of the child workflow. +func (e *ChildWorkflowExecutionError) Namespace() string { + return e.namespace +} + +// WorkflowId returns workflow ID of the child workflow. +func (e *ChildWorkflowExecutionError) WorkflowID() string { + return e.workflowID +} + +// RunID returns run ID of the child workflow. +func (e *ChildWorkflowExecutionError) RunID() string { + return e.runID +} + +// WorkflowType returns type of the child workflow. +func (e *ChildWorkflowExecutionError) WorkflowType() string { + return e.workflowType +} + +// InitiatedEventID returns event ID of the child workflow initiated event. +func (e *ChildWorkflowExecutionError) InitiatedEventID() int64 { + return e.initiatedEventID +} + +// StartedEventID returns event ID of the child workflow started event. +func (e *ChildWorkflowExecutionError) StartedEventID() int64 { + return e.startedEventID +} + +// RetryState returns details on why child workflow failed. +func (e *ChildWorkflowExecutionError) RetryState() enumspb.RetryState { + return e.retryState +} + // Error implements the error interface. func (e *NexusOperationError) Error() string { msg := fmt.Sprintf( diff --git a/internal/error_test.go b/internal/error_test.go index 084bcf3cd..d1a4e59c1 100644 --- a/internal/error_test.go +++ b/internal/error_test.go @@ -165,7 +165,7 @@ func testTimeoutErrorDetails(t *testing.T, timeoutType enumspb.TimeoutType) { context.commandsHelper.scheduledEventIDToActivityID[5] = activityID di := h.newActivityCommandStateMachine( 5, - &commandpb.ScheduleActivityTaskCommandAttributes{ActivityId: activityID}) + &commandpb.ScheduleActivityTaskCommandAttributes{ActivityId: activityID}, nil) di.state = commandStateInitiated di.setData(&scheduledActivity{ callback: func(r *commonpb.Payloads, e error) { diff --git a/internal/internal_activity.go b/internal/internal_activity.go index 84c525530..b4b8f6be9 100644 --- a/internal/internal_activity.go +++ b/internal/internal_activity.go @@ -73,6 +73,7 @@ type ( RetryPolicy *commonpb.RetryPolicy DisableEagerExecution bool VersioningIntent VersioningIntent + Summary string } // ExecuteLocalActivityOptions options for executing a local activity diff --git a/internal/internal_command_state_machine.go b/internal/internal_command_state_machine.go index 7515d10b9..c8c90bef2 100644 --- a/internal/internal_command_state_machine.go +++ b/internal/internal_command_state_machine.go @@ -79,8 +79,9 @@ type ( activityCommandStateMachine struct { *commandStateMachineBase - scheduleID int64 - attributes *commandpb.ScheduleActivityTaskCommandAttributes + scheduleID int64 + attributes *commandpb.ScheduleActivityTaskCommandAttributes + startMetadata *sdk.UserMetadata } cancelActivityStateMachine struct { @@ -348,12 +349,14 @@ func (h *commandsHelper) newCommandStateMachineBase(commandType commandType, id func (h *commandsHelper) newActivityCommandStateMachine( scheduleID int64, attributes *commandpb.ScheduleActivityTaskCommandAttributes, + startMetadata *sdk.UserMetadata, ) *activityCommandStateMachine { base := h.newCommandStateMachineBase(commandTypeActivity, attributes.GetActivityId()) return &activityCommandStateMachine{ commandStateMachineBase: base, scheduleID: scheduleID, attributes: attributes, + startMetadata: startMetadata, } } @@ -618,6 +621,7 @@ func (d *activityCommandStateMachine) getCommand() *commandpb.Command { case commandStateCreated, commandStateCanceledBeforeSent: command := createNewCommand(enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK) command.Attributes = &commandpb.Command_ScheduleActivityTaskCommandAttributes{ScheduleActivityTaskCommandAttributes: d.attributes} + command.UserMetadata = d.startMetadata return command default: return nil @@ -1118,9 +1122,10 @@ func (h *commandsHelper) moveCommandToBack(command commandStateMachine) { func (h *commandsHelper) scheduleActivityTask( scheduleID int64, attributes *commandpb.ScheduleActivityTaskCommandAttributes, + metadata *sdk.UserMetadata, ) commandStateMachine { h.scheduledEventIDToActivityID[scheduleID] = attributes.GetActivityId() - command := h.newActivityCommandStateMachine(scheduleID, attributes) + command := h.newActivityCommandStateMachine(scheduleID, attributes, metadata) h.addCommand(command) return command } diff --git a/internal/internal_command_state_machine_test.go b/internal/internal_command_state_machine_test.go index b6fd26144..8d4d4c225 100644 --- a/internal/internal_command_state_machine_test.go +++ b/internal/internal_command_state_machine_test.go @@ -165,7 +165,7 @@ func Test_ActivityStateMachine_CompleteWithoutCancel(t *testing.T) { // schedule activity scheduleID := h.getNextID() - d := h.scheduleActivityTask(scheduleID, attributes) + d := h.scheduleActivityTask(scheduleID, attributes, nil) require.Equal(t, commandStateCreated, d.getState()) commands := h.getCommands(true) require.Equal(t, commandStateCommandSent, d.getState()) @@ -192,7 +192,7 @@ func Test_ActivityStateMachine_CancelBeforeSent(t *testing.T) { // schedule activity scheduleID := h.getNextID() - d := h.scheduleActivityTask(scheduleID, attributes) + d := h.scheduleActivityTask(scheduleID, attributes, nil) require.Equal(t, commandStateCreated, d.getState()) // Cancel before command sent. We will send the command and the cancellation. @@ -215,7 +215,7 @@ func Test_ActivityStateMachine_CancelAfterSent(t *testing.T) { // schedule activity scheduleID := h.getNextID() - d := h.scheduleActivityTask(scheduleID, attributes) + d := h.scheduleActivityTask(scheduleID, attributes, nil) require.Equal(t, commandStateCreated, d.getState()) commands := h.getCommands(true) require.Equal(t, 1, len(commands)) @@ -251,7 +251,7 @@ func Test_ActivityStateMachine_CompletedAfterCancel(t *testing.T) { // schedule activity scheduleID := h.getNextID() - d := h.scheduleActivityTask(scheduleID, attributes) + d := h.scheduleActivityTask(scheduleID, attributes, nil) require.Equal(t, commandStateCreated, d.getState()) commands := h.getCommands(true) require.Equal(t, 1, len(commands)) @@ -287,7 +287,7 @@ func Test_ActivityStateMachine_CancelInitiated_After_CanceledBeforeSent(t *testi // schedule activity scheduleID := h.getNextID() - d := h.scheduleActivityTask(scheduleID, attributes) + d := h.scheduleActivityTask(scheduleID, attributes, nil) require.Equal(t, commandStateCreated, d.getState()) // cancel activity before sent @@ -324,7 +324,7 @@ func Test_ActivityStateMachine_PanicInvalidStateTransition(t *testing.T) { // schedule activity scheduleID := h.getNextID() - h.scheduleActivityTask(scheduleID, attributes) + h.scheduleActivityTask(scheduleID, attributes, nil) // verify that using invalid activity id will panic err := runAndCatchPanic(func() { diff --git a/internal/internal_coroutines_test.go b/internal/internal_coroutines_test.go index 4e3478961..8114c47fd 100644 --- a/internal/internal_coroutines_test.go +++ b/internal/internal_coroutines_test.go @@ -38,6 +38,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/converter" ) @@ -551,6 +552,254 @@ func TestBlockingSelect(t *testing.T) { require.EqualValues(t, expected, history) } +func TestSelectBlockingDefault(t *testing.T) { + var history []string + env := &workflowEnvironmentImpl{ + sdkFlags: newSDKFlags(&workflowservice.GetSystemInfoResponse_Capabilities{SdkMetadata: true}), + commandsHelper: newCommandsHelper(), + dataConverter: converter.GetDefaultDataConverter(), + workflowInfo: &WorkflowInfo{ + Namespace: "namespace:" + t.Name(), + TaskQueueName: "taskqueue:" + t.Name(), + }, + } + // Verify that the flag is not set + require.False(t, env.GetFlag(SDKFlagBlockedSelectorSignalReceive)) + interceptor, ctx, err := newWorkflowContext(env, nil) + require.NoError(t, err, "newWorkflowContext failed") + d, _ := newDispatcher(ctx, interceptor, func(ctx Context) { + c1 := NewChannel(ctx) + c2 := NewChannel(ctx) + + Go(ctx, func(ctx Context) { + history = append(history, "add-one") + c1.Send(ctx, "one") + history = append(history, "add-one-done") + + }) + + Go(ctx, func(ctx Context) { + history = append(history, "add-two") + c2.Send(ctx, "two") + history = append(history, "add-two-done") + }) + + selector := NewSelector(ctx) + var v string + selector. + AddReceive(c1, func(c ReceiveChannel, more bool) { + c.Receive(ctx, &v) + history = append(history, fmt.Sprintf("c1-%v", v)) + }). + AddDefault(func() { + c2.Receive(ctx, &v) + history = append(history, fmt.Sprintf("c2-%v", v)) + }) + history = append(history, "select1") + selector.Select(ctx) + + // Default behavior this signal is lost + require.True(t, c1.Len() == 0 && v == "two") + + history = append(history, "select2") + selector.Select(ctx) + history = append(history, "done") + }, func() bool { return false }) + defer d.Close() + requireNoExecuteErr(t, d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout)) + require.False(t, d.IsDone()) + + expected := []string{ + "select1", + "add-one", + "add-one-done", + "add-two", + "add-two-done", + "c2-two", + "select2", + } + require.EqualValues(t, expected, history) +} + +func TestSelectBlockingDefaultWithFlag(t *testing.T) { + var history []string + env := &workflowEnvironmentImpl{ + sdkFlags: newSDKFlags(&workflowservice.GetSystemInfoResponse_Capabilities{SdkMetadata: true}), + commandsHelper: newCommandsHelper(), + dataConverter: converter.GetDefaultDataConverter(), + workflowInfo: &WorkflowInfo{ + Namespace: "namespace:" + t.Name(), + TaskQueueName: "taskqueue:" + t.Name(), + }, + } + require.True(t, env.TryUse(SDKFlagBlockedSelectorSignalReceive)) + interceptor, ctx, err := newWorkflowContext(env, nil) + require.NoError(t, err, "newWorkflowContext failed") + d, _ := newDispatcher(ctx, interceptor, func(ctx Context) { + c1 := NewChannel(ctx) + c2 := NewChannel(ctx) + + Go(ctx, func(ctx Context) { + history = append(history, "add-one") + c1.Send(ctx, "one") + history = append(history, "add-one-done") + + }) + + Go(ctx, func(ctx Context) { + history = append(history, "add-two") + c2.Send(ctx, "two") + history = append(history, "add-two-done") + }) + + selector := NewSelector(ctx) + var v string + selector. + AddReceive(c1, func(c ReceiveChannel, more bool) { + c.Receive(ctx, &v) + history = append(history, fmt.Sprintf("c1-%v", v)) + }). + AddDefault(func() { + c2.Receive(ctx, &v) + history = append(history, fmt.Sprintf("c2-%v", v)) + }) + history = append(history, "select1") + selector.Select(ctx) + + // Signal should not be lost + require.False(t, c1.Len() == 0 && v == "two") + + history = append(history, "select2") + selector.Select(ctx) + history = append(history, "done") + }, func() bool { return false }) + defer d.Close() + requireNoExecuteErr(t, d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout)) + require.True(t, d.IsDone()) + + expected := []string{ + "select1", + "add-one", + "add-one-done", + "add-two", + "add-two-done", + "c2-two", + "select2", + "c1-one", + "done", + } + + require.EqualValues(t, expected, history) +} + +func TestBlockingSelectFuture(t *testing.T) { + var history []string + d := createNewDispatcher(func(ctx Context) { + c1 := NewChannel(ctx) + f1, s1 := NewFuture(ctx) + + Go(ctx, func(ctx Context) { + history = append(history, "add-one") + c1.Send(ctx, "one") + history = append(history, "add-one-done") + }) + Go(ctx, func(ctx Context) { + history = append(history, "add-two") + s1.SetValue("one-future") + }) + + selector := NewSelector(ctx) + selector. + AddReceive(c1, func(c ReceiveChannel, more bool) { + var v string + c.Receive(ctx, &v) + history = append(history, fmt.Sprintf("c1-%v", v)) + }). + AddFuture(f1, func(f Future) { + var v string + err := f.Get(ctx, &v) + require.NoError(t, err) + history = append(history, fmt.Sprintf("f1-%v", v)) + }) + history = append(history, "select1") + selector.Select(ctx) + fmt.Println("select1 done", history) + + history = append(history, "select2") + selector.Select(ctx) + history = append(history, "done") + + }) + defer d.Close() + requireNoExecuteErr(t, d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout)) + require.True(t, d.IsDone(), strings.Join(history, "\n")) + expected := []string{ + "select1", + "add-one", + "add-one-done", + "add-two", + "c1-one", + "select2", + "f1-one-future", + "done", + } + require.EqualValues(t, expected, history) +} + +func TestBlockingSelectSend(t *testing.T) { + var history []string + d := createNewDispatcher(func(ctx Context) { + c1 := NewChannel(ctx) + c2 := NewChannel(ctx) + + Go(ctx, func(ctx Context) { + history = append(history, "add-one") + c1.Send(ctx, "one") + history = append(history, "add-one-done") + }) + Go(ctx, func(ctx Context) { + require.True(t, c2.Len() == 1) + history = append(history, "receiver") + var v string + more := c2.Receive(ctx, &v) + require.True(t, more) + history = append(history, fmt.Sprintf("c2-%v", v)) + require.True(t, c2.Len() == 0) + }) + + selector := NewSelector(ctx) + selector. + AddReceive(c1, func(c ReceiveChannel, more bool) { + var v string + c.Receive(ctx, &v) + history = append(history, fmt.Sprintf("c1-%v", v)) + }). + AddSend(c2, "two", func() { history = append(history, "send2") }) + history = append(history, "select1") + selector.Select(ctx) + + history = append(history, "select2") + selector.Select(ctx) + history = append(history, "done") + + }) + defer d.Close() + requireNoExecuteErr(t, d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout)) + require.True(t, d.IsDone(), strings.Join(history, "\n")) + expected := []string{ + "select1", + "add-one", + "add-one-done", + "receiver", + "c1-one", + "select2", + "send2", + "done", + "c2-two", + } + require.EqualValues(t, expected, history) +} + func TestBlockingSelectAsyncSend(t *testing.T) { var history []string d := createNewDispatcher(func(ctx Context) { diff --git a/internal/internal_eager_workflow.go b/internal/internal_eager_workflow.go index 9b8eac029..029829b88 100644 --- a/internal/internal_eager_workflow.go +++ b/internal/internal_eager_workflow.go @@ -33,14 +33,25 @@ import ( // eagerWorkflowDispatcher is responsible for finding an available worker for an eager workflow task. type eagerWorkflowDispatcher struct { lock sync.RWMutex - workersByTaskQueue map[string][]eagerWorker + workersByTaskQueue map[string]map[eagerWorker]struct{} } // registerWorker registers a worker that can be used for eager workflow dispatch func (e *eagerWorkflowDispatcher) registerWorker(worker *workflowWorker) { e.lock.Lock() defer e.lock.Unlock() - e.workersByTaskQueue[worker.executionParameters.TaskQueue] = append(e.workersByTaskQueue[worker.executionParameters.TaskQueue], worker.worker) + taskQueue := worker.executionParameters.TaskQueue + if e.workersByTaskQueue[taskQueue] == nil { + e.workersByTaskQueue[taskQueue] = make(map[eagerWorker]struct{}) + } + e.workersByTaskQueue[taskQueue][worker.worker] = struct{}{} +} + +// deregisterWorker deregister a worker so that it will not be used for eager workflow dispatch +func (e *eagerWorkflowDispatcher) deregisterWorker(worker *workflowWorker) { + e.lock.Lock() + defer e.lock.Unlock() + delete(e.workersByTaskQueue[worker.executionParameters.TaskQueue], worker.worker) } // applyToRequest updates request if eager workflow dispatch is possible and returns the eagerWorkflowExecutor to use @@ -48,9 +59,11 @@ func (e *eagerWorkflowDispatcher) applyToRequest(request *workflowservice.StartW // Try every worker that is assigned to the desired task queue. e.lock.RLock() workers := e.workersByTaskQueue[request.GetTaskQueue().Name] - randWorkers := make([]eagerWorker, len(workers)) - // Copy the slice so we can release the lock. - copy(randWorkers, workers) + randWorkers := make([]eagerWorker, 0, len(workers)) + // Copy the workers so we can release the lock. + for worker := range workers { + randWorkers = append(randWorkers, worker) + } e.lock.RUnlock() rand.Shuffle(len(randWorkers), func(i, j int) { randWorkers[i], randWorkers[j] = randWorkers[j], randWorkers[i] }) for _, worker := range randWorkers { diff --git a/internal/internal_eager_workflow_test.go b/internal/internal_eager_workflow_test.go index 294a29c16..db963551f 100644 --- a/internal/internal_eager_workflow_test.go +++ b/internal/internal_eager_workflow_test.go @@ -50,7 +50,7 @@ func (e *eagerWorkerMock) pushEagerTask(task eagerTask) { func TestEagerWorkflowDispatchNoWorkerOnTaskQueue(t *testing.T) { dispatcher := &eagerWorkflowDispatcher{ - workersByTaskQueue: make(map[string][]eagerWorker), + workersByTaskQueue: make(map[string]map[eagerWorker]struct{}), } dispatcher.registerWorker(&workflowWorker{ executionParameters: workerExecutionParameters{TaskQueue: "bad-task-queue"}, @@ -66,20 +66,20 @@ func TestEagerWorkflowDispatchNoWorkerOnTaskQueue(t *testing.T) { func TestEagerWorkflowDispatchAvailableWorker(t *testing.T) { dispatcher := &eagerWorkflowDispatcher{ - workersByTaskQueue: make(map[string][]eagerWorker), + workersByTaskQueue: make(map[string]map[eagerWorker]struct{}), } availableWorker := &eagerWorkerMock{ tryReserveSlotCallback: func() *SlotPermit { return &SlotPermit{} }, } - dispatcher.workersByTaskQueue["task-queue"] = []eagerWorker{ + dispatcher.workersByTaskQueue["task-queue"] = map[eagerWorker]struct{}{ &eagerWorkerMock{ tryReserveSlotCallback: func() *SlotPermit { return nil }, - }, + }: {}, &eagerWorkerMock{ tryReserveSlotCallback: func() *SlotPermit { return nil }, - }, - availableWorker, + }: {}, + availableWorker: {}, } request := &workflowservice.StartWorkflowExecutionRequest{ diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index 82a9785be..a284cb653 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -602,7 +602,7 @@ func (wc *workflowEnvironmentImpl) ExecuteChildWorkflow( attributes.InheritBuildId = determineInheritBuildIdFlagForCommand( params.VersioningIntent, wc.workflowInfo.TaskQueueName, params.TaskQueueName) - startMetadata, err := buildUserMetadata(params.staticSummary, params.staticDetails, wc.dataConverter) + startMetadata, err := buildUserMetadata(params.StaticSummary, params.StaticDetails, wc.dataConverter) if err != nil { callback(nil, err) return @@ -759,7 +759,13 @@ func (wc *workflowEnvironmentImpl) ExecuteActivity(parameters ExecuteActivityPar scheduleTaskAttr.UseWorkflowBuildId = determineInheritBuildIdFlagForCommand( parameters.VersioningIntent, wc.workflowInfo.TaskQueueName, parameters.TaskQueueName) - command := wc.commandsHelper.scheduleActivityTask(scheduleID, scheduleTaskAttr) + startMetadata, err := buildUserMetadata(parameters.Summary, "", wc.dataConverter) + if err != nil { + callback(nil, err) + return ActivityID{} + } + + command := wc.commandsHelper.scheduleActivityTask(scheduleID, scheduleTaskAttr, startMetadata) command.setData(&scheduledActivity{ callback: callback, waitForCancelRequest: parameters.WaitForCancellation, @@ -978,6 +984,10 @@ func (wc *workflowEnvironmentImpl) TryUse(flag sdkFlag) bool { return wc.sdkFlags.tryUse(flag, !wc.isReplay) } +func (wc *workflowEnvironmentImpl) GetFlag(flag sdkFlag) bool { + return wc.sdkFlags.getFlag(flag) +} + func (wc *workflowEnvironmentImpl) QueueUpdate(name string, f func()) { wc.bufferedUpdateRequests[name] = append(wc.bufferedUpdateRequests[name], f) } diff --git a/internal/internal_flags.go b/internal/internal_flags.go index 503c650a5..4a697d69e 100644 --- a/internal/internal_flags.go +++ b/internal/internal_flags.go @@ -47,9 +47,14 @@ const ( // SDKPriorityUpdateHandling will cause update request to be handled before the main workflow method. // It will also cause the SDK to immediately handle updates when a handler is registered. SDKPriorityUpdateHandling = 4 - SDKFlagUnknown = math.MaxUint32 + // SDKFlagBlockedSelectorSignalReceive will cause a signal to not be lost + // when the Default path is blocked. + SDKFlagBlockedSelectorSignalReceive = 5 + SDKFlagUnknown = math.MaxUint32 ) +var unblockSelectorSignal bool + func sdkFlagFromUint(value uint32) sdkFlag { switch value { case uint32(SDKFlagUnset): @@ -62,6 +67,8 @@ func sdkFlagFromUint(value uint32) sdkFlag { return SDKFlagProtocolMessageCommand case uint32(SDKPriorityUpdateHandling): return SDKPriorityUpdateHandling + case uint32(SDKFlagBlockedSelectorSignalReceive): + return SDKFlagBlockedSelectorSignalReceive default: return SDKFlagUnknown } @@ -105,6 +112,11 @@ func (sf *sdkFlags) tryUse(flag sdkFlag, record bool) bool { } } +// getFlag returns true if the flag is currently set. +func (sf *sdkFlags) getFlag(flag sdkFlag) bool { + return sf.currentFlags[flag] || sf.newFlags[flag] +} + // set marks a flag as in current use regardless of replay status. func (sf *sdkFlags) set(flags ...sdkFlag) { if !sf.capabilities.GetSdkMetadata() { @@ -131,3 +143,9 @@ func (sf *sdkFlags) gatherNewSDKFlags() []sdkFlag { } return flags } + +// SetUnblockSelectorSignal toggles the flag to unblock the selector signal. +// For test use only, +func SetUnblockSelectorSignal(unblockSignal bool) { + unblockSelectorSignal = unblockSignal +} diff --git a/internal/internal_worker.go b/internal/internal_worker.go index 23beb0c59..61f324f91 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -1050,6 +1050,9 @@ func (aw *AggregatedWorker) start() error { // stop workflow worker. if !util.IsInterfaceNil(aw.workflowWorker) { if aw.workflowWorker.worker.isWorkerStarted { + if aw.client.eagerDispatcher != nil { + aw.client.eagerDispatcher.deregisterWorker(aw.workflowWorker) + } aw.workflowWorker.Stop() } } @@ -1218,6 +1221,9 @@ func (aw *AggregatedWorker) Stop() { } if !util.IsInterfaceNil(aw.workflowWorker) { + if aw.client.eagerDispatcher != nil { + aw.client.eagerDispatcher.deregisterWorker(aw.workflowWorker) + } aw.workflowWorker.Stop() } if !util.IsInterfaceNil(aw.activityWorker) { diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index dc3865e6c..f7160da8e 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -145,6 +145,8 @@ type ( DrainUnhandledUpdates() bool // TryUse returns true if this flag may currently be used. TryUse(flag sdkFlag) bool + // GetFlag returns if the flag is currently used. + GetFlag(flag sdkFlag) bool } // WorkflowDefinitionFactory factory for creating WorkflowDefinition instances. diff --git a/internal/internal_workers_test.go b/internal/internal_workers_test.go index 5622e0ca3..11159cf18 100644 --- a/internal/internal_workers_test.go +++ b/internal/internal_workers_test.go @@ -117,6 +117,7 @@ func (s *WorkersTestSuite) TestWorkflowWorker() { workflowWorker.Stop() s.NoError(ctx.Err()) + } type CountingSlotSupplier struct { @@ -736,6 +737,8 @@ func (s *WorkersTestSuite) TestWorkerMultipleStop() { worker := NewAggregatedWorker(client, "multi-stop-tq", WorkerOptions{}) s.NoError(worker.Start()) worker.Stop() + // Verify stopping the worker removes it from the eager dispatcher + s.Empty(client.eagerDispatcher.workersByTaskQueue["multi-stop-tq"]) worker.Stop() } diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index ce4eb8986..62349900b 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -47,6 +47,7 @@ import ( "go.temporal.io/sdk/converter" "go.temporal.io/sdk/internal/common/metrics" + "go.temporal.io/sdk/log" ) const ( @@ -193,6 +194,7 @@ type ( mutex sync.Mutex // used to synchronize executing closed bool interceptor WorkflowOutboundInterceptor + logger log.Logger deadlockDetector *deadlockDetector readOnly bool // allBlockedCallback is called when all coroutines are blocked, @@ -222,6 +224,8 @@ type ( SearchAttributes map[string]interface{} TypedSearchAttributes SearchAttributes ParentClosePolicy enumspb.ParentClosePolicy + StaticSummary string + StaticDetails string signalChannels map[string]Channel requestedSignalChannels map[string]*requestedSignalChannel queryHandlers map[string]*queryHandler @@ -229,9 +233,6 @@ type ( // runningUpdatesHandles is a map of update handlers that are currently running. runningUpdatesHandles map[string]UpdateInfo VersioningIntent VersioningIntent - // TODO(cretz): Expose once https://github.com/temporalio/temporal/issues/6412 is fixed - staticSummary string - staticDetails string // currentDetails is the user-set string returned on metadata query as // WorkflowMetadata.current_details currentDetails string @@ -676,8 +677,11 @@ func (d *syncWorkflowDefinition) Close() { // Context passed to the root function is child of the passed rootCtx. // This way rootCtx can be used to pass values to the coroutine code. func newDispatcher(rootCtx Context, interceptor *workflowEnvironmentInterceptor, root func(ctx Context), allBlockedCallback func() bool) (*dispatcherImpl, Context) { + env := getWorkflowEnvironment(rootCtx) + result := &dispatcherImpl{ interceptor: interceptor.outboundInterceptor, + logger: env.GetLogger(), deadlockDetector: newDeadlockDetector(), allBlockedCallback: allBlockedCallback, } @@ -1158,21 +1162,31 @@ func (s *coroutineState) close() { } // exit tries to run Goexit on the coroutine and wait for it to exit -// within timeout. -func (s *coroutineState) exit(timeout time.Duration) { +// within timeout. If it doesn't exit within timeout, it will log a warning. +func (s *coroutineState) exit(logger log.Logger, warnTimeout time.Duration) { if !s.closed.Load() { s.unblock <- func(status string, stackDepth int) bool { runtime.Goexit() return true } - timer := time.NewTimer(timeout) + timer := time.NewTimer(warnTimeout) defer timer.Stop() select { case <-s.aboutToBlock: + return case <-timer.C: + st, err := getCoroStackTrace(s, "running", 0) + if err != nil { + st = fmt.Sprintf("<%s>", err) + } + + logger.Warn(fmt.Sprintf("Workflow coroutine %q didn't exit within %v", s.name, warnTimeout), "stackTrace", st) } + // We need to make sure the coroutine is closed, otherwise we risk concurrent coroutines running + // at the same time causing a race condition. + <-s.aboutToBlock } } @@ -1332,7 +1346,7 @@ func (d *dispatcherImpl) Close() { // * On exit the coroutines defers will still run and that may block. go func() { for _, c := range d.coroutines { - c.exit(defaultCoroutineExitTimeout) + c.exit(d.logger, defaultDeadlockDetectionTimeout) } }() } @@ -1407,8 +1421,24 @@ func (s *selectorImpl) Select(ctx Context) { if readyBranch != nil { return false } - readyBranch = func() { + // readyBranch is not executed when AddDefault is specified, + // setting the value here prevents the signal from being dropped + env := getWorkflowEnvironment(ctx) + var dropSignalFlag bool + if unblockSelectorSignal { + dropSignalFlag = env.TryUse(SDKFlagBlockedSelectorSignalReceive) + } else { + dropSignalFlag = env.GetFlag(SDKFlagBlockedSelectorSignalReceive) + } + + if dropSignalFlag { c.recValue = &v + } + + readyBranch = func() { + if !dropSignalFlag { + c.recValue = &v + } f(c, more) } return true diff --git a/internal/internal_workflow_client_test.go b/internal/internal_workflow_client_test.go index 270b27203..8617163cf 100644 --- a/internal/internal_workflow_client_test.go +++ b/internal/internal_workflow_client_test.go @@ -1407,7 +1407,7 @@ func (s *workflowClientTestSuite) TestEagerStartWorkflowNotSupported() { }, } client.eagerDispatcher = &eagerWorkflowDispatcher{ - workersByTaskQueue: map[string][]eagerWorker{taskqueue: {eagerMock}}, + workersByTaskQueue: map[string]map[eagerWorker]struct{}{taskqueue: {eagerMock: {}}}, } s.True(ok) options := StartWorkflowOptions{ @@ -1446,7 +1446,7 @@ func (s *workflowClientTestSuite) TestEagerStartWorkflowNoWorker() { tryReserveSlotCallback: func() *SlotPermit { return nil }, processTaskAsyncCallback: func(task eagerTask) { processTask = true }} client.eagerDispatcher = &eagerWorkflowDispatcher{ - workersByTaskQueue: map[string][]eagerWorker{taskqueue: {eagerMock}}, + workersByTaskQueue: map[string]map[eagerWorker]struct{}{taskqueue: {eagerMock: {}}}, } s.True(ok) options := StartWorkflowOptions{ @@ -1485,7 +1485,7 @@ func (s *workflowClientTestSuite) TestEagerStartWorkflow() { tryReserveSlotCallback: func() *SlotPermit { return &SlotPermit{} }, processTaskAsyncCallback: func(task eagerTask) { processTask = true }} client.eagerDispatcher = &eagerWorkflowDispatcher{ - workersByTaskQueue: map[string][]eagerWorker{taskqueue: {eagerMock}}} + workersByTaskQueue: map[string]map[eagerWorker]struct{}{taskqueue: {eagerMock: {}}}} s.True(ok) options := StartWorkflowOptions{ ID: workflowID, @@ -1525,7 +1525,7 @@ func (s *workflowClientTestSuite) TestEagerStartWorkflowStartRequestFail() { tryReserveSlotCallback: func() *SlotPermit { return &SlotPermit{} }, processTaskAsyncCallback: func(task eagerTask) { processTask = true }} client.eagerDispatcher = &eagerWorkflowDispatcher{ - workersByTaskQueue: map[string][]eagerWorker{taskqueue: {eagerMock}}} + workersByTaskQueue: map[string]map[eagerWorker]struct{}{taskqueue: {eagerMock: {}}}} s.True(ok) options := StartWorkflowOptions{ ID: workflowID, diff --git a/internal/internal_workflow_test.go b/internal/internal_workflow_test.go index efa6912d2..448ef20bd 100644 --- a/internal/internal_workflow_test.go +++ b/internal/internal_workflow_test.go @@ -1402,24 +1402,6 @@ func (s *WorkflowUnitTest) Test_MutatingFunctionsInQueries() { s.NoError(env.GetWorkflowError()) } -type updateCallback struct { - accept func() - reject func(error) - complete func(interface{}, error) -} - -func (uc *updateCallback) Accept() { - uc.accept() -} - -func (uc *updateCallback) Reject(err error) { - uc.reject(err) -} - -func (uc *updateCallback) Complete(success interface{}, err error) { - uc.complete(success, err) -} - func (s *WorkflowUnitTest) Test_MutatingFunctionsInUpdateValidator() { env := s.NewTestWorkflowEnvironment() @@ -1438,7 +1420,7 @@ func (s *WorkflowUnitTest) Test_MutatingFunctionsInUpdateValidator() { } env.RegisterWorkflow(wf) env.RegisterDelayedCallback(func() { - env.UpdateWorkflow(updateType, "testID", &updateCallback{}) + env.UpdateWorkflow(updateType, "testID", &TestUpdateCallback{}) }, time.Second) env.ExecuteWorkflow(wf) s.True(env.IsWorkflowCompleted()) diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index 7cbf77552..e5a552c6e 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -157,6 +157,14 @@ type ( taskQueues map[string]struct{} } + updateResult struct { + success interface{} + err error + update_id string + callbacks []updateCallbacksWrapper + completed bool + } + // testWorkflowEnvironmentShared is the shared data between parent workflow and child workflow test environments testWorkflowEnvironmentShared struct { locker sync.Mutex @@ -229,6 +237,7 @@ type ( signalHandler func(name string, input *commonpb.Payloads, header *commonpb.Header) error queryHandler func(string, *commonpb.Payloads, *commonpb.Header) (*commonpb.Payloads, error) updateHandler func(name string, id string, input *commonpb.Payloads, header *commonpb.Header, resp UpdateCallbacks) + updateMap map[string]*updateResult startedHandler func(r WorkflowExecution, e error) isWorkflowCompleted bool @@ -250,12 +259,21 @@ type ( workflowFunctionExecuting bool bufferedUpdateRequests map[string][]func() + + sdkFlags *sdkFlags } testSessionEnvironmentImpl struct { *sessionEnvironmentImpl testWorkflowEnvironment *testWorkflowEnvironmentImpl } + + // UpdateCallbacksWrapper is a wrapper to UpdateCallbacks. It allows us to dedup duplicate update IDs in the test environment. + updateCallbacksWrapper struct { + uc UpdateCallbacks + env *testWorkflowEnvironmentImpl + updateID string + } ) func newTestWorkflowEnvironmentImpl(s *WorkflowTestSuite, parentRegistry *registry) *testWorkflowEnvironmentImpl { @@ -313,6 +331,7 @@ func newTestWorkflowEnvironmentImpl(s *WorkflowTestSuite, parentRegistry *regist failureConverter: GetDefaultFailureConverter(), runTimeout: maxWorkflowTimeout, bufferedUpdateRequests: make(map[string][]func()), + sdkFlags: newSDKFlags(&workflowservice.GetSystemInfoResponse_Capabilities{SdkMetadata: true}), } if debugMode { @@ -605,7 +624,11 @@ func (env *testWorkflowEnvironmentImpl) getWorkflowDefinition(wt WorkflowType) ( } func (env *testWorkflowEnvironmentImpl) TryUse(flag sdkFlag) bool { - return true + return env.sdkFlags.tryUse(flag, true) +} + +func (env *testWorkflowEnvironmentImpl) GetFlag(flag sdkFlag) bool { + return env.sdkFlags.getFlag(flag) } func (env *testWorkflowEnvironmentImpl) QueueUpdate(name string, f func()) { @@ -2910,10 +2933,32 @@ func (env *testWorkflowEnvironmentImpl) updateWorkflow(name string, id string, u if err != nil { panic(err) } - env.postCallback(func() { - // Do not send any headers on test invocations - env.updateHandler(name, id, data, nil, uc) - }, true) + + if env.updateMap == nil { + env.updateMap = make(map[string]*updateResult) + } + + var ucWrapper = updateCallbacksWrapper{uc: uc, env: env, updateID: id} + + // check for duplicate update ID + if result, ok := env.updateMap[id]; ok { + if result.completed { + env.postCallback(func() { + ucWrapper.uc.Accept() + ucWrapper.uc.Complete(result.success, result.err) + }, false) + } else { + result.callbacks = append(result.callbacks, ucWrapper) + } + env.updateMap[id] = result + } else { + env.updateMap[id] = &updateResult{nil, nil, id, []updateCallbacksWrapper{}, false} + env.postCallback(func() { + // Do not send any headers on test invocations + env.updateHandler(name, id, data, nil, ucWrapper) + }, true) + } + } func (env *testWorkflowEnvironmentImpl) updateWorkflowByID(workflowID, name, id string, uc UpdateCallbacks, args ...interface{}) error { @@ -2925,9 +2970,31 @@ func (env *testWorkflowEnvironmentImpl) updateWorkflowByID(workflowID, name, id if err != nil { panic(err) } - workflowHandle.env.postCallback(func() { - workflowHandle.env.updateHandler(name, id, data, nil, uc) - }, true) + + if env.updateMap == nil { + env.updateMap = make(map[string]*updateResult) + } + + var ucWrapper = updateCallbacksWrapper{uc: uc, env: env, updateID: id} + + // Check for duplicate update ID + if result, ok := env.updateMap[id]; ok { + if result.completed { + env.postCallback(func() { + ucWrapper.uc.Accept() + ucWrapper.uc.Complete(result.success, result.err) + }, false) + } else { + result.callbacks = append(result.callbacks, ucWrapper) + } + env.updateMap[id] = result + } else { + env.updateMap[id] = &updateResult{nil, nil, id, []updateCallbacksWrapper{}, false} + workflowHandle.env.postCallback(func() { + workflowHandle.env.updateHandler(name, id, data, nil, ucWrapper) + }, true) + } + return nil } @@ -3068,6 +3135,34 @@ func mockFnGetVersion(string, Version, Version) Version { // make sure interface is implemented var _ WorkflowEnvironment = (*testWorkflowEnvironmentImpl)(nil) +func (uc updateCallbacksWrapper) Accept() { + uc.uc.Accept() +} + +func (uc updateCallbacksWrapper) Reject(err error) { + uc.uc.Reject(err) +} + +func (uc updateCallbacksWrapper) Complete(success interface{}, err error) { + // cache update result so we can dedup duplicate update IDs + if uc.env == nil { + panic("env is needed in updateCallback to cache update results for deduping purposes") + } + if result, ok := uc.env.updateMap[uc.updateID]; ok { + if !result.completed { + result.success = success + result.err = err + uc.uc.Complete(success, err) + result.completed = true + result.post_callbacks(uc.env) + } else { + uc.uc.Complete(result.success, result.err) + } + } else { + panic("updateMap[updateID] should already be created from updateWorkflow()") + } +} + func (h *testNexusOperationHandle) newStartTask() *workflowservice.PollNexusTaskQueueResponse { return &workflowservice.PollNexusTaskQueueResponse{ TaskToken: []byte{}, @@ -3418,3 +3513,13 @@ func newTestNexusOperation(opRef testNexusOperationReference) *testNexusOperatio testNexusOperationReference: opRef, } } + +func (res *updateResult) post_callbacks(env *testWorkflowEnvironmentImpl) { + for _, uc := range res.callbacks { + env.postCallback(func() { + uc.Accept() + uc.Complete(res.success, res.err) + }, false) + } + res.callbacks = []updateCallbacksWrapper{} +} diff --git a/internal/internal_workflow_testsuite_test.go b/internal/internal_workflow_testsuite_test.go index 472e8f1fa..558ae7a43 100644 --- a/internal/internal_workflow_testsuite_test.go +++ b/internal/internal_workflow_testsuite_test.go @@ -4240,3 +4240,37 @@ func (s *WorkflowTestSuiteUnitTest) Test_SameWorkflowAndActivityNames() { s.Require().True(env.IsWorkflowCompleted()) s.Require().NoError(env.GetWorkflowError()) } + +func (s *WorkflowTestSuiteUnitTest) Test_SignalLoss() { + workflowFn := func(ctx Context) error { + ch1 := GetSignalChannel(ctx, "test-signal") + ch2 := GetSignalChannel(ctx, "test-signal-2") + selector := NewSelector(ctx) + var v string + selector.AddReceive(ch1, func(c ReceiveChannel, more bool) { + c.Receive(ctx, &v) + }) + selector.AddDefault(func() { + ch2.Receive(ctx, &v) + }) + selector.Select(ctx) + s.Require().True(ch1.Len() == 0 && v == "s2") + selector.Select(ctx) + + return nil + } + + // send a signal after workflow has started + env := s.NewTestWorkflowEnvironment() + env.RegisterDelayedCallback(func() { + env.SignalWorkflow("test-signal", "s1") + env.SignalWorkflow("test-signal-2", "s2") + }, 5*time.Second) + env.ExecuteWorkflow(workflowFn) + s.True(env.IsWorkflowCompleted()) + err := env.GetWorkflowError() + s.Error(err) + var workflowErr *WorkflowExecutionError + s.True(errors.As(err, &workflowErr)) + s.Equal("deadline exceeded (type: ScheduleToClose)", workflowErr.cause.Error()) +} diff --git a/internal/workflow.go b/internal/workflow.go index 12a08b622..39bff603e 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -380,9 +380,22 @@ type ( // WARNING: Worker versioning is currently experimental VersioningIntent VersioningIntent - // TODO(cretz): Expose once https://github.com/temporalio/temporal/issues/6412 is fixed - staticSummary string - staticDetails string + // StaticSummary is a single-line fixed summary for this child workflow execution that will appear in UI/CLI. This can be + // in single-line Temporal Markdown format. + // + // Optional: defaults to none/empty. + // + // NOTE: Experimental + StaticSummary string + + // Details - General fixed details for this child workflow execution that will appear in UI/CLI. This can be in + // Temporal markdown format and can span multiple lines. This is a fixed value on the workflow that cannot be + // updated. For details that can be updated, use SetCurrentDetails within the workflow. + // + // Optional: defaults to none/empty. + // + // NOTE: Experimental + StaticDetails string } // RegisterWorkflowOptions consists of options for registering a workflow @@ -1083,7 +1096,8 @@ func (wc *workflowEnvironmentInterceptor) ExecuteChildWorkflow(ctx Context, chil options.SearchAttributes = workflowOptionsFromCtx.SearchAttributes options.TypedSearchAttributes = workflowOptionsFromCtx.TypedSearchAttributes options.VersioningIntent = workflowOptionsFromCtx.VersioningIntent - + options.StaticDetails = workflowOptionsFromCtx.StaticDetails + options.StaticSummary = workflowOptionsFromCtx.StaticSummary header, err := workflowHeaderPropagated(ctx, options.ContextPropagators) if err != nil { executionSettable.Set(nil, err) @@ -1609,9 +1623,8 @@ func WithChildWorkflowOptions(ctx Context, cwo ChildWorkflowOptions) Context { wfOptions.TypedSearchAttributes = cwo.TypedSearchAttributes wfOptions.ParentClosePolicy = cwo.ParentClosePolicy wfOptions.VersioningIntent = cwo.VersioningIntent - // TODO(cretz): Expose once https://github.com/temporalio/temporal/issues/6412 is fixed - wfOptions.staticSummary = cwo.staticSummary - wfOptions.staticDetails = cwo.staticDetails + wfOptions.StaticSummary = cwo.StaticSummary + wfOptions.StaticDetails = cwo.StaticDetails return ctx1 } @@ -1638,9 +1651,8 @@ func GetChildWorkflowOptions(ctx Context) ChildWorkflowOptions { TypedSearchAttributes: opts.TypedSearchAttributes, ParentClosePolicy: opts.ParentClosePolicy, VersioningIntent: opts.VersioningIntent, - // TODO(cretz): Expose once https://github.com/temporalio/temporal/issues/6412 is fixed - staticSummary: opts.staticSummary, - staticDetails: opts.staticDetails, + StaticSummary: opts.StaticSummary, + StaticDetails: opts.StaticDetails, } } @@ -2163,6 +2175,7 @@ func WithActivityOptions(ctx Context, options ActivityOptions) Context { eap.RetryPolicy = convertToPBRetryPolicy(options.RetryPolicy) eap.DisableEagerExecution = options.DisableEagerExecution eap.VersioningIntent = options.VersioningIntent + eap.Summary = options.Summary return ctx1 } @@ -2219,6 +2232,7 @@ func GetActivityOptions(ctx Context) ActivityOptions { RetryPolicy: convertFromPBRetryPolicy(opts.RetryPolicy), DisableEagerExecution: opts.DisableEagerExecution, VersioningIntent: opts.VersioningIntent, + Summary: opts.Summary, } } diff --git a/internal/workflow_test.go b/internal/workflow_test.go index 1abfa8209..2bc2d5116 100644 --- a/internal/workflow_test.go +++ b/internal/workflow_test.go @@ -61,6 +61,8 @@ func TestGetChildWorkflowOptions(t *testing.T) { }, ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL, VersioningIntent: VersioningIntentDefault, + StaticSummary: "child workflow summary", + StaticDetails: "child workflow details", } // Require test options to have non-zero value for each field. This ensures that we update tests (and the @@ -82,6 +84,7 @@ func TestGetActivityOptions(t *testing.T) { RetryPolicy: newTestRetryPolicy(), DisableEagerExecution: true, VersioningIntent: VersioningIntentDefault, + Summary: "activity summary", } assertNonZero(t, opts) diff --git a/internal/workflow_testsuite.go b/internal/workflow_testsuite.go index 4775cf22f..9985fdd58 100644 --- a/internal/workflow_testsuite.go +++ b/internal/workflow_testsuite.go @@ -34,6 +34,7 @@ import ( "github.com/nexus-rpc/sdk-go/nexus" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" @@ -83,6 +84,15 @@ type ( runFn func(args mock.Arguments) waitDuration func() time.Duration } + + // TestUpdateCallback is a basic implementation of the UpdateCallbacks interface for testing purposes. + // Tests are welcome to implement their own version of this interface if they need to test more complex + // update logic. This is a simple implementation to make testing basic Workflow Updates easier. + TestUpdateCallback struct { + OnAccept func() + OnReject func(error) + OnComplete func(interface{}, error) + } ) func newEncodedValues(values *commonpb.Payloads, dc converter.DataConverter) converter.EncodedValues { @@ -767,6 +777,18 @@ func (c *MockCallWrapper) NotBefore(calls ...*MockCallWrapper) *MockCallWrapper return c } +func (uc *TestUpdateCallback) Accept() { + uc.OnAccept() +} + +func (uc *TestUpdateCallback) Reject(err error) { + uc.OnReject(err) +} + +func (uc *TestUpdateCallback) Complete(success interface{}, err error) { + uc.OnComplete(success, err) +} + // ExecuteWorkflow executes a workflow, wait until workflow complete. It will fail the test if workflow is blocked and // cannot complete within TestTimeout (set by SetTestTimeout()). func (e *TestWorkflowEnvironment) ExecuteWorkflow(workflowFn interface{}, args ...interface{}) { @@ -1079,6 +1101,17 @@ func (e *TestWorkflowEnvironment) UpdateWorkflowByID(workflowID, updateName, upd return e.impl.updateWorkflowByID(workflowID, updateName, updateID, uc, args) } +func (e *TestWorkflowEnvironment) UpdateWorkflowNoRejection(updateName string, updateID string, t mock.TestingT, args ...interface{}) { + uc := &TestUpdateCallback{ + OnReject: func(err error) { + require.Fail(t, "update should not be rejected") + }, + OnAccept: func() {}, + OnComplete: func(interface{}, error) {}, + } + e.UpdateWorkflow(updateName, updateID, uc, args) +} + // QueryWorkflowByID queries a child workflow by its ID and returns the result synchronously func (e *TestWorkflowEnvironment) QueryWorkflowByID(workflowID, queryType string, args ...interface{}) (converter.EncodedValue, error) { return e.impl.queryWorkflowByID(workflowID, queryType, args...) diff --git a/internal/workflow_testsuite_test.go b/internal/workflow_testsuite_test.go index 3fc46146b..9d0684266 100644 --- a/internal/workflow_testsuite_test.go +++ b/internal/workflow_testsuite_test.go @@ -274,12 +274,12 @@ func TestWorkflowIDUpdateWorkflowByID(t *testing.T) { // Test UpdateWorkflowByID works with custom ID env := suite.NewTestWorkflowEnvironment() env.RegisterDelayedCallback(func() { - err := env.UpdateWorkflowByID("my-workflow-id", "update", "id", &updateCallback{ - reject: func(err error) { + err := env.UpdateWorkflowByID("my-workflow-id", "update", "id", &TestUpdateCallback{ + OnReject: func(err error) { require.Fail(t, "update should not be rejected") }, - accept: func() {}, - complete: func(interface{}, error) {}, + OnAccept: func() {}, + OnComplete: func(interface{}, error) {}, }, "input") require.NoError(t, err) }, time.Second) @@ -311,13 +311,13 @@ func TestChildWorkflowUpdate(t *testing.T) { ID: wfID, }) env.RegisterDelayedCallback(func() { - err := env.UpdateWorkflowByID("child-workflow", "child-handler", "1", &updateCallback{ - accept: func() { + err := env.UpdateWorkflowByID("child-workflow", "child-handler", "1", &TestUpdateCallback{ + OnAccept: func() { }, - reject: func(err error) { + OnReject: func(err error) { require.Fail(t, "update failed", err) }, - complete: func(result interface{}, err error) { + OnComplete: func(result interface{}, err error) { if err != nil { require.Fail(t, "update failed", err) } @@ -369,13 +369,7 @@ func TestWorkflowUpdateOrder(t *testing.T) { // Test UpdateWorkflowByID works with custom ID env := suite.NewTestWorkflowEnvironment() env.RegisterDelayedCallback(func() { - env.UpdateWorkflow("update", "id", &updateCallback{ - reject: func(err error) { - require.Fail(t, "update should not be rejected") - }, - accept: func() {}, - complete: func(interface{}, error) {}, - }) + env.UpdateWorkflowNoRejection("update", "id", t) }, 0) env.ExecuteWorkflow(func(ctx Context) (int, error) { @@ -407,14 +401,14 @@ func TestWorkflowNotRegisteredRejected(t *testing.T) { env := suite.NewTestWorkflowEnvironment() var updateRejectionErr error env.RegisterDelayedCallback(func() { - env.UpdateWorkflow("update", "id", &updateCallback{ - reject: func(err error) { + env.UpdateWorkflow("update", "id", &TestUpdateCallback{ + OnReject: func(err error) { updateRejectionErr = err }, - accept: func() { + OnAccept: func() { require.Fail(t, "update should not be accepted") }, - complete: func(interface{}, error) {}, + OnComplete: func(interface{}, error) {}, }) }, 0) @@ -433,36 +427,24 @@ func TestWorkflowUpdateOrderAcceptReject(t *testing.T) { env := suite.NewTestWorkflowEnvironment() // Send 3 updates, with one bad update env.RegisterDelayedCallback(func() { - env.UpdateWorkflow("update", "1", &updateCallback{ - reject: func(err error) { - require.Fail(t, "update should not be rejected") - }, - accept: func() {}, - complete: func(interface{}, error) {}, - }) + env.UpdateWorkflowNoRejection("update", "1", t) }, 0) var updateRejectionErr error env.RegisterDelayedCallback(func() { - env.UpdateWorkflow("bad update", "2", &updateCallback{ - reject: func(err error) { + env.UpdateWorkflow("bad update", "2", &TestUpdateCallback{ + OnReject: func(err error) { updateRejectionErr = err }, - accept: func() { - require.Fail(t, "update should not be rejected") + OnAccept: func() { + require.Fail(t, "update should not be accepted") }, - complete: func(interface{}, error) {}, + OnComplete: func(interface{}, error) {}, }) }, 0) env.RegisterDelayedCallback(func() { - env.UpdateWorkflow("update", "3", &updateCallback{ - reject: func(err error) { - require.Fail(t, "update should not be rejected") - }, - accept: func() {}, - complete: func(interface{}, error) {}, - }) + env.UpdateWorkflowNoRejection("update", "3", t) }, 0) env.ExecuteWorkflow(func(ctx Context) (int, error) { @@ -491,28 +473,109 @@ func TestWorkflowUpdateOrderAcceptReject(t *testing.T) { require.Equal(t, "unknown update bad update. KnownUpdates=[update]", updateRejectionErr.Error()) } +func TestWorkflowDuplicateIDDedup(t *testing.T) { + duplicateIDDedup(t, true, false, 1) +} + +func TestWorkflowDuplicateIDDedupInterweave(t *testing.T) { + // The second update should be scheduled before the first update is complete. + // This causes the second update to be completed only after the first update + // is complete and its result is cached for the second update to dedup. + duplicateIDDedup(t, false, false, 1) +} + +func TestWorkflowDuplicateIDDedupWithSleep(t *testing.T) { + duplicateIDDedup(t, false, true, 1) +} + +func TestWorkflowDuplicateIDDedupMore(t *testing.T) { + duplicateIDDedup(t, true, false, 50) +} + +func TestWorkflowDuplicateIDDedupDelayAndSleep(t *testing.T) { + duplicateIDDedup(t, true, true, 50) +} + +func duplicateIDDedup(t *testing.T, delay_second bool, with_sleep bool, additional int) { + var suite WorkflowTestSuite + var second_delay time.Duration + if delay_second { + second_delay = 1 * time.Second + } else { + second_delay = 0 * time.Second + } + additional_update_count := 0 + // Test dev server dedups UpdateWorkflow with same ID + env := suite.NewTestWorkflowEnvironment() + env.RegisterDelayedCallback(func() { + env.UpdateWorkflow("update", "id", &TestUpdateCallback{ + OnReject: func(err error) { + require.Fail(t, fmt.Sprintf("update should not be rejected, err: %v", err)) + }, + OnAccept: func() {}, + OnComplete: func(result interface{}, err error) { + intResult, ok := result.(int) + if !ok { + require.Fail(t, fmt.Sprintf("result should be int: %v\nerr: %v", result, err)) + } else { + require.Equal(t, 0, intResult) + } + }, + }, 0) + }, 0) + + for i := 0; i < additional; i++ { + env.RegisterDelayedCallback(func() { + env.UpdateWorkflow("update", "id", &TestUpdateCallback{ + OnReject: func(err error) { + require.Fail(t, fmt.Sprintf("update should not be rejected, err: %v", err)) + }, + OnAccept: func() {}, + OnComplete: func(result interface{}, err error) { + intResult, ok := result.(int) + if !ok { + require.Fail(t, fmt.Sprintf("result should be int: %v\nerr: %v", result, err)) + } else { + // if dedup, this be okay, even if we pass in 1 as arg, since it's deduping, + // the result should match the first update's result, 0 + require.Equal(t, 0, intResult) + } + additional_update_count += 1 + }, + }, 1) + + }, second_delay) + } + + env.ExecuteWorkflow(func(ctx Context) error { + err := SetUpdateHandler(ctx, "update", func(ctx Context, i int) (int, error) { + if with_sleep { + err := Sleep(ctx, time.Second) + if err != nil { + return 0, err + } + } + return i, nil + }, UpdateHandlerOptions{}) + if err != nil { + return err + } + return Sleep(ctx, time.Hour) + }) + require.NoError(t, env.GetWorkflowError()) + require.Equal(t, additional, additional_update_count) +} + func TestAllHandlersFinished(t *testing.T) { var suite WorkflowTestSuite env := suite.NewTestWorkflowEnvironment() env.RegisterDelayedCallback(func() { - env.UpdateWorkflow("update", "id_1", &updateCallback{ - reject: func(err error) { - require.Fail(t, "update should not be rejected") - }, - accept: func() {}, - complete: func(interface{}, error) {}, - }) + env.UpdateWorkflowNoRejection("update", "id_1", t) }, 0) env.RegisterDelayedCallback(func() { - env.UpdateWorkflow("update", "id_2", &updateCallback{ - reject: func(err error) { - require.Fail(t, "update should not be rejected") - }, - accept: func() {}, - complete: func(interface{}, error) {}, - }) + env.UpdateWorkflowNoRejection("update", "id_2", t) }, time.Minute) env.ExecuteWorkflow(func(ctx Context) (int, error) { @@ -570,33 +633,15 @@ func TestWorkflowAllHandlersFinished(t *testing.T) { env := suite.NewTestWorkflowEnvironment() env.RegisterDelayedCallback(func() { - env.UpdateWorkflow("update", "id_1", &updateCallback{ - reject: func(err error) { - require.Fail(t, "update should not be rejected") - }, - accept: func() {}, - complete: func(interface{}, error) {}, - }) + env.UpdateWorkflowNoRejection("update", "id_1", t) }, 0) env.RegisterDelayedCallback(func() { - env.UpdateWorkflow("update", "id_2", &updateCallback{ - reject: func(err error) { - require.Fail(t, "update should not be rejected") - }, - accept: func() {}, - complete: func(interface{}, error) {}, - }) + env.UpdateWorkflowNoRejection("update", "id_2", t) }, time.Minute) env.RegisterDelayedCallback(func() { - env.UpdateWorkflow("nonWarningHandler", "id_3", &updateCallback{ - reject: func(err error) { - require.Fail(t, "update should not be rejected") - }, - accept: func() {}, - complete: func(interface{}, error) {}, - }) + env.UpdateWorkflowNoRejection("nonWarningHandler", "id_3", t) }, 2*time.Minute) env.RegisterDelayedCallback(func() { @@ -727,13 +772,7 @@ func TestWorkflowUpdateLogger(t *testing.T) { env := suite.NewTestWorkflowEnvironment() env.RegisterDelayedCallback(func() { - env.UpdateWorkflow("logging_update", "id_1", &updateCallback{ - reject: func(err error) { - require.Fail(t, "update should not be rejected") - }, - accept: func() {}, - complete: func(interface{}, error) {}, - }) + env.UpdateWorkflowNoRejection("logging_update", "id_1", t) }, 0) env.RegisterDelayedCallback(func() { diff --git a/test/integration_test.go b/test/integration_test.go index d8c80cee8..4a3fcad9b 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -6241,10 +6241,6 @@ func (ts *IntegrationTestSuite) TestRequestFailureMetric() { } func (ts *IntegrationTestSuite) TestUserMetadata() { - // Skip this test if disabled - if os.Getenv("DISABLE_USER_METADATA_TESTS") != "" { - ts.T().SkipNow() - } ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -6301,6 +6297,8 @@ func (ts *IntegrationTestSuite) TestUserMetadata() { // Confirm that the history has a timer with the proper summary iter := ts.client.GetWorkflowHistory(ctx, run.GetID(), "", false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) var timerEvent *historypb.HistoryEvent + var activityEvent *historypb.HistoryEvent + var childWorkflowEvent *historypb.HistoryEvent for iter.HasNext() { event, err := iter.Next() ts.NoError(err) @@ -6308,11 +6306,35 @@ func (ts *IntegrationTestSuite) TestUserMetadata() { ts.Nil(timerEvent) timerEvent = event } + + if event.GetActivityTaskScheduledEventAttributes() != nil { + ts.Nil(activityEvent) + activityEvent = event + } + + if event.GetStartChildWorkflowExecutionInitiatedEventAttributes() != nil { + ts.Nil(childWorkflowEvent) + childWorkflowEvent = event + } } ts.NotNil(timerEvent) ts.NoError(converter.GetDefaultDataConverter().FromPayload( timerEvent.UserMetadata.Summary, &str)) ts.Equal("my-timer", str) + + ts.NotNil(activityEvent) + ts.NoError(converter.GetDefaultDataConverter().FromPayload( + activityEvent.UserMetadata.Summary, &str)) + ts.Equal("my-activity", str) + + ts.NotNil(childWorkflowEvent) + fmt.Printf("childWorkflowEvent: %v\n", childWorkflowEvent.UserMetadata) + ts.NoError(converter.GetDefaultDataConverter().FromPayload( + childWorkflowEvent.UserMetadata.Summary, &str)) + ts.Equal("my-child-wf-summary", str) + ts.NoError(converter.GetDefaultDataConverter().FromPayload( + childWorkflowEvent.UserMetadata.Details, &str)) + ts.Equal("my-child-wf-details", str) } func (ts *IntegrationTestSuite) TestAwaitWithOptionsTimeout() { @@ -6602,6 +6624,32 @@ func (ts *IntegrationTestSuite) getReportedOperationCount(metricName string, ope return count } +func (ts *IntegrationTestSuite) TestSelectorBlock() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + options := ts.startWorkflowOptions("test-selector-block") + run, err := ts.client.ExecuteWorkflow(ctx, options, ts.workflows.SelectorBlockSignal) + ts.NoError(err) + var result string + ts.NoError(run.Get(ctx, &result)) + ts.Equal("hello", result) +} + +func (ts *IntegrationTestSuite) TestSelectorNoBlock() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + options := ts.startWorkflowOptions("test-selector-block") + + internal.SetUnblockSelectorSignal(true) + defer internal.SetUnblockSelectorSignal(false) + + run, err := ts.client.ExecuteWorkflow(ctx, options, ts.workflows.SelectorBlockSignal) + ts.NoError(err) + var result string + ts.NoError(run.Get(ctx, &result)) + ts.Equal("HELLO", result) +} + type coroutineCountingInterceptor struct { interceptor.WorkerInterceptorBase // Access via count() diff --git a/test/replaytests/replay_test.go b/test/replaytests/replay_test.go index 55c55e2f6..b328a9752 100644 --- a/test/replaytests/replay_test.go +++ b/test/replaytests/replay_test.go @@ -464,6 +464,26 @@ func (s *replayTestSuite) TestGogoprotoPayloadWorkflow() { s.NoError(err) } +func (s *replayTestSuite) TestSelectorBlockingDefault() { + replayer := worker.NewWorkflowReplayer() + replayer.RegisterWorkflow(SelectorBlockingDefaultWorkflow) + // Verify we can still replay an old workflow that does + // not have the SDKFlagBlockedSelectorSignalReceive flag + err := replayer.ReplayWorkflowHistoryFromJSONFile(ilog.NewDefaultLogger(), "selector-blocking-default.json") + s.NoError(err) + require.NoError(s.T(), err) +} + +func (s *replayTestSuite) TestSelectorNonBlocking() { + replayer := worker.NewWorkflowReplayer() + replayer.RegisterWorkflow(SelectorBlockingDefaultWorkflow) + // Verify we can replay the new workflow that has the + // SDKFlagBlockedSelectorSignalReceive flag + err := replayer.ReplayWorkflowHistoryFromJSONFile(ilog.NewDefaultLogger(), "selector-non-blocking.json") + s.NoError(err) + require.NoError(s.T(), err) +} + type captureConverter struct { converter.DataConverter toPayloads []interface{} diff --git a/test/replaytests/selector-blocking-default.json b/test/replaytests/selector-blocking-default.json new file mode 100644 index 000000000..07c2d0387 --- /dev/null +++ b/test/replaytests/selector-blocking-default.json @@ -0,0 +1,89 @@ +{ + "events": [ + { + "eventId": "1", + "eventTime": "2024-10-21T23:39:08.991521Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED", + "taskId": "1048587", + "workflowExecutionStartedEventAttributes": { + "workflowType": { + "name": "SelectorBlockingDefaultWorkflow" + }, + "taskQueue": { + "name": "hello-world", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "workflowExecutionTimeout": "0s", + "workflowRunTimeout": "0s", + "workflowTaskTimeout": "10s", + "originalExecutionRunId": "dde5f879-0e59-47e8-8048-ac0f164866fd", + "identity": "47182@Andrews-MacBook-Pro.local@", + "firstExecutionRunId": "dde5f879-0e59-47e8-8048-ac0f164866fd", + "attempt": 1, + "firstWorkflowTaskBackoff": "0s", + "header": {}, + "workflowId": "hello_world_workflowID" + } + }, + { + "eventId": "2", + "eventTime": "2024-10-21T23:39:08.991569Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1048588", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "hello-world", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "3", + "eventTime": "2024-10-21T23:39:08.994898Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1048593", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "2", + "identity": "47139@Andrews-MacBook-Pro.local@", + "requestId": "a7a50c99-1d0d-449c-9d75-09458ac1e7af", + "historySizeBytes": "282", + "workerVersion": { + "buildId": "e15e79cbae5f5acc33774a930eed2f97" + } + } + }, + { + "eventId": "4", + "eventTime": "2024-10-21T23:39:08.999006Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1048597", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "2", + "startedEventId": "3", + "identity": "47139@Andrews-MacBook-Pro.local@", + "workerVersion": { + "buildId": "e15e79cbae5f5acc33774a930eed2f97" + }, + "sdkMetadata": { + "langUsedFlags": [ + 3 + ], + "sdkName": "temporal-go", + "sdkVersion": "1.29.1" + }, + "meteringMetadata": {} + } + }, + { + "eventId": "5", + "eventTime": "2024-10-21T23:39:08.999055Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED", + "taskId": "1048598", + "workflowExecutionCompletedEventAttributes": { + "workflowTaskCompletedEventId": "4" + } + } + ] +} \ No newline at end of file diff --git a/test/replaytests/selector-non-blocking.json b/test/replaytests/selector-non-blocking.json new file mode 100644 index 000000000..6ae33da84 --- /dev/null +++ b/test/replaytests/selector-non-blocking.json @@ -0,0 +1,211 @@ +{ + "events": [ + { + "eventId": "1", + "eventTime": "2024-11-13T17:54:47.478632Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED", + "taskId": "1048626", + "workflowExecutionStartedEventAttributes": { + "workflowType": { + "name": "SelectorBlockingDefaultWorkflow" + }, + "taskQueue": { + "name": "hello-world", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "workflowExecutionTimeout": "2s", + "workflowRunTimeout": "2s", + "workflowTaskTimeout": "2s", + "originalExecutionRunId": "a73567ce-1a8e-4c86-9286-65e9039663a3", + "identity": "94241@Andrews-MacBook-Pro.local@", + "firstExecutionRunId": "a73567ce-1a8e-4c86-9286-65e9039663a3", + "attempt": 1, + "workflowExecutionExpirationTime": "2024-11-13T17:54:49.478Z", + "firstWorkflowTaskBackoff": "0s", + "header": {}, + "workflowId": "hello_world_workflowID" + } + }, + { + "eventId": "2", + "eventTime": "2024-11-13T17:54:47.478680Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1048627", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "hello-world", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "startToCloseTimeout": "2s", + "attempt": 1 + } + }, + { + "eventId": "3", + "eventTime": "2024-11-13T17:54:47.480740Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1048636", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "2", + "identity": "94147@Andrews-MacBook-Pro.local@", + "requestId": "9fae9b1e-4182-4f47-a675-ba4facd08273", + "historySizeBytes": "602", + "workerVersion": { + "buildId": "7e5be6238aa91ebec5dcc5b6859e87c6" + } + } + }, + { + "eventId": "4", + "eventTime": "2024-11-13T17:54:47.485146Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1048640", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "2", + "startedEventId": "3", + "identity": "94147@Andrews-MacBook-Pro.local@", + "workerVersion": { + "buildId": "7e5be6238aa91ebec5dcc5b6859e87c6" + }, + "sdkMetadata": { + "langUsedFlags": [ + 3, + 5 + ], + "sdkName": "temporal-go", + "sdkVersion": "1.29.1" + }, + "meteringMetadata": {} + } + }, + { + "eventId": "5", + "eventTime": "2024-11-13T17:54:47.485222Z", + "eventType": "EVENT_TYPE_ACTIVITY_TASK_SCHEDULED", + "taskId": "1048641", + "activityTaskScheduledEventAttributes": { + "activityId": "5", + "activityType": { + "name": "SelectorBlockingDefaultActivity" + }, + "taskQueue": { + "name": "hello-world", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "header": {}, + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IlNpZ25hbCBub3QgbG9zdCI=" + } + ] + }, + "scheduleToCloseTimeout": "2s", + "scheduleToStartTimeout": "2s", + "startToCloseTimeout": "2s", + "heartbeatTimeout": "0s", + "workflowTaskCompletedEventId": "4", + "retryPolicy": { + "initialInterval": "1s", + "backoffCoefficient": 2, + "maximumInterval": "100s" + }, + "useWorkflowBuildId": true + } + }, + { + "eventId": "6", + "eventTime": "2024-11-13T17:54:47.486704Z", + "eventType": "EVENT_TYPE_ACTIVITY_TASK_STARTED", + "taskId": "1048646", + "activityTaskStartedEventAttributes": { + "scheduledEventId": "5", + "identity": "94147@Andrews-MacBook-Pro.local@", + "requestId": "31f676df-39a4-4ef7-ad2e-fd2166139abd", + "attempt": 1, + "workerVersion": { + "buildId": "7e5be6238aa91ebec5dcc5b6859e87c6" + } + } + }, + { + "eventId": "7", + "eventTime": "2024-11-13T17:54:47.488853Z", + "eventType": "EVENT_TYPE_ACTIVITY_TASK_COMPLETED", + "taskId": "1048647", + "activityTaskCompletedEventAttributes": { + "result": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IlNpZ25hbCBub3QgbG9zdCB3YXMgbG9nZ2VkISI=" + } + ] + }, + "scheduledEventId": "5", + "startedEventId": "6", + "identity": "94147@Andrews-MacBook-Pro.local@" + } + }, + { + "eventId": "8", + "eventTime": "2024-11-13T17:54:47.488857Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1048648", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "Andrews-MacBook-Pro.local:ffbf63d9-bf89-41ab-8431-2f3d60c085c7", + "kind": "TASK_QUEUE_KIND_STICKY", + "normalName": "hello-world" + }, + "startToCloseTimeout": "2s", + "attempt": 1 + } + }, + { + "eventId": "9", + "eventTime": "2024-11-13T17:54:47.489773Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1048652", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "8", + "identity": "94147@Andrews-MacBook-Pro.local@", + "requestId": "fc1ab01a-627d-49db-a0c0-0829e9938212", + "historySizeBytes": "1417", + "workerVersion": { + "buildId": "7e5be6238aa91ebec5dcc5b6859e87c6" + } + } + }, + { + "eventId": "10", + "eventTime": "2024-11-13T17:54:47.491177Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1048656", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "8", + "startedEventId": "9", + "identity": "94147@Andrews-MacBook-Pro.local@", + "workerVersion": { + "buildId": "7e5be6238aa91ebec5dcc5b6859e87c6" + }, + "sdkMetadata": {}, + "meteringMetadata": {} + } + }, + { + "eventId": "11", + "eventTime": "2024-11-13T17:54:47.491192Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED", + "taskId": "1048657", + "workflowExecutionCompletedEventAttributes": { + "workflowTaskCompletedEventId": "10" + } + } + ] +} \ No newline at end of file diff --git a/test/replaytests/workflows.go b/test/replaytests/workflows.go index 90fbdff5c..97089844a 100644 --- a/test/replaytests/workflows.go +++ b/test/replaytests/workflows.go @@ -610,3 +610,49 @@ func ListAndDescribeWorkflow(ctx workflow.Context) (int, error) { } return len(result.Executions), nil } + +func SelectorBlockingDefaultWorkflow(ctx workflow.Context) error { + logger := workflow.GetLogger(ctx) + ao := workflow.ActivityOptions{ + StartToCloseTimeout: 10 * time.Second, + } + ctx = workflow.WithActivityOptions(ctx, ao) + + ch1 := workflow.NewChannel(ctx) + ch2 := workflow.NewChannel(ctx) + + workflow.Go(ctx, func(ctx workflow.Context) { + ch1.Send(ctx, "one") + + }) + + workflow.Go(ctx, func(ctx workflow.Context) { + ch2.Send(ctx, "two") + }) + + selector := workflow.NewSelector(ctx) + var s string + selector.AddReceive(ch1, func(c workflow.ReceiveChannel, more bool) { + c.Receive(ctx, &s) + }) + selector.AddDefault(func() { + ch2.Receive(ctx, &s) + }) + selector.Select(ctx) + if selector.HasPending() { + var result string + activity := workflow.ExecuteActivity(ctx, SelectorBlockingDefaultActivity, "Signal not lost") + activity.Get(ctx, &result) + logger.Info("Result", result) + } else { + logger.Info("Signal in ch1 lost") + return nil + } + return nil +} + +func SelectorBlockingDefaultActivity(ctx context.Context, value string) (string, error) { + logger := activity.GetLogger(ctx) + logger.Info("Activity", "value", value) + return value + " was logged!", nil +} diff --git a/test/workflow_test.go b/test/workflow_test.go index 562761d8d..9a7e929cb 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -3092,6 +3092,7 @@ func (w *Workflows) UpsertMemo(ctx workflow.Context, memo map[string]interface{} } func (w *Workflows) UserMetadata(ctx workflow.Context) error { + var activities *Activities // Define an update and query handler err := workflow.SetQueryHandlerWithOptions( ctx, @@ -3123,6 +3124,29 @@ func (w *Workflows) UserMetadata(ctx workflow.Context) error { ).Receive(ctx, nil) workflow.SetCurrentDetails(ctx, "current-details-2") + // Start an activity with a description + ao := workflow.ActivityOptions{ + StartToCloseTimeout: time.Minute, + Summary: "my-activity", + } + ctx = workflow.WithActivityOptions(ctx, ao) + var result string + err = workflow.ExecuteActivity(ctx, activities.EmptyActivity).Get(ctx, &result) + if err != nil { + return err + } + + // Start a child workflow with a description + cwo := workflow.ChildWorkflowOptions{ + StaticSummary: "my-child-wf-summary", + StaticDetails: "my-child-wf-details", + } + ctx = workflow.WithChildOptions(ctx, cwo) + err = workflow.ExecuteChildWorkflow(ctx, w.SimplestWorkflow).Get(ctx, nil) + if err != nil { + return err + } + // Run a short timer with a summary and return return workflow.NewTimerWithOptions( ctx, @@ -3174,6 +3198,45 @@ func (w *Workflows) RunsLocalAndNonlocalActsWithRetries(ctx workflow.Context, nu return nil } +func (w *Workflows) SelectorBlockSignal(ctx workflow.Context) (string, error) { + ctx = workflow.WithActivityOptions(ctx, w.defaultActivityOptions()) + var logger = workflow.GetLogger(ctx) + logger.Info("calling ExecuteActivity") + ch1 := workflow.NewChannel(ctx) + ch2 := workflow.NewChannel(ctx) + + workflow.Go(ctx, func(ctx workflow.Context) { + ch1.Send(ctx, "one") + + }) + + workflow.Go(ctx, func(ctx workflow.Context) { + ch2.Send(ctx, "two") + }) + + selector := workflow.NewSelector(ctx) + var s string + selector.AddReceive(ch1, func(c workflow.ReceiveChannel, more bool) { + c.Receive(ctx, &s) + }) + selector.AddDefault(func() { + ch2.Receive(ctx, &s) + }) + selector.Select(ctx) + + var hello = "hello" + if selector.HasPending() { + var result string + activity := workflow.ExecuteActivity(ctx, "Prefix_ToUpper", hello) + activity.Get(ctx, &result) + logger.Info("Result", result) + return result, nil + } else { + logger.Info("Signal in ch1 lost") + } + return hello, nil +} + func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.ActivityCancelRepro) worker.RegisterWorkflow(w.ActivityCompletionUsingID) @@ -3310,6 +3373,7 @@ func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.UpdateSetHandlerOnly) worker.RegisterWorkflow(w.Echo) worker.RegisterWorkflow(w.RunsLocalAndNonlocalActsWithRetries) + worker.RegisterWorkflow(w.SelectorBlockSignal) } func (w *Workflows) defaultActivityOptions() workflow.ActivityOptions { diff --git a/testsuite/testsuite.go b/testsuite/testsuite.go index eba27e666..02342e266 100644 --- a/testsuite/testsuite.go +++ b/testsuite/testsuite.go @@ -42,6 +42,9 @@ type ( // MockCallWrapper is a wrapper to mock.Call. It offers the ability to wait on workflow's clock instead of wall clock. MockCallWrapper = internal.MockCallWrapper + + // TestUpdateCallback is a basic implementation of the UpdateCallbacks interface for testing purposes. + TestUpdateCallback = internal.TestUpdateCallback ) // ErrMockStartChildWorkflowFailed is special error used to indicate the mocked child workflow should fail to start.