Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Nexus links tests #1613

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions test/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ toolchain go1.21.1

require (
github.com/golang/mock v1.6.0
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/nexus-rpc/sdk-go v0.0.10
github.com/opentracing/opentracing-go v1.2.0
Expand Down
132 changes: 118 additions & 14 deletions test/nexus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ import (
"net/http"
"os"
"slices"
"strings"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
"github.com/nexus-rpc/sdk-go/nexus"
"github.com/stretchr/testify/assert"
Expand All @@ -42,6 +42,8 @@ import (
historypb "go.temporal.io/api/history/v1"
nexuspb "go.temporal.io/api/nexus/v1"
"go.temporal.io/api/operatorservice/v1"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/testing/protocmp"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/interceptor"
Expand Down Expand Up @@ -76,7 +78,7 @@ func newTestContext(t *testing.T, ctx context.Context) *testContext {
require.NoError(t, err)

taskQueue := "sdk-go-nexus-test-tq-" + uuid.NewString()
endpoint := strings.ReplaceAll("sdk-go-nexus-test-ep-"+uuid.NewString(), "-", "_")
endpoint := "sdk-go-nexus-test-ep-" + uuid.NewString()
res, err := c.OperatorService().CreateNexusEndpoint(ctx, &operatorservice.CreateNexusEndpointRequest{
Spec: &nexuspb.EndpointSpec{
Name: endpoint,
Expand Down Expand Up @@ -155,6 +157,12 @@ func (tc *testContext) requireCounter(t *assert.CollectT, metric, service, opera
}))
}

func requireProtoEqual(t *testing.T, want proto.Message, got proto.Message) {
if diff := cmp.Diff(want, got, protocmp.Transform()); diff != "" {
require.Fail(t, "Proto mismatch (-want +got):\n", diff)
}
}

var syncOp = temporalnexus.NewSyncOperation("sync-op", func(ctx context.Context, c client.Client, s string, o nexus.StartOperationOptions) (string, error) {
switch s {
case "ok":
Expand Down Expand Up @@ -290,10 +298,22 @@ func TestNexusWorkflowRunOperation(t *testing.T) {

nc := tc.newNexusClient(t, service.Name)

link := &common.Link_WorkflowEvent{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is redundant IMHO since we're already testing adding links from a workflow.

Namespace: tc.testConfig.Namespace,
WorkflowId: "caller-wf-id",
RunId: "caller-run-id",
Reference: &common.Link_WorkflowEvent_EventRef{
EventRef: &common.Link_WorkflowEvent_EventReference{
EventType: enums.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED,
},
},
}

workflowID := "nexus-handler-workflow-" + uuid.NewString()
result, err := nexus.StartOperation(ctx, nc, workflowOp, workflowID, nexus.StartOperationOptions{
CallbackURL: "http://localhost/test",
CallbackHeader: nexus.Header{"test": "ok"},
Links: []nexus.Link{temporalnexus.ConvertLinkWorkflowEventToNexusLink(link)},
})
require.NoError(t, err)
require.NotNil(t, result.Pending)
Expand All @@ -308,6 +328,17 @@ func TestNexusWorkflowRunOperation(t *testing.T) {
require.Equal(t, "http://localhost/test", callback.Nexus.Url)
require.Equal(t, map[string]string{"test": "ok"}, callback.Nexus.Header)

iter := tc.client.GetWorkflowHistory(ctx, workflowID, "", false, enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
for iter.HasNext() {
event, err := iter.Next()
require.NoError(t, err)
if event.GetEventType() == enums.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED {
require.Len(t, event.GetLinks(), 1)
requireProtoEqual(t, link, event.GetLinks()[0].GetWorkflowEvent())
break
}
}

run := tc.client.GetWorkflow(ctx, workflowID, "")
require.NoError(t, handle.Cancel(ctx, nexus.CancelOperationOptions{}))
require.ErrorContains(t, run.Get(ctx, nil), "canceled")
Expand Down Expand Up @@ -451,19 +482,26 @@ func TestAsyncOperationFromWorkflow(t *testing.T) {
panic(fmt.Errorf("unexpected outcome: %s", action))
}
}
op := temporalnexus.NewWorkflowRunOperation("op", handlerWorkflow, func(ctx context.Context, action string, soo nexus.StartOperationOptions) (client.StartWorkflowOptions, error) {
require.NotPanicsf(t, func() {
temporalnexus.GetMetricsHandler(ctx)
temporalnexus.GetLogger(ctx)
}, "Failed to get metrics handler or logger from operation context.")
handlerWfID := ""
op := temporalnexus.NewWorkflowRunOperation(
"op",
handlerWorkflow,
func(ctx context.Context, action string, soo nexus.StartOperationOptions) (client.StartWorkflowOptions, error) {
require.NotPanicsf(t, func() {
temporalnexus.GetMetricsHandler(ctx)
temporalnexus.GetLogger(ctx)
}, "Failed to get metrics handler or logger from operation context.")

if action == "fail-to-start" {
return client.StartWorkflowOptions{}, nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, "fake internal error")
}
return client.StartWorkflowOptions{
ID: soo.RequestID,
}, nil
})
handlerWfID = ""
if action == "fail-to-start" {
return client.StartWorkflowOptions{}, nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, "fake internal error")
}
handlerWfID = soo.RequestID
return client.StartWorkflowOptions{
ID: soo.RequestID,
}, nil
},
)
callerWorkflow := func(ctx workflow.Context, action string) error {
c := workflow.NewNexusClient(tc.endpoint, "test")
ctx, cancel := workflow.WithCancel(ctx)
Expand Down Expand Up @@ -517,6 +555,72 @@ func TestAsyncOperationFromWorkflow(t *testing.T) {
}, callerWorkflow, "succeed")
require.NoError(t, err)
require.NoError(t, run.Get(ctx, nil))

// Check the link is added in the caller workflow.
iter := tc.client.GetWorkflowHistory(
ctx,
run.GetID(),
run.GetRunID(),
false,
enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT,
)
var targetEvent *historypb.HistoryEvent
for iter.HasNext() {
event, err := iter.Next()
require.NoError(t, err)
if event.GetEventType() == enums.EVENT_TYPE_NEXUS_OPERATION_STARTED {
targetEvent = event
break
}
}
require.NotNil(t, targetEvent)
require.Len(t, targetEvent.GetLinks(), 1)
link := targetEvent.GetLinks()[0]
require.Equal(t, tc.testConfig.Namespace, link.GetWorkflowEvent().GetNamespace())
require.Equal(t, handlerWfID, link.GetWorkflowEvent().GetWorkflowId())
require.NotEmpty(t, link.GetWorkflowEvent().GetRunId())
requireProtoEqual(
t,
&common.Link_WorkflowEvent_EventReference{
EventType: enums.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
},
link.GetWorkflowEvent().GetEventRef())
handlerRunID := link.GetWorkflowEvent().GetRunId()
Comment on lines +579 to +588
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you could have just used requireProtoEqual on the entire struct.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I cannot because I don't have the handler run id.


// Check the link is added in the handler workflow.
iter = tc.client.GetWorkflowHistory(
ctx,
handlerWfID,
handlerRunID,
false,
enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT,
)
targetEvent = nil
for iter.HasNext() {
event, err := iter.Next()
require.NoError(t, err)
if event.GetEventType() == enums.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED {
targetEvent = event
break
}
}
require.NotNil(t, targetEvent)
require.Len(t, targetEvent.GetLinks(), 1)
requireProtoEqual(
t,
&common.Link_WorkflowEvent{
Namespace: tc.testConfig.Namespace,
WorkflowId: run.GetID(),
RunId: run.GetRunID(),
Reference: &common.Link_WorkflowEvent_EventRef{
EventRef: &common.Link_WorkflowEvent_EventReference{
EventId: 5,
EventType: enums.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED,
},
},
},
targetEvent.GetLinks()[0].GetWorkflowEvent(),
)
})

t.Run("OpFailed", func(t *testing.T) {
Expand Down
Loading