From a78bfbf9871742bfeeccb2c47cd99b052ce9e1cf Mon Sep 17 00:00:00 2001 From: rodrigozhou Date: Thu, 29 Aug 2024 12:56:59 -0500 Subject: [PATCH] Add Nexus links tests --- test/go.mod | 1 + test/nexus_test.go | 132 ++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 119 insertions(+), 14 deletions(-) diff --git a/test/go.mod b/test/go.mod index 6f8d3690c..336a91d7f 100644 --- a/test/go.mod +++ b/test/go.mod @@ -6,6 +6,7 @@ toolchain go1.21.1 require ( github.com/golang/mock v1.6.0 + github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.6.0 github.com/nexus-rpc/sdk-go v0.0.10 github.com/opentracing/opentracing-go v1.2.0 diff --git a/test/nexus_test.go b/test/nexus_test.go index 13bb9659e..35822f557 100644 --- a/test/nexus_test.go +++ b/test/nexus_test.go @@ -29,10 +29,10 @@ import ( "net/http" "os" "slices" - "strings" "testing" "time" + "github.com/google/go-cmp/cmp" "github.com/google/uuid" "github.com/nexus-rpc/sdk-go/nexus" "github.com/stretchr/testify/assert" @@ -42,6 +42,8 @@ import ( historypb "go.temporal.io/api/history/v1" nexuspb "go.temporal.io/api/nexus/v1" "go.temporal.io/api/operatorservice/v1" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/testing/protocmp" "go.temporal.io/sdk/client" "go.temporal.io/sdk/interceptor" @@ -76,7 +78,7 @@ func newTestContext(t *testing.T, ctx context.Context) *testContext { require.NoError(t, err) taskQueue := "sdk-go-nexus-test-tq-" + uuid.NewString() - endpoint := strings.ReplaceAll("sdk-go-nexus-test-ep-"+uuid.NewString(), "-", "_") + endpoint := "sdk-go-nexus-test-ep-" + uuid.NewString() res, err := c.OperatorService().CreateNexusEndpoint(ctx, &operatorservice.CreateNexusEndpointRequest{ Spec: &nexuspb.EndpointSpec{ Name: endpoint, @@ -155,6 +157,12 @@ func (tc *testContext) requireCounter(t *assert.CollectT, metric, service, opera })) } +func requireProtoEqual(t *testing.T, want proto.Message, got proto.Message) { + if diff := cmp.Diff(want, got, protocmp.Transform()); diff != "" { + require.Fail(t, "Proto mismatch (-want +got):\n", diff) + } +} + var syncOp = temporalnexus.NewSyncOperation("sync-op", func(ctx context.Context, c client.Client, s string, o nexus.StartOperationOptions) (string, error) { switch s { case "ok": @@ -290,10 +298,22 @@ func TestNexusWorkflowRunOperation(t *testing.T) { nc := tc.newNexusClient(t, service.Name) + link := &common.Link_WorkflowEvent{ + Namespace: tc.testConfig.Namespace, + WorkflowId: "caller-wf-id", + RunId: "caller-run-id", + Reference: &common.Link_WorkflowEvent_EventRef{ + EventRef: &common.Link_WorkflowEvent_EventReference{ + EventType: enums.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED, + }, + }, + } + workflowID := "nexus-handler-workflow-" + uuid.NewString() result, err := nexus.StartOperation(ctx, nc, workflowOp, workflowID, nexus.StartOperationOptions{ CallbackURL: "http://localhost/test", CallbackHeader: nexus.Header{"test": "ok"}, + Links: []nexus.Link{temporalnexus.ConvertLinkWorkflowEventToNexusLink(link)}, }) require.NoError(t, err) require.NotNil(t, result.Pending) @@ -308,6 +328,17 @@ func TestNexusWorkflowRunOperation(t *testing.T) { require.Equal(t, "http://localhost/test", callback.Nexus.Url) require.Equal(t, map[string]string{"test": "ok"}, callback.Nexus.Header) + iter := tc.client.GetWorkflowHistory(ctx, workflowID, "", false, enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) + for iter.HasNext() { + event, err := iter.Next() + require.NoError(t, err) + if event.GetEventType() == enums.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED { + require.Len(t, event.GetLinks(), 1) + requireProtoEqual(t, link, event.GetLinks()[0].GetWorkflowEvent()) + break + } + } + run := tc.client.GetWorkflow(ctx, workflowID, "") require.NoError(t, handle.Cancel(ctx, nexus.CancelOperationOptions{})) require.ErrorContains(t, run.Get(ctx, nil), "canceled") @@ -451,19 +482,26 @@ func TestAsyncOperationFromWorkflow(t *testing.T) { panic(fmt.Errorf("unexpected outcome: %s", action)) } } - op := temporalnexus.NewWorkflowRunOperation("op", handlerWorkflow, func(ctx context.Context, action string, soo nexus.StartOperationOptions) (client.StartWorkflowOptions, error) { - require.NotPanicsf(t, func() { - temporalnexus.GetMetricsHandler(ctx) - temporalnexus.GetLogger(ctx) - }, "Failed to get metrics handler or logger from operation context.") + handlerWfID := "" + op := temporalnexus.NewWorkflowRunOperation( + "op", + handlerWorkflow, + func(ctx context.Context, action string, soo nexus.StartOperationOptions) (client.StartWorkflowOptions, error) { + require.NotPanicsf(t, func() { + temporalnexus.GetMetricsHandler(ctx) + temporalnexus.GetLogger(ctx) + }, "Failed to get metrics handler or logger from operation context.") - if action == "fail-to-start" { - return client.StartWorkflowOptions{}, nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, "fake internal error") - } - return client.StartWorkflowOptions{ - ID: soo.RequestID, - }, nil - }) + handlerWfID = "" + if action == "fail-to-start" { + return client.StartWorkflowOptions{}, nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, "fake internal error") + } + handlerWfID = soo.RequestID + return client.StartWorkflowOptions{ + ID: soo.RequestID, + }, nil + }, + ) callerWorkflow := func(ctx workflow.Context, action string) error { c := workflow.NewNexusClient(tc.endpoint, "test") ctx, cancel := workflow.WithCancel(ctx) @@ -517,6 +555,72 @@ func TestAsyncOperationFromWorkflow(t *testing.T) { }, callerWorkflow, "succeed") require.NoError(t, err) require.NoError(t, run.Get(ctx, nil)) + + // Check the link is added in the caller workflow. + iter := tc.client.GetWorkflowHistory( + ctx, + run.GetID(), + run.GetRunID(), + false, + enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, + ) + var targetEvent *historypb.HistoryEvent + for iter.HasNext() { + event, err := iter.Next() + require.NoError(t, err) + if event.GetEventType() == enums.EVENT_TYPE_NEXUS_OPERATION_STARTED { + targetEvent = event + break + } + } + require.NotNil(t, targetEvent) + require.Len(t, targetEvent.GetLinks(), 1) + link := targetEvent.GetLinks()[0] + require.Equal(t, tc.testConfig.Namespace, link.GetWorkflowEvent().GetNamespace()) + require.Equal(t, handlerWfID, link.GetWorkflowEvent().GetWorkflowId()) + require.NotEmpty(t, link.GetWorkflowEvent().GetRunId()) + requireProtoEqual( + t, + &common.Link_WorkflowEvent_EventReference{ + EventType: enums.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, + }, + link.GetWorkflowEvent().GetEventRef()) + handlerRunID := link.GetWorkflowEvent().GetRunId() + + // Check the link is added in the handler workflow. + iter = tc.client.GetWorkflowHistory( + ctx, + handlerWfID, + handlerRunID, + false, + enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, + ) + targetEvent = nil + for iter.HasNext() { + event, err := iter.Next() + require.NoError(t, err) + if event.GetEventType() == enums.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED { + targetEvent = event + break + } + } + require.NotNil(t, targetEvent) + require.Len(t, targetEvent.GetLinks(), 1) + requireProtoEqual( + t, + &common.Link_WorkflowEvent{ + Namespace: tc.testConfig.Namespace, + WorkflowId: run.GetID(), + RunId: run.GetRunID(), + Reference: &common.Link_WorkflowEvent_EventRef{ + EventRef: &common.Link_WorkflowEvent_EventReference{ + EventId: 5, + EventType: enums.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED, + }, + }, + }, + targetEvent.GetLinks()[0].GetWorkflowEvent(), + ) }) t.Run("OpFailed", func(t *testing.T) {