Skip to content

Commit

Permalink
Merge branch 'master' into nexus-complete-before-start
Browse files Browse the repository at this point in the history
  • Loading branch information
pdoerner authored Nov 27, 2024
2 parents a1cbd2d + 08d52ce commit 0866c1b
Show file tree
Hide file tree
Showing 33 changed files with 1,285 additions and 159 deletions.
14 changes: 14 additions & 0 deletions contrib/datadog/tracing/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,20 @@ func (t *tracerImpl) ContextWithSpan(ctx context.Context, span interceptor.Trace
return tracer.ContextWithSpan(ctx, span.(*tracerSpan).Span)
}

// SpanFromWorkflowContext extracts the DataDog Span object from the workflow context.
func SpanFromWorkflowContext(ctx workflow.Context) (ddtrace.Span, bool) {
val := ctx.Value(activeSpanContextKey)
if val == nil {
return tracer.SpanFromContext(nil)
}

if span, ok := val.(*tracerSpan); ok {
return span.Span, true
}

return tracer.SpanFromContext(nil)
}

func genSpanID(idempotencyKey string) uint64 {
h := fnv.New64()
// Write() always writes all bytes and never fails; the count and error result are for implementing io.Writer.
Expand Down
43 changes: 42 additions & 1 deletion contrib/datadog/tracing/interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
package tracing

import (
"errors"
"strings"
"testing"

Expand All @@ -31,6 +32,9 @@ import (

"go.temporal.io/sdk/interceptor"
"go.temporal.io/sdk/internal/interceptortest"
"go.temporal.io/sdk/testsuite"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
)

type testTracer struct {
Expand Down Expand Up @@ -133,11 +137,48 @@ func Test_OnFinishOption(t *testing.T) {
Tracer: impl,
mt: mt,
}

interceptortest.RunTestWorkflowWithError(t, trc)

spans := trc.FinishedSpans()

require.Len(t, spans, 1)
require.Equal(t, "temporal.RunWorkflow", spans[0].Name)
}

func setCustomSpanTagWorkflow(ctx workflow.Context) error {
span, ok := SpanFromWorkflowContext(ctx)

if !ok {
return errors.New("Did not find span in workflow context")
}

span.SetTag("testTag", "testValue")
return nil
}

func Test_SpanFromWorkflowContext(t *testing.T) {
// Start the mock tracer.
mt := mocktracer.Start()
defer mt.Stop()

var suite testsuite.WorkflowTestSuite
env := suite.NewTestWorkflowEnvironment()
env.RegisterWorkflow(setCustomSpanTagWorkflow)

impl := NewTracer(TracerOptions{})
testTracer := testTracer{
Tracer: impl,
mt: mt,
}

// Set tracer interceptor
env.SetWorkerOptions(worker.Options{
Interceptors: []interceptor.WorkerInterceptor{interceptor.NewTracingInterceptor(testTracer)},
})

env.ExecuteWorkflow(setCustomSpanTagWorkflow)

require.True(t, env.IsWorkflowCompleted())
testSpan := mt.FinishedSpans()[0]
require.Equal(t, "testValue", testSpan.Tag("testTag"))
}
8 changes: 8 additions & 0 deletions internal/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,14 @@ type (
// build ID or not. See temporal.VersioningIntent.
// WARNING: Worker versioning is currently experimental
VersioningIntent VersioningIntent

// Summary is a single-line summary for this activity that will appear in UI/CLI. This can be
// in single-line Temporal Markdown format.
//
// Optional: defaults to none/empty.
//
// NOTE: Experimental
Summary string
}

// LocalActivityOptions stores local activity specific parameters that will be stored inside of a context.
Expand Down
2 changes: 1 addition & 1 deletion internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -993,7 +993,7 @@ func NewServiceClient(workflowServiceClient workflowservice.WorkflowServiceClien
workerInterceptors: workerInterceptors,
excludeInternalFromRetry: options.ConnectionOptions.excludeInternalFromRetry,
eagerDispatcher: &eagerWorkflowDispatcher{
workersByTaskQueue: make(map[string][]eagerWorker),
workersByTaskQueue: make(map[string]map[eagerWorker]struct{}),
},
}

Expand Down
35 changes: 35 additions & 0 deletions internal/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,41 @@ func (e *ChildWorkflowExecutionError) Unwrap() error {
return e.cause
}

// Namespace returns namespace of the child workflow.
func (e *ChildWorkflowExecutionError) Namespace() string {
return e.namespace
}

// WorkflowId returns workflow ID of the child workflow.
func (e *ChildWorkflowExecutionError) WorkflowID() string {
return e.workflowID
}

// RunID returns run ID of the child workflow.
func (e *ChildWorkflowExecutionError) RunID() string {
return e.runID
}

// WorkflowType returns type of the child workflow.
func (e *ChildWorkflowExecutionError) WorkflowType() string {
return e.workflowType
}

// InitiatedEventID returns event ID of the child workflow initiated event.
func (e *ChildWorkflowExecutionError) InitiatedEventID() int64 {
return e.initiatedEventID
}

// StartedEventID returns event ID of the child workflow started event.
func (e *ChildWorkflowExecutionError) StartedEventID() int64 {
return e.startedEventID
}

// RetryState returns details on why child workflow failed.
func (e *ChildWorkflowExecutionError) RetryState() enumspb.RetryState {
return e.retryState
}

// Error implements the error interface.
func (e *NexusOperationError) Error() string {
msg := fmt.Sprintf(
Expand Down
2 changes: 1 addition & 1 deletion internal/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func testTimeoutErrorDetails(t *testing.T, timeoutType enumspb.TimeoutType) {
context.commandsHelper.scheduledEventIDToActivityID[5] = activityID
di := h.newActivityCommandStateMachine(
5,
&commandpb.ScheduleActivityTaskCommandAttributes{ActivityId: activityID})
&commandpb.ScheduleActivityTaskCommandAttributes{ActivityId: activityID}, nil)
di.state = commandStateInitiated
di.setData(&scheduledActivity{
callback: func(r *commonpb.Payloads, e error) {
Expand Down
1 change: 1 addition & 0 deletions internal/internal_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type (
RetryPolicy *commonpb.RetryPolicy
DisableEagerExecution bool
VersioningIntent VersioningIntent
Summary string
}

// ExecuteLocalActivityOptions options for executing a local activity
Expand Down
11 changes: 8 additions & 3 deletions internal/internal_command_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ type (

activityCommandStateMachine struct {
*commandStateMachineBase
scheduleID int64
attributes *commandpb.ScheduleActivityTaskCommandAttributes
scheduleID int64
attributes *commandpb.ScheduleActivityTaskCommandAttributes
startMetadata *sdk.UserMetadata
}

cancelActivityStateMachine struct {
Expand Down Expand Up @@ -348,12 +349,14 @@ func (h *commandsHelper) newCommandStateMachineBase(commandType commandType, id
func (h *commandsHelper) newActivityCommandStateMachine(
scheduleID int64,
attributes *commandpb.ScheduleActivityTaskCommandAttributes,
startMetadata *sdk.UserMetadata,
) *activityCommandStateMachine {
base := h.newCommandStateMachineBase(commandTypeActivity, attributes.GetActivityId())
return &activityCommandStateMachine{
commandStateMachineBase: base,
scheduleID: scheduleID,
attributes: attributes,
startMetadata: startMetadata,
}
}

Expand Down Expand Up @@ -618,6 +621,7 @@ func (d *activityCommandStateMachine) getCommand() *commandpb.Command {
case commandStateCreated, commandStateCanceledBeforeSent:
command := createNewCommand(enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK)
command.Attributes = &commandpb.Command_ScheduleActivityTaskCommandAttributes{ScheduleActivityTaskCommandAttributes: d.attributes}
command.UserMetadata = d.startMetadata
return command
default:
return nil
Expand Down Expand Up @@ -1118,9 +1122,10 @@ func (h *commandsHelper) moveCommandToBack(command commandStateMachine) {
func (h *commandsHelper) scheduleActivityTask(
scheduleID int64,
attributes *commandpb.ScheduleActivityTaskCommandAttributes,
metadata *sdk.UserMetadata,
) commandStateMachine {
h.scheduledEventIDToActivityID[scheduleID] = attributes.GetActivityId()
command := h.newActivityCommandStateMachine(scheduleID, attributes)
command := h.newActivityCommandStateMachine(scheduleID, attributes, metadata)
h.addCommand(command)
return command
}
Expand Down
12 changes: 6 additions & 6 deletions internal/internal_command_state_machine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func Test_ActivityStateMachine_CompleteWithoutCancel(t *testing.T) {

// schedule activity
scheduleID := h.getNextID()
d := h.scheduleActivityTask(scheduleID, attributes)
d := h.scheduleActivityTask(scheduleID, attributes, nil)
require.Equal(t, commandStateCreated, d.getState())
commands := h.getCommands(true)
require.Equal(t, commandStateCommandSent, d.getState())
Expand All @@ -192,7 +192,7 @@ func Test_ActivityStateMachine_CancelBeforeSent(t *testing.T) {

// schedule activity
scheduleID := h.getNextID()
d := h.scheduleActivityTask(scheduleID, attributes)
d := h.scheduleActivityTask(scheduleID, attributes, nil)
require.Equal(t, commandStateCreated, d.getState())

// Cancel before command sent. We will send the command and the cancellation.
Expand All @@ -215,7 +215,7 @@ func Test_ActivityStateMachine_CancelAfterSent(t *testing.T) {

// schedule activity
scheduleID := h.getNextID()
d := h.scheduleActivityTask(scheduleID, attributes)
d := h.scheduleActivityTask(scheduleID, attributes, nil)
require.Equal(t, commandStateCreated, d.getState())
commands := h.getCommands(true)
require.Equal(t, 1, len(commands))
Expand Down Expand Up @@ -251,7 +251,7 @@ func Test_ActivityStateMachine_CompletedAfterCancel(t *testing.T) {

// schedule activity
scheduleID := h.getNextID()
d := h.scheduleActivityTask(scheduleID, attributes)
d := h.scheduleActivityTask(scheduleID, attributes, nil)
require.Equal(t, commandStateCreated, d.getState())
commands := h.getCommands(true)
require.Equal(t, 1, len(commands))
Expand Down Expand Up @@ -287,7 +287,7 @@ func Test_ActivityStateMachine_CancelInitiated_After_CanceledBeforeSent(t *testi

// schedule activity
scheduleID := h.getNextID()
d := h.scheduleActivityTask(scheduleID, attributes)
d := h.scheduleActivityTask(scheduleID, attributes, nil)
require.Equal(t, commandStateCreated, d.getState())

// cancel activity before sent
Expand Down Expand Up @@ -324,7 +324,7 @@ func Test_ActivityStateMachine_PanicInvalidStateTransition(t *testing.T) {

// schedule activity
scheduleID := h.getNextID()
h.scheduleActivityTask(scheduleID, attributes)
h.scheduleActivityTask(scheduleID, attributes, nil)

// verify that using invalid activity id will panic
err := runAndCatchPanic(func() {
Expand Down
Loading

0 comments on commit 0866c1b

Please sign in to comment.