From 87e4b73d034736577fbc457fb553d8fbce8e35d4 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Tue, 10 Dec 2024 22:55:31 -0800 Subject: [PATCH] Refactored old integration tests to use integration testing helpers. Extended integration testing helpers to allow for configurable test contexts. --- packages/test/src/helpers-integration.ts | 110 +- packages/test/src/helpers.ts | 1 + packages/test/src/integration-tests-new.ts | 1371 ------------- packages/test/src/test-integration-new.ts | 1759 +++++++++++++++++ packages/test/src/test-integration-old.ts | 13 - packages/test/src/test-local-activities.ts | 27 +- .../test/src/workflows/activity-failure.ts | 22 - .../test/src/workflows/args-and-return.ts | 10 - .../test/src/workflows/async-fail-signal.ts | 11 - .../test/src/workflows/cancel-http-request.ts | 33 - .../src/workflows/cancel-timer-immediately.ts | 24 - .../src/workflows/cancel-timer-with-delay.ts | 16 - .../src/workflows/child-workflow-failure.ts | 11 - .../src/workflows/child-workflow-invoke.ts | 18 - .../workflows/child-workflow-start-fail.ts | 28 - .../workflows/child-workflow-termination.ts | 19 - .../src/workflows/child-workflow-timeout.ts | 14 - .../test/src/workflows/deprecate-patch.ts | 6 - packages/test/src/workflows/fail-signal.ts | 10 - packages/test/src/workflows/http.ts | 10 - packages/test/src/workflows/index.ts | 1 + packages/test/src/workflows/patched.ts | 15 - .../test/src/workflows/query-and-condition.ts | 10 - .../src/workflows/return-search-attributes.ts | 12 - .../src/workflows/return-workflow-info.ts | 5 - .../src/workflows/sleep-invalid-duration.ts | 6 - packages/test/src/workflows/success-string.ts | 1 + packages/test/src/workflows/throw-big-int.ts | 3 - packages/test/src/workflows/throw-object.ts | 3 - packages/test/src/workflows/url-whatwg.ts | 9 - 30 files changed, 1816 insertions(+), 1762 deletions(-) delete mode 100644 packages/test/src/integration-tests-new.ts create mode 100644 packages/test/src/test-integration-new.ts delete mode 100644 packages/test/src/test-integration-old.ts delete mode 100644 packages/test/src/workflows/activity-failure.ts delete mode 100644 packages/test/src/workflows/args-and-return.ts delete mode 100644 packages/test/src/workflows/async-fail-signal.ts delete mode 100644 packages/test/src/workflows/cancel-http-request.ts delete mode 100644 packages/test/src/workflows/cancel-timer-immediately.ts delete mode 100644 packages/test/src/workflows/cancel-timer-with-delay.ts delete mode 100644 packages/test/src/workflows/child-workflow-failure.ts delete mode 100644 packages/test/src/workflows/child-workflow-invoke.ts delete mode 100644 packages/test/src/workflows/child-workflow-start-fail.ts delete mode 100644 packages/test/src/workflows/child-workflow-termination.ts delete mode 100644 packages/test/src/workflows/child-workflow-timeout.ts delete mode 100644 packages/test/src/workflows/deprecate-patch.ts delete mode 100644 packages/test/src/workflows/fail-signal.ts delete mode 100644 packages/test/src/workflows/http.ts delete mode 100644 packages/test/src/workflows/patched.ts delete mode 100644 packages/test/src/workflows/query-and-condition.ts delete mode 100644 packages/test/src/workflows/return-search-attributes.ts delete mode 100644 packages/test/src/workflows/return-workflow-info.ts delete mode 100644 packages/test/src/workflows/sleep-invalid-duration.ts delete mode 100644 packages/test/src/workflows/throw-big-int.ts delete mode 100644 packages/test/src/workflows/throw-object.ts delete mode 100644 packages/test/src/workflows/url-whatwg.ts diff --git a/packages/test/src/helpers-integration.ts b/packages/test/src/helpers-integration.ts index ea5dfed45..e34aed998 100644 --- a/packages/test/src/helpers-integration.ts +++ b/packages/test/src/helpers-integration.ts @@ -36,13 +36,8 @@ import { registerDefaultCustomSearchAttributes, } from './helpers'; -export interface Context { - env: T; - workflowBundle: WorkflowBundle; -} - -export interface MultiEnvContext { - envs: T[]; +export interface Context { + env: TestWorkflowEnvironment; workflowBundle: WorkflowBundle; } @@ -58,16 +53,14 @@ const defaultDynamicConfigOptions = [ 'worker.removableBuildIdDurationSinceDefault=1', ]; -function setupRuntime( - recordedLogs?: { [workflowId: string]: LogEntry[] } -) { +function setupRuntime(recordedLogs?: { [workflowId: string]: LogEntry[] }) { const logger = recordedLogs - ? new DefaultLogger('DEBUG', (entry) => { - const workflowId = (entry.meta as any)?.workflowInfo?.workflowId ?? (entry.meta as any)?.workflowId; - recordedLogs![workflowId] ??= []; - recordedLogs![workflowId].push(entry); - }) - : new DefaultLogger((process.env.TEST_LOG_LEVEL || 'DEBUG').toUpperCase() as LogLevel); + ? new DefaultLogger('DEBUG', (entry) => { + const workflowId = (entry.meta as any)?.workflowInfo?.workflowId ?? (entry.meta as any)?.workflowId; + recordedLogs![workflowId] ??= []; + recordedLogs![workflowId].push(entry); + }) + : new DefaultLogger((process.env.TEST_LOG_LEVEL || 'DEBUG').toUpperCase() as LogLevel); Runtime.install({ logger, telemetryOptions: { @@ -80,19 +73,21 @@ function setupRuntime( }); } -export async function createDefaultTestWorkflowBundle( +export async function createTestWorkflowBundle( workflowsPath: string, - workflowInterceptorModules?: string[], + workflowInterceptorModules?: string[] ): Promise { return await bundleWorkflowCode({ ...bundlerOptions, workflowInterceptorModules: [...defaultWorkflowInterceptorModules, ...(workflowInterceptorModules ?? [])], - workflowsPath: workflowsPath, + workflowsPath, logger: new DefaultLogger('WARN'), }); } -export async function createDefaultTestEnvironment(opts?: LocalTestWorkflowEnvironmentOptions): Promise { +export async function createTestEnvironment( + opts?: LocalTestWorkflowEnvironmentOptions +): Promise { return await TestWorkflowEnvironment.createLocal({ ...(opts || {}), // Use provided options or default to an empty object server: { @@ -105,36 +100,18 @@ export async function createDefaultTestEnvironment(opts?: LocalTestWorkflowEnvir }); } -export function makeMultiEnvironmentTest( - opts: { - workflowsPath: string; - multiWorkflowEnvironmentOpts: LocalTestWorkflowEnvironmentOptions[]; - workflowInterceptorModules?: string[]; - recordedLogs?: { [workflowId: string]: LogEntry[] }; - configureEnv: (opt: LocalTestWorkflowEnvironmentOptions) => T, - } -): TestFn> { - if (opts.multiWorkflowEnvironmentOpts.length < 1) { - throw new Error("must have at least 1 workflow environment options"); - } - - let test = anyTest as TestFn>; +export function makeConfigurableEnvironmentTest(opts: { + recordedLogs?: { [workflowId: string]: LogEntry[] }; + createTestContext: (t: ExecutionContext) => Promise; + teardown: (t: T) => Promise; +}): TestFn { + const test = anyTest as TestFn; test.before(async (t) => { setupRuntime(opts.recordedLogs); - t.context = { - workflowBundle: await createDefaultTestWorkflowBundle(opts.workflowsPath, opts.workflowInterceptorModules), - envs: opts.multiWorkflowEnvironmentOpts.map(workflowEnvironmentOptions => opts.configureEnv(workflowEnvironmentOptions)), - } - for (const env of t.context.envs) { - // TODO(thomas): do environments create separate connections? - await registerDefaultCustomSearchAttributes(env.connection); - } + t.context = await opts.createTestContext(t); }); test.after.always(async (t) => { - for (const env of t.context.envs) { - // TODO(thomas): reused state across test envs will cause this to break I think - await env.teardown(); - } + await opts.teardown(t.context); }); return test; } @@ -144,20 +121,21 @@ export function makeTestFunction(opts: { workflowEnvironmentOpts?: LocalTestWorkflowEnvironmentOptions; workflowInterceptorModules?: string[]; recordedLogs?: { [workflowId: string]: LogEntry[] }; -}): TestFn> { - let test = anyTest as TestFn>; - test.before(async (t) => { - setupRuntime(opts.recordedLogs); - t.context = { - workflowBundle: await createDefaultTestWorkflowBundle(opts.workflowsPath, opts.workflowInterceptorModules), - env: await createDefaultTestEnvironment(opts.workflowEnvironmentOpts), - } - await registerDefaultCustomSearchAttributes(t.context.env.connection); - }); - test.after.always(async (t) => { - await t.context.env.teardown(); +}): TestFn { + return makeConfigurableEnvironmentTest({ + recordedLogs: opts.recordedLogs, + createTestContext: async (_t: ExecutionContext): Promise => { + const env = await createTestEnvironment(opts.workflowEnvironmentOpts); + await registerDefaultCustomSearchAttributes(env.connection); + return { + workflowBundle: await createTestWorkflowBundle(opts.workflowsPath, opts.workflowInterceptorModules), + env, + }; + }, + teardown: async (c: Context) => { + await c.env.teardown(); + }, }); - return test; } export interface Helpers { @@ -179,7 +157,11 @@ export interface Helpers { updateHasBeenAdmitted(handle: WorkflowHandle, updateId: string): Promise; } -export function helpers(t: ExecutionContext, testEnv: TestWorkflowEnvironment = t.context.env): Helpers { +export function configurableHelpers( + t: ExecutionContext, + workflowBundle: WorkflowBundle, + testEnv: TestWorkflowEnvironment +): Helpers { const taskQueue = t.title.replace(/ /g, '_'); return { @@ -187,7 +169,7 @@ export function helpers(t: ExecutionContext, testEnv: TestWorkflowEnvir async createWorker(opts?: Partial): Promise { return await Worker.create({ connection: testEnv.nativeConnection, - workflowBundle: t.context.workflowBundle, + workflowBundle, taskQueue, interceptors: { activity: [() => ({ inbound: new ConnectionInjectorInterceptor(testEnv.connection) })], @@ -202,7 +184,7 @@ export function helpers(t: ExecutionContext, testEnv: TestWorkflowEnvir ): Promise { await Worker.runReplayHistory( { - workflowBundle: t.context.workflowBundle, + workflowBundle, ...opts, }, history @@ -273,3 +255,7 @@ export function helpers(t: ExecutionContext, testEnv: TestWorkflowEnvir }, }; } + +export function helpers(t: ExecutionContext, testEnv: TestWorkflowEnvironment = t.context.env): Helpers { + return configurableHelpers(t, t.context.workflowBundle, testEnv); +} diff --git a/packages/test/src/helpers.ts b/packages/test/src/helpers.ts index f22507ce3..1a0b0ae66 100644 --- a/packages/test/src/helpers.ts +++ b/packages/test/src/helpers.ts @@ -131,6 +131,7 @@ export const bundlerOptions = { 'uuid', 'net', 'fs/promises', + require.resolve('./activities'), ], }; diff --git a/packages/test/src/integration-tests-new.ts b/packages/test/src/integration-tests-new.ts deleted file mode 100644 index 4c025264c..000000000 --- a/packages/test/src/integration-tests-new.ts +++ /dev/null @@ -1,1371 +0,0 @@ -/* eslint @typescript-eslint/no-non-null-assertion: 0 */ - -/** - * This file has been given the suffix -old because it uses an older style of - * integration testing. New code should follow the style of integration tests in - * the files without this suffix. - */ -import asyncRetry from 'async-retry'; -import { v4 as uuid4 } from 'uuid'; -import * as iface from '@temporalio/proto'; -import { cleanOptionalStackTrace, u8, Worker } from './helpers'; -import * as activities from './activities'; -import * as workflows from './workflows'; -import { Context, helpers, makeTestFunction } from './helpers-integration'; -import { withZeroesHTTPServer } from './zeroes-http-server'; -import { ActivityFailure, ApplicationFailure, ChildWorkflowFailure, QueryNotRegisteredError, WorkflowContinuedAsNewError, WorkflowFailedError } from '@temporalio/client'; -import dedent from 'dedent'; -import { defaultFailureConverter, defaultPayloadConverter, Payload, PayloadCodec, RetryState, searchAttributePayloadConverter, TerminatedFailure, TimeoutFailure, TimeoutType, WorkflowExecution, WorkflowExecutionAlreadyStartedError, WorkflowNotFoundError } from '@temporalio/common'; -import { msToNumber, tsToMs } from '@temporalio/common/lib/time'; -import pkg from '@temporalio/worker/lib/pkg'; -import { UnsafeWorkflowInfo } from '@temporalio/workflow/lib/interfaces'; -import { ExecutionContext, Implementation } from 'ava'; -import { decode, decodeFromPayloadsAtIndex } from '@temporalio/common/lib/internal-non-workflow'; -import { WorkerOptions } from '../../worker/lib/worker-options'; -import { ConnectionInjectorInterceptor } from './activities/interceptors'; - -const { EVENT_TYPE_TIMER_STARTED, EVENT_TYPE_TIMER_FIRED, EVENT_TYPE_TIMER_CANCELED } = - iface.temporal.api.enums.v1.EventType; - -const timerEventTypes = new Set([EVENT_TYPE_TIMER_STARTED, EVENT_TYPE_TIMER_FIRED, EVENT_TYPE_TIMER_CANCELED]); -const CHANGE_MARKER_NAME = 'core_patch'; - -export function runIntegrationTests(codec?: PayloadCodec): void { - const dataConverter = { payloadCodecs: codec ? [codec] : [] }; - const loadedDataConverter = { - payloadConverter: defaultPayloadConverter, - payloadCodecs: codec ? [codec] : [], - failureConverter: defaultFailureConverter, - }; - - async function fromPayload(payload: Payload) { - const [decodedPayload] = await decode(dataConverter.payloadCodecs, [payload]); - return defaultPayloadConverter.fromPayload(decodedPayload); - } - - const _test = makeTestFunction({ - workflowsPath: require.resolve('./workflows'), - workflowEnvironmentOpts: { - client: { - dataConverter, - } - } - }); - - const createWorkerWithDefaults = function(t: ExecutionContext, opts?: Partial): Promise { - return helpers(t).createWorker({ - dataConverter, - interceptors: { - activity: [() => ({ inbound: new ConnectionInjectorInterceptor(t.context.env.connection, loadedDataConverter) })], - }, - ...opts - }); - } - - const test = (name: string, fn: Implementation<[], Context>) => _test(codec ? 'With codecβ€”' + name : name, fn); - - test('Workflow not found results in task retry', async (t) => { - const { taskQueue } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const client = t.context.env.client; - const handle = await client.workflow.start('not-found', { - taskQueue, - workflowId: uuid4(), - }); - - await worker.runUntil(async () => { - await asyncRetry( - async () => { - const history = await handle.fetchHistory(); - if ( - !history?.events?.some( - ({ workflowTaskFailedEventAttributes }) => - workflowTaskFailedEventAttributes?.failure?.message === - "Failed to initialize workflow of type 'not-found': no such function is exported by the workflow bundle" - ) - ) { - throw new Error('Cannot find workflow task failed event'); - } - }, - { - retries: 60, - maxTimeout: 1000, - } - ); - }); - - t.pass(); - }); - - test('args-and-return', async (t) => { - const { executeWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const res = await worker.runUntil(executeWorkflow(workflows.argsAndReturn, { - args: ['Hello', undefined, u8('world!')], - })); - t.is(res, 'Hello, world!'); - }); - - test('url-whatwg', async (t) => { - const { executeWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const res = await worker.runUntil( executeWorkflow(workflows.urlEcho, { - args: ['http://foo.com'], - })); - t.is(res, 'http://foo.com/?counter=1'); - }); - - test('cancel-fake-progress', async (t) => { - const { executeWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t, { activities }); - await worker.runUntil(executeWorkflow(workflows.cancelFakeProgress)); - t.pass(); - }); - - test('cancel-http-request', async (t) => { - const { executeWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t, { activities }); - await withZeroesHTTPServer(async (port) => { - const url = `http://127.0.0.1:${port}`; - await worker.runUntil(executeWorkflow(workflows.cancellableHTTPRequest, { - args: [url], - })); - }); - t.pass(); - }); - - test('activity-failure with Error', async (t) => { - const { executeWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t, { activities }); - const err: WorkflowFailedError | undefined = await t.throwsAsync( - worker.runUntil(executeWorkflow(workflows.activityFailure, { - args: [{ useApplicationFailure: false }], - })), - { - instanceOf: WorkflowFailedError, - } - ); - t.is(err?.message, 'Workflow execution failed'); - if (!(err?.cause instanceof ActivityFailure)) { - t.fail('Expected err.cause to be an instance of ActivityFailure'); - return; - } - if (!(err.cause.cause instanceof ApplicationFailure)) { - t.fail('Expected err.cause.cause to be an instance of ApplicationFailure'); - return; - } - t.is(err.cause.cause.message, 'Fail me'); - t.is( - cleanOptionalStackTrace(err.cause.cause.stack), - dedent` - Error: Fail me - at throwAnError (test/src/activities/index.ts) - at ConnectionInjectorInterceptor.execute (test/src/activities/interceptors.ts) - ` - ); - }); - - test('activity-failure with ApplicationFailure', async (t) => { - const { executeWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t, { activities }); - const err: WorkflowFailedError | undefined = await t.throwsAsync( - worker.runUntil(executeWorkflow(workflows.activityFailure, { - args: [{ useApplicationFailure: true }], - })), - { - instanceOf: WorkflowFailedError, - } - ); - t.is(err?.message, 'Workflow execution failed'); - if (!(err?.cause instanceof ActivityFailure)) { - t.fail('Expected err.cause to be an instance of ActivityFailure'); - return; - } - if (!(err.cause.cause instanceof ApplicationFailure)) { - t.fail('Expected err.cause.cause to be an instance of ApplicationFailure'); - return; - } - t.is(err.cause.cause.message, 'Fail me'); - t.is(err.cause.cause.type, 'Error'); - t.deepEqual(err.cause.cause.details, ['details', 123, false]); - t.is( - cleanOptionalStackTrace(err.cause.cause.stack), - dedent` - ApplicationFailure: Fail me - at Function.nonRetryable (common/src/failure.ts) - at throwAnError (test/src/activities/index.ts) - at ConnectionInjectorInterceptor.execute (test/src/activities/interceptors.ts) - ` - ); - }); - - test('child-workflow-invoke', async (t) => { - const { startWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const handle = await startWorkflow(workflows.childWorkflowInvoke); - const { workflowId, runId, execResult, result } = await worker.runUntil(handle.result()); - t.is(execResult, 'success'); - t.is(result, 'success'); - const client = t.context.env.client; - const child = client.workflow.getHandle(workflowId, runId); - t.is(await child.result(), 'success'); - }); - - test('child-workflow-failure', async (t) => { - const { executeWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - await worker.runUntil(async () => { - const err: WorkflowFailedError | undefined = await t.throwsAsync( - executeWorkflow(workflows.childWorkflowFailure), - { - instanceOf: WorkflowFailedError, - } - ); - - if (!(err?.cause instanceof ChildWorkflowFailure)) { - return t.fail('Expected err.cause to be an instance of ChildWorkflowFailure'); - } - if (!(err.cause.cause instanceof ApplicationFailure)) { - return t.fail('Expected err.cause.cause to be an instance of ApplicationFailure'); - } - t.is(err.cause.cause.message, 'failure'); - t.is( - cleanOptionalStackTrace(err.cause.cause.stack), - dedent` - ApplicationFailure: failure - at Function.nonRetryable (common/src/failure.ts) - at throwAsync (test/src/workflows/throw-async.ts) - ` - ); - }); - }); - - test('child-workflow-termination', async (t) => { - const { startWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const handle = await startWorkflow(workflows.childWorkflowTermination); - const client = t.context.env.client; - - let childExecution: WorkflowExecution | undefined = undefined; - - await worker.runUntil(async () => { - const err: WorkflowFailedError | undefined = await t.throwsAsync(async () => { - while (childExecution === undefined) { - childExecution = await handle.query(workflows.childExecutionQuery); - } - const child = client.workflow.getHandle(childExecution.workflowId!, childExecution.runId!); - await child.terminate(); - await handle.result(); - },{ - instanceOf: WorkflowFailedError, - } - ); - - if (!(err?.cause instanceof ChildWorkflowFailure)) { - return t.fail('Expected err.cause to be an instance of ChildWorkflowFailure'); - } - t.is(err.cause.retryState, RetryState.NON_RETRYABLE_FAILURE); - if (!(err.cause.cause instanceof TerminatedFailure)) { - return t.fail('Expected err.cause.cause to be an instance of TerminatedFailure'); - } - }) - }); - - test('child-workflow-timeout', async (t) => { - const { executeWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const err: WorkflowFailedError | undefined = await t.throwsAsync( - worker.runUntil(executeWorkflow(workflows.childWorkflowTimeout)), - { - instanceOf: WorkflowFailedError, - } - ); - - if (!(err?.cause instanceof ChildWorkflowFailure)) { - return t.fail('Expected err.cause to be an instance of ChildWorkflowFailure'); - } - t.is(err.cause.retryState, RetryState.TIMEOUT); - if (!(err.cause.cause instanceof TimeoutFailure)) { - return t.fail('Expected err.cause.cause to be an instance of TimeoutFailure'); - } - t.is(err.cause.cause.timeoutType, TimeoutType.START_TO_CLOSE); - }); - - test('child-workflow-start-fail', async (t) => { - const { executeWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - await worker.runUntil(executeWorkflow(workflows.childWorkflowStartFail)); - // Assertions in workflow code - t.pass(); - }); - - test('child-workflow-cancel', async (t) => { - const { executeWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - await worker.runUntil(executeWorkflow(workflows.childWorkflowCancel)); - // Assertions in workflow code - t.pass(); - }); - - test('child-workflow-signals', async (t) => { - const { executeWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - await worker.runUntil(executeWorkflow(workflows.childWorkflowSignals)); - // Assertions in workflow code - t.pass(); - }); - - test('query not found', async (t) => { - const { startWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const handle = await startWorkflow(workflows.unblockOrCancel); - await worker.runUntil(async () => { - await handle.signal(workflows.unblockSignal); - await handle.result(); - await t.throwsAsync(handle.query('not found'), { - instanceOf: QueryNotRegisteredError, - message: - 'Workflow did not register a handler for not found. Registered queries: [__stack_trace __enhanced_stack_trace __temporal_workflow_metadata isBlocked]', - }); - }) - }); - - test('query and unblock', async (t) => { - const { startWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const handle = await startWorkflow(workflows.unblockOrCancel); - await worker.runUntil(async () => { - t.true(await handle.query(workflows.isBlockedQuery)); - await handle.signal(workflows.unblockSignal); - await handle.result(); - t.false(await handle.query(workflows.isBlockedQuery)); - }) - }); - - test('interrupt-signal', async (t) => { - const { startWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const handle = await startWorkflow(workflows.interruptableWorkflow); - await worker.runUntil(async () => { - await handle.signal(workflows.interruptSignal, 'just because'); - const err: WorkflowFailedError | undefined = await t.throwsAsync(handle.result(), { - instanceOf: WorkflowFailedError, - }); - if (!(err?.cause instanceof ApplicationFailure)) { - return t.fail('Expected err.cause to be an instance of ApplicationFailure'); - } - t.is(err.cause.message, 'just because'); - }); - }); - - test('fail-signal', async (t) => { - const { startWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const handle = await startWorkflow(workflows.failSignalWorkflow); - await worker.runUntil(async () => { - await handle.signal(workflows.failSignal); - const err: WorkflowFailedError | undefined = await t.throwsAsync(handle.result(), { - instanceOf: WorkflowFailedError, - }); - if (!(err?.cause instanceof ApplicationFailure)) { - return t.fail('Expected err.cause to be an instance of ApplicationFailure'); - } - t.is(err.cause.message, 'Signal failed'); - }); - }); - - test('async-fail-signal', async (t) => { - const { startWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const handle = await startWorkflow(workflows.asyncFailSignalWorkflow); - await handle.signal(workflows.failSignal); - await worker.runUntil(async () => { - const err: WorkflowFailedError | undefined = await t.throwsAsync(handle.result(), { - instanceOf: WorkflowFailedError, - }); - if (!(err?.cause instanceof ApplicationFailure)) { - return t.fail('Expected err.cause to be an instance of ApplicationFailure'); - } - t.is(err.cause.message, 'Signal failed'); - }); - }); - - test('http', async (t) => { - const { executeWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t, { activities }); - const res = await worker.runUntil(executeWorkflow(workflows.http)); - t.deepEqual(res, await activities.httpGet('https://temporal.io')); - }); - - test('sleep', async (t) => { - const { startWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const handle = await startWorkflow(workflows.sleeper); - const res = await worker.runUntil(handle.result()); - t.is(res, undefined); - const history = await handle.fetchHistory(); - const timerEvents = history.events!.filter(({ eventType }) => timerEventTypes.has(eventType!)); - t.is(timerEvents.length, 2); - t.is(timerEvents[0].timerStartedEventAttributes!.timerId, '1'); - t.is(tsToMs(timerEvents[0].timerStartedEventAttributes!.startToFireTimeout), 100); - t.is(timerEvents[1].timerFiredEventAttributes!.timerId, '1'); - }); - - test('cancel-timer-immediately', async (t) => { - const { startWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const handle = await startWorkflow(workflows.cancelTimer); - const res = await worker.runUntil(handle.result()); - t.is(res, undefined); - const history = await handle.fetchHistory(); - const timerEvents = history.events!.filter(({ eventType }) => timerEventTypes.has(eventType!)); - // Timer is cancelled before it is scheduled - t.is(timerEvents.length, 0); - }); - - test('cancel-timer-with-delay', async (t) => { - const { startWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const handle = await startWorkflow(workflows.cancelTimerWithDelay); - const res = await worker.runUntil(handle.result()); - t.is(res, undefined); - const history = await handle.fetchHistory(); - const timerEvents = history.events!.filter(({ eventType }) => timerEventTypes.has(eventType!)); - t.is(timerEvents.length, 4); - t.is(timerEvents[0].timerStartedEventAttributes!.timerId, '1'); - t.is(tsToMs(timerEvents[0].timerStartedEventAttributes!.startToFireTimeout), 10000); - t.is(timerEvents[1].timerStartedEventAttributes!.timerId, '2'); - t.is(tsToMs(timerEvents[1].timerStartedEventAttributes!.startToFireTimeout), 1); - t.is(timerEvents[2].timerFiredEventAttributes!.timerId, '2'); - t.is(timerEvents[3].timerCanceledEventAttributes!.timerId, '1'); - }); - - test('patched', async (t) => { - const { startWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const handle = await startWorkflow(workflows.patchedWorkflow); - const res = await worker.runUntil(handle.result()); - t.is(res, undefined); - const history = await handle.fetchHistory(); - const hasChangeEvents = history.events!.filter( - ({ eventType }) => eventType === iface.temporal.api.enums.v1.EventType.EVENT_TYPE_MARKER_RECORDED - ); - // There will only be one marker despite there being 2 hasChange calls because they have the - // same ID and core will only record one marker per id. - t.is(hasChangeEvents.length, 1); - t.is(hasChangeEvents[0].markerRecordedEventAttributes!.markerName, CHANGE_MARKER_NAME); - }); - - test('deprecate-patch', async (t) => { - const { startWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const handle = await startWorkflow(workflows.deprecatePatchWorkflow); - const res = await worker.runUntil(handle.result()); - t.is(res, undefined); - const history = await handle.fetchHistory(); - const hasChangeEvents = history.events!.filter( - ({ eventType }) => eventType === iface.temporal.api.enums.v1.EventType.EVENT_TYPE_MARKER_RECORDED - ); - t.is(hasChangeEvents.length, 1); - t.is(hasChangeEvents[0].markerRecordedEventAttributes!.markerName, CHANGE_MARKER_NAME); - }); - - test('Worker default ServerOptions are generated correctly', async (t) => { - const { startWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const handle = await startWorkflow(workflows.argsAndReturn, { - args: ['hey', undefined, Buffer.from('abc')], - }); - await worker.runUntil(handle.result()); - const history = await handle.fetchHistory(); - const events = history.events!.filter( - ({ eventType }) => eventType === iface.temporal.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_TASK_COMPLETED - ); - t.is(events.length, 1); - const [event] = events; - t.regex(event.workflowTaskCompletedEventAttributes!.identity!, /\d+@.+/); - let binid = event.workflowTaskCompletedEventAttributes!.binaryChecksum!; - if (binid === '') { - binid = event.workflowTaskCompletedEventAttributes!.workerVersion!.buildId!; - } - t.regex(binid, /@temporalio\/worker@\d+\.\d+\.\d+/); - }); - - test('WorkflowHandle.describe result is wrapped', async (t) => { - const date = new Date(); - const { startWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const handle = await startWorkflow(workflows.argsAndReturn, { - args: ['hey', undefined, Buffer.from('abc')], - searchAttributes: { - CustomKeywordField: ['test-value'], - CustomIntField: [1], - CustomDatetimeField: [date], - }, - memo: { - note: 'foo', - }, - }); - await worker.runUntil(handle.result()); - const execution = await handle.describe(); - t.deepEqual(execution.type, 'argsAndReturn'); - t.deepEqual(execution.memo, { note: 'foo' }); - t.true(execution.startTime instanceof Date); - t.deepEqual(execution.searchAttributes!.CustomKeywordField, ['test-value']); - t.deepEqual(execution.searchAttributes!.CustomIntField, [1]); - t.deepEqual(execution.searchAttributes!.CustomDatetimeField, [date]); - const binSum = execution.searchAttributes!.BinaryChecksums as string[]; - if (binSum != null) { - t.regex(binSum[0], /@temporalio\/worker@/); - } else { - t.deepEqual(execution.searchAttributes!.BuildIds, [ - 'unversioned', - `unversioned:${worker.options.buildId}`, - ]); - } - }); - - test('Workflow can read Search Attributes set at start', async (t) => { - const date = new Date(); - const { startWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const handle = await startWorkflow(workflows.returnSearchAttributes, { - searchAttributes: { - CustomKeywordField: ['test-value'], - CustomIntField: [1], - CustomDatetimeField: [date], - }, - }); - const res = await worker.runUntil(handle.result()); - t.deepEqual(res, { - CustomKeywordField: ['test-value'], - CustomIntField: [1], - CustomDatetimeField: [date.toISOString()], - datetimeInstanceofWorks: [true], - arrayInstanceofWorks: [true], - datetimeType: ['Date'], - }); - }); - - test('Workflow can upsert Search Attributes', async (t) => { - const date = new Date(); - const { startWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const handle = await startWorkflow(workflows.upsertAndReadSearchAttributes, { - args: [date.getTime()], - }); - const res = await worker.runUntil(handle.result()); - t.deepEqual(res, { - CustomBoolField: [true], - CustomIntField: [], // clear - CustomKeywordField: ['durable code'], - CustomTextField: ['is useful'], - CustomDatetimeField: [date.toISOString()], - CustomDoubleField: [3.14], - }); - const { searchAttributes } = await handle.describe(); - const { BinaryChecksums, BuildIds, ...rest } = searchAttributes; - t.deepEqual(rest, { - CustomBoolField: [true], - CustomKeywordField: ['durable code'], - CustomTextField: ['is useful'], - CustomDatetimeField: [date], - CustomDoubleField: [3.14], - }); - let checksum: any; - if (BinaryChecksums != null) { - t.true(BinaryChecksums.length === 1); - checksum = BinaryChecksums[0]; - } else { - t.true(BuildIds!.length === 2); - t.deepEqual(BuildIds![0], 'unversioned'); - checksum = BuildIds![1]; - } - t.true( - typeof checksum === 'string' && - checksum.includes(`@temporalio/worker@${pkg.version}+`) && - /\+[a-f0-9]{64}$/.test(checksum) // bundle checksum - ); - }); - - test('Workflow can read WorkflowInfo', async (t) => { - const { startWorkflow, taskQueue } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const handle = await startWorkflow(workflows.returnWorkflowInfo, { - memo: { - nested: { object: true }, - }, - }); - const res = await worker.runUntil(handle.result()); - t.assert(res.historySize > 100); - t.deepEqual(res, { - memo: { - nested: { object: true }, - }, - attempt: 1, - firstExecutionRunId: handle.firstExecutionRunId, - namespace: 'default', - taskTimeoutMs: 10_000, - runId: handle.firstExecutionRunId, - taskQueue, - searchAttributes: {}, - workflowType: 'returnWorkflowInfo', - workflowId: handle.workflowId, - historyLength: 3, - continueAsNewSuggested: false, - // values ignored for the purpose of comparison - historySize: res.historySize, - startTime: res.startTime, - runStartTime: res.runStartTime, - currentBuildId: res.currentBuildId, - // unsafe.now is a function, so doesn't make it through serialization, but .now is required, so we need to cast - unsafe: { isReplaying: false } as UnsafeWorkflowInfo, - }); - }); - - test('WorkflowOptions are passed correctly with defaults', async (t) => { - const { startWorkflow, taskQueue } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const handle = await startWorkflow(workflows.argsAndReturn, { - args: ['hey', undefined, Buffer.from('def')], - }); - await worker.runUntil(handle.result()); - const execution = await handle.describe(); - t.deepEqual(execution.type, 'argsAndReturn'); - const indexedFields = execution.raw.workflowExecutionInfo!.searchAttributes!.indexedFields!; - const indexedFieldKeys = Object.keys(indexedFields); - - let encodedId: any; - if (indexedFieldKeys.includes('BinaryChecksums')) { - encodedId = indexedFields.BinaryChecksums!; - } else { - encodedId = indexedFields.BuildIds!; - } - t.true(encodedId != null); - - const checksums = searchAttributePayloadConverter.fromPayload(encodedId); - console.log(checksums); - t.true(Array.isArray(checksums)); - t.regex((checksums as string[]).pop()!, /@temporalio\/worker@\d+\.\d+\.\d+/); - t.is(execution.raw.executionConfig?.taskQueue?.name, taskQueue); - t.is( - execution.raw.executionConfig?.taskQueue?.kind, - iface.temporal.api.enums.v1.TaskQueueKind.TASK_QUEUE_KIND_NORMAL - ); - t.is(execution.raw.executionConfig?.workflowRunTimeout, null); - t.is(execution.raw.executionConfig?.workflowExecutionTimeout, null); - }); - - test('WorkflowOptions are passed correctly', async (t) => { - const { startWorkflow } = helpers(t); - // Throws because we use a different task queue - const worker = await createWorkerWithDefaults(t); - const options = { - memo: { a: 'b' }, - searchAttributes: { CustomIntField: [3] }, - workflowRunTimeout: '2s', - workflowExecutionTimeout: '3s', - workflowTaskTimeout: '1s', - taskQueue: "diff-task-queue" - } as const; - const handle = await startWorkflow(workflows.sleeper, options); - await t.throwsAsync( - worker.runUntil(handle.result()), { - instanceOf: WorkflowFailedError, - message: 'Workflow execution timed out', - }); - const execution = await handle.describe(); - t.deepEqual( - execution.raw.workflowExecutionInfo?.type, - iface.temporal.api.common.v1.WorkflowType.create({ name: 'sleeper' }) - ); - t.deepEqual(await fromPayload(execution.raw.workflowExecutionInfo!.memo!.fields!.a!), 'b'); - t.deepEqual( - searchAttributePayloadConverter.fromPayload( - execution.raw.workflowExecutionInfo!.searchAttributes!.indexedFields!.CustomIntField! - ), - [3] - ); - t.deepEqual(execution.searchAttributes!.CustomIntField, [3]); - t.is(execution.raw.executionConfig?.taskQueue?.name, 'diff-task-queue'); - t.is( - execution.raw.executionConfig?.taskQueue?.kind, - iface.temporal.api.enums.v1.TaskQueueKind.TASK_QUEUE_KIND_NORMAL - ); - - t.is(tsToMs(execution.raw.executionConfig!.workflowRunTimeout!), msToNumber(options.workflowRunTimeout)); - t.is( - tsToMs(execution.raw.executionConfig!.workflowExecutionTimeout!), - msToNumber(options.workflowExecutionTimeout) - ); - t.is(tsToMs(execution.raw.executionConfig!.defaultWorkflowTaskTimeout!), msToNumber(options.workflowTaskTimeout)); - }); - - - test('WorkflowHandle.result() throws if terminated', async (t) => { - const { startWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const handle = await startWorkflow(workflows.sleeper, { - args: [1000000], - }); - await t.throwsAsync( - worker.runUntil(async () => { - await handle.terminate('hasta la vista baby'); - await handle.result(); - }), - { - instanceOf: WorkflowFailedError, - message: 'hasta la vista baby', - } - ); - }); - - test('WorkflowHandle.result() throws if continued as new', async (t) => { - const { startWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - await worker.runUntil(async () => { - const originalWorkflowHandle = await startWorkflow(workflows.continueAsNewSameWorkflow, { - followRuns: false, - }); - let err = await t.throwsAsync( - originalWorkflowHandle.result(), - { instanceOf: WorkflowContinuedAsNewError } - ); - - if (!(err instanceof WorkflowContinuedAsNewError)) return; // Type assertion - const client = t.context.env.client; - let continueWorkflowHandle = client.workflow.getHandle( - originalWorkflowHandle.workflowId, - err.newExecutionRunId, - { - followRuns: false, - } - ); - - await continueWorkflowHandle.signal(workflows.continueAsNewSignal); - err = await t.throwsAsync(continueWorkflowHandle.result(), { - instanceOf: WorkflowContinuedAsNewError, - }); - if (!(err instanceof WorkflowContinuedAsNewError)) return; // Type assertion - - continueWorkflowHandle = client.workflow.getHandle(continueWorkflowHandle.workflowId, err.newExecutionRunId); - await continueWorkflowHandle.result(); - }); - }); - - test('WorkflowHandle.result() follows chain of execution', async (t) => { - const { executeWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - await worker.runUntil(executeWorkflow(workflows.continueAsNewSameWorkflow, { - args: ['execute', 'none'], - })); - t.pass(); - }); - - test('continue-as-new-to-different-workflow', async (t) => { - const { startWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const client = t.context.env.client; - await worker.runUntil(async () => { - const originalWorkflowHandle = await startWorkflow(workflows.continueAsNewToDifferentWorkflow, { - followRuns: false, - }); - const err = await t.throwsAsync(originalWorkflowHandle.result(), { instanceOf: WorkflowContinuedAsNewError }); - if (!(err instanceof WorkflowContinuedAsNewError)) return; // Type assertion - const workflow = client.workflow.getHandle(originalWorkflowHandle.workflowId, err.newExecutionRunId, { - followRuns: false, - }); - await workflow.result(); - const info = await workflow.describe(); - t.is(info.raw.workflowExecutionInfo?.type?.name, 'sleeper'); - const history = await workflow.fetchHistory(); - const timeSlept = await decodeFromPayloadsAtIndex( - loadedDataConverter, - 0, - history?.events?.[0].workflowExecutionStartedEventAttributes?.input?.payloads - ); - t.is(timeSlept, 1); - }); - }); - - test('continue-as-new-to-same-workflow keeps memo and search attributes', async (t) => { - const { startWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const handle = await startWorkflow(workflows.continueAsNewSameWorkflow, { - memo: { - note: 'foo', - }, - searchAttributes: { - CustomKeywordField: ['test-value'], - CustomIntField: [1], - }, - followRuns: true, - }); - await worker.runUntil(async () => { - await handle.signal(workflows.continueAsNewSignal); - await handle.result(); - const execution = await handle.describe(); - t.not(execution.runId, handle.firstExecutionRunId); - t.deepEqual(execution.memo, { note: 'foo' }); - t.deepEqual(execution.searchAttributes!.CustomKeywordField, ['test-value']); - t.deepEqual(execution.searchAttributes!.CustomIntField, [1]); - }) - }); - - test('continue-as-new-to-different-workflow keeps memo and search attributes by default', async (t) => { - const { startWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const handle = await startWorkflow(workflows.continueAsNewToDifferentWorkflow, { - followRuns: true, - memo: { - note: 'foo', - }, - searchAttributes: { - CustomKeywordField: ['test-value'], - CustomIntField: [1], - }, - }); - await worker.runUntil(async () => { - await handle.result(); - const info = await handle.describe(); - t.is(info.type, 'sleeper'); - t.not(info.runId, handle.firstExecutionRunId); - t.deepEqual(info.memo, { note: 'foo' }); - t.deepEqual(info.searchAttributes!.CustomKeywordField, ['test-value']); - t.deepEqual(info.searchAttributes!.CustomIntField, [1]); - }); - }); - - test('continue-as-new-to-different-workflow can set memo and search attributes', async (t) => { - const { startWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const handle = await startWorkflow(workflows.continueAsNewToDifferentWorkflow, { - args: [ - 1, - { - memo: { - note: 'bar', - }, - searchAttributes: { - CustomKeywordField: ['test-value-2'], - CustomIntField: [3], - }, - }, - ], - followRuns: true, - memo: { - note: 'foo', - }, - searchAttributes: { - CustomKeywordField: ['test-value'], - CustomIntField: [1], - }, - }); - await worker.runUntil(async () => { - await handle.result(); - const info = await handle.describe(); - t.is(info.type, 'sleeper'); - t.not(info.runId, handle.firstExecutionRunId); - t.deepEqual(info.memo, { note: 'bar' }); - t.deepEqual(info.searchAttributes!.CustomKeywordField, ['test-value-2']); - t.deepEqual(info.searchAttributes!.CustomIntField, [3]); - }); - }); - - test('signalWithStart works as intended and returns correct runId', async (t) => { - const { taskQueue } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const client = t.context.env.client; - const originalWorkflowHandle = await client.workflow.signalWithStart(workflows.interruptableWorkflow, { - taskQueue, - workflowId: uuid4(), - signal: workflows.interruptSignal, - signalArgs: ['interrupted from signalWithStart'], - }); - await worker.runUntil(async () => { - let err: WorkflowFailedError | undefined = await t.throwsAsync(originalWorkflowHandle.result(), { - instanceOf: WorkflowFailedError, - }); - if (!(err?.cause instanceof ApplicationFailure)) { - return t.fail('Expected err.cause to be an instance of ApplicationFailure'); - } - t.is(err.cause.message, 'interrupted from signalWithStart'); - - // Test returned runId - const handle = client.workflow.getHandle(originalWorkflowHandle.workflowId, originalWorkflowHandle.signaledRunId); - err = await t.throwsAsync(handle.result(), { - instanceOf: WorkflowFailedError, - }); - if (!(err?.cause instanceof ApplicationFailure)) { - return t.fail('Expected err.cause to be an instance of ApplicationFailure'); - } - t.is(err.cause.message, 'interrupted from signalWithStart'); - }); - }); - - test('activity-failures', async (t) => { - const { executeWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t, { activities }); - await worker.runUntil(executeWorkflow(workflows.activityFailures)); - t.pass(); - }); - - test('sleepInvalidDuration is caught in Workflow runtime', async (t) => { - const { executeWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - await worker.runUntil(executeWorkflow(workflows.sleepInvalidDuration)); - t.pass(); - }); - - test('unhandledRejection causes WFT to fail', async (t) => { - const { startWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const handle = await startWorkflow(workflows.throwUnhandledRejection, { - // throw an exception that our worker can associate with a running workflow - args: [{ crashWorker: false }], - }); - await worker.runUntil( - asyncRetry( - async () => { - const history = await handle.fetchHistory(); - const wftFailedEvent = history.events?.find((ev) => ev.workflowTaskFailedEventAttributes); - if (wftFailedEvent === undefined) { - throw new Error('No WFT failed event'); - } - const failure = wftFailedEvent.workflowTaskFailedEventAttributes?.failure; - if (!failure) { - t.fail(); - return; - } - t.is(failure.message, 'unhandled rejection'); - t.true(failure.stackTrace?.includes(`Error: unhandled rejection`)); - t.is(failure.cause?.message, 'root failure'); - }, - { minTimeout: 300, factor: 1, retries: 100 } - ) - ); - await handle.terminate(); - }); - - test('throwObject includes message with our recommendation', async (t) => { - const { startWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const handle = await startWorkflow(workflows.throwObject); - await worker.runUntil( - asyncRetry( - async () => { - const history = await handle.fetchHistory(); - const wftFailedEvent = history.events?.find((ev) => ev.workflowTaskFailedEventAttributes); - if (wftFailedEvent === undefined) { - throw new Error('No WFT failed event'); - } - const failure = wftFailedEvent.workflowTaskFailedEventAttributes?.failure; - if (!failure) { - t.fail(); - return; - } - t.is( - failure.message, - '{"plainObject":true} [A non-Error value was thrown from your code. We recommend throwing Error objects so that we can provide a stack trace]' - ); - }, - { minTimeout: 300, factor: 1, retries: 100 } - ) - ); - await handle.terminate(); - }); - - test('throwBigInt includes message with our recommendation', async (t) => { - const { startWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const handle = await startWorkflow(workflows.throwBigInt); - await worker.runUntil( - asyncRetry( - async () => { - const history = await handle.fetchHistory(); - const wftFailedEvent = history.events?.find((ev) => ev.workflowTaskFailedEventAttributes); - if (wftFailedEvent === undefined) { - throw new Error('No WFT failed event'); - } - const failure = wftFailedEvent.workflowTaskFailedEventAttributes?.failure; - if (!failure) { - t.fail(); - return; - } - t.is( - failure.message, - '42 [A non-Error value was thrown from your code. We recommend throwing Error objects so that we can provide a stack trace]' - ); - }, - { minTimeout: 300, factor: 1, retries: 100 } - ) - ); - await handle.terminate(); - }); - - test('Workflow RetryPolicy kicks in with retryable failure', async (t) => { - const { startWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const handle = await startWorkflow(workflows.throwAsync, { - args: ['retryable'], - retry: { - initialInterval: 1, - maximumInterval: 1, - maximumAttempts: 2, - }, - }); - await worker.runUntil(async () => { - await t.throwsAsync(handle.result()); - // Verify retry happened - const { runId } = await handle.describe(); - t.not(runId, handle.firstExecutionRunId); - }); - }); - - test('Workflow RetryPolicy ignored with nonRetryable failure', async (t) => { - const { startWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const handle = await startWorkflow(workflows.throwAsync, { - args: ['nonRetryable'], - retry: { - initialInterval: 1, - maximumInterval: 1, - maximumAttempts: 2, - }, - }); - await worker.runUntil(async () => { - await t.throwsAsync(handle.result()); - const res = await handle.describe(); - t.is( - res.raw.workflowExecutionInfo?.status, - iface.temporal.api.enums.v1.WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED - ); - // Verify retry did not happen - const { runId } = await handle.describe(); - t.is(runId, handle.firstExecutionRunId); - }); - }); - - test('WorkflowClient.start fails with WorkflowExecutionAlreadyStartedError', async (t) => { - const { startWorkflow, taskQueue } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const client = t.context.env.client; - const handle = await startWorkflow(workflows.sleeper, { - args: [10000000], - }); - try { - await worker.runUntil( - t.throwsAsync( - client.workflow.start(workflows.sleeper, { - taskQueue, - workflowId: handle.workflowId, - }), - { - instanceOf: WorkflowExecutionAlreadyStartedError, - message: 'Workflow execution already started', - } - ) - ); - } finally { - await handle.terminate(); - } - }); - - test('WorkflowClient.signalWithStart fails with WorkflowExecutionAlreadyStartedError', async (t) => { - const { startWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const client = t.context.env.client; - const handle = await startWorkflow(workflows.sleeper); - await worker.runUntil(async () => { - await handle.result(); - await t.throwsAsync( - client.workflow.signalWithStart(workflows.sleeper, { - taskQueue: 'test', - workflowId: handle.workflowId, - signal: workflows.interruptSignal, - signalArgs: ['interrupted from signalWithStart'], - workflowIdReusePolicy: 'REJECT_DUPLICATE', - }), - { - instanceOf: WorkflowExecutionAlreadyStartedError, - message: 'Workflow execution already started', - } - ); - }) - }); - - test('Handle from WorkflowClient.start follows only own execution chain', async (t) => { - const { startWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const client = t.context.env.client; - const handleFromThrowerStart = await startWorkflow(workflows.throwAsync); - const handleFromGet = client.workflow.getHandle(handleFromThrowerStart.workflowId); - await worker.runUntil(async () => { - await t.throwsAsync(handleFromGet.result(), { message: /.*/ }); - const handleFromSleeperStart = await client.workflow.start(workflows.sleeper, { - taskQueue: 'test', - workflowId:handleFromThrowerStart.workflowId, - args: [1_000_000], - }); - try { - await t.throwsAsync(handleFromThrowerStart.result(), { message: 'Workflow execution failed' }); - } finally { - await handleFromSleeperStart.terminate(); - } - }); - }); - - test('Handle from WorkflowClient.signalWithStart follows only own execution chain', async (t) => { - const { taskQueue } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const client = t.context.env.client; - const handleFromThrowerStart = await client.workflow.signalWithStart(workflows.throwAsync, { - taskQueue, - workflowId: uuid4(), - signal: 'unblock', - }); - const handleFromGet = client.workflow.getHandle(handleFromThrowerStart.workflowId); - await worker.runUntil(async () => { - await t.throwsAsync(handleFromGet.result(), { message: /.*/ }); - const handleFromSleeperStart = await client.workflow.start(workflows.sleeper, { - taskQueue: taskQueue, - workflowId: handleFromThrowerStart.workflowId, - args: [1_000_000], - }); - try { - await t.throwsAsync(handleFromThrowerStart.result(), { message: 'Workflow execution failed' }); - } finally { - await handleFromSleeperStart.terminate(); - } - }); - }); - - test('Handle from WorkflowClient.getHandle follows only own execution chain', async (t) => { - const { startWorkflow, taskQueue } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const client = t.context.env.client; - const handleFromThrowerStart = await startWorkflow(workflows.throwAsync); - const handleFromGet = client.workflow.getHandle(handleFromThrowerStart.workflowId, undefined, { - firstExecutionRunId: handleFromThrowerStart.firstExecutionRunId, - }); - await worker.runUntil(async () => { - await t.throwsAsync(handleFromThrowerStart.result(), { message: /.*/ }); - const handleFromSleeperStart = await client.workflow.start(workflows.sleeper, { - taskQueue, - workflowId: handleFromThrowerStart.workflowId, - args: [1_000_000], - }); - try { - await t.throwsAsync(handleFromGet.result(), { message: 'Workflow execution failed' }); - } finally { - await handleFromSleeperStart.terminate(); - } - }) - }); - - test('Handle from WorkflowClient.start terminates run after continue as new', async (t) => { - const { startWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const client = t.context.env.client; - const handleFromStart = await startWorkflow(workflows.continueAsNewToDifferentWorkflow, { - args: [1_000_000], - }); - const handleFromGet = client.workflow.getHandle(handleFromStart.workflowId, handleFromStart.firstExecutionRunId, { followRuns: false }); - await worker.runUntil(async () => { - await t.throwsAsync(handleFromGet.result(), { instanceOf: WorkflowContinuedAsNewError }); - await handleFromStart.terminate(); - await t.throwsAsync(handleFromStart.result(), { message: 'Workflow execution terminated' }); - }); - }); - - test('Handle from WorkflowClient.getHandle does not terminate run after continue as new if given runId', async (t) => { - const { startWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const client = t.context.env.client; - const handleFromStart = await startWorkflow(workflows.continueAsNewToDifferentWorkflow, { - args: [1_000_000], - followRuns: false, - }); - const handleFromGet = client.workflow.getHandle(handleFromStart.workflowId, handleFromStart.firstExecutionRunId); - await worker.runUntil(async () => { - await t.throwsAsync(handleFromStart.result(), { instanceOf: WorkflowContinuedAsNewError }); - try { - await t.throwsAsync(handleFromGet.terminate(), { - instanceOf: WorkflowNotFoundError, - message: 'workflow execution already completed', - }); - } finally { - await client.workflow.getHandle(handleFromStart.workflowId).terminate(); - } - }); - }); - - test('Runtime does not issue cancellations for activities and timers that throw during validation', async (t) => { - const { executeWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - await worker.runUntil(executeWorkflow(workflows.cancelScopeOnFailedValidation)); - t.pass(); - }); - - // TODO(thomas): fix - /* - if ('promiseHooks' in v8) { - // Skip in old node versions - test('Stack trace query returns stack that makes sense', async (t) => { - const { executeWorkflow, createWorker } = helpers(t); - const worker = await createWorkerWithDefaults(t, { activities }); - const rawStacks = await worker.runUntil(executeWorkflow(workflows.stackTracer)); - - const [stack1, stack2] = rawStacks.map((r) => - r - .split('\n\n') - .map((s) => cleanOptionalStackTrace(`\n${s}`)) - .join('\n') - ); - // Can't get the Trigger stack cleaned, this is okay for now - // NOTE: we check endsWith because under certain conditions we might see Promise.race in the trace - t.true( - stack1.endsWith( - ` - at Function.all () - at stackTracer (test/src/workflows/stack-tracer.ts) - - at stackTracer (test/src/workflows/stack-tracer.ts) - - at Promise.then () - at Trigger.then (workflow/src/trigger.ts)` - ), - `Got invalid stack:\n--- clean ---\n${stack1}\n--- raw ---\n${rawStacks[0]}` - ); - t.is( - stack2, - ` - at executeChild (workflow/src/workflow.ts) - at stackTracer (test/src/workflows/stack-tracer.ts) - - at new Promise () - at timerNextHandler (workflow/src/workflow.ts) - at sleep (workflow/src/workflow.ts) - at stackTracer (test/src/workflows/stack-tracer.ts) - - at stackTracer (test/src/workflows/stack-tracer.ts)` - ); - }); - - - test('Enhanced stack trace returns trace that makes sense', async (t) => { - const { executeWorkflow, createWorker } = helpers(t); - const worker = await createWorkerWithDefaults(t, { activities }); - const enhancedStack = await worker.runUntil(executeWorkflow(workflows.enhancedStackTracer)); - - const stacks = enhancedStack.stacks.map((s) => ({ - locations: s.locations.map((l) => ({ - ...l, - ...(l.file_path - ? { file_path: l.file_path.replace(path.resolve(__dirname, '../../../'), '').replace(/\\/g, '/') } - : undefined), - })), - })); - t.is(enhancedStack.sdk.name, 'typescript'); - t.is(enhancedStack.sdk.version, pkg.version); // Expect workflow and worker versions to match - t.deepEqual(stacks, [ - { - locations: [ - { - function_name: 'Function.all', - internal_code: false, - }, - { - file_path: '/packages/test/src/workflows/stack-tracer.ts', - function_name: 'enhancedStackTracer', - line: 32, - column: 35, - internal_code: false, - }, - ], - }, - { - locations: [ - { - file_path: '/packages/test/src/workflows/stack-tracer.ts', - function_name: 'enhancedStackTracer', - line: 32, - column: 35, - internal_code: false, - }, - ], - }, - { - locations: [ - { - function_name: 'Promise.then', - internal_code: false, - }, - { - file_path: '/packages/workflow/src/trigger.ts', - function_name: 'Trigger.then', - line: 47, - column: 24, - internal_code: false, - }, - ], - }, - ]); - const expectedSources = ['../src/workflows/stack-tracer.ts', '../../workflow/src/trigger.ts'].map((p) => [ - path.resolve(__dirname, p), - [{ content: readFileSync(path.resolve(__dirname, p), 'utf8'), line_offset: 0 }], - ]); - t.deepEqual(Object.entries(enhancedStack.sources), expectedSources); - }); - } - */ - - test('Query does not cause condition to be triggered', async (t) => { - const { startWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t); - const handle = await startWorkflow(workflows.queryAndCondition); - await worker.runUntil(handle.query(workflows.mutateWorkflowStateQuery)); - await handle.terminate(); - // Worker did not crash - t.pass(); - }); - - /** - * NOTE: this test uses the `IN` operator API which requires advanced visibility as of server 1.18. - * It will silently succeed on servers that only support standard visibility (can't dynamically skip a test). - */ - test('Download and replay multiple executions with client list method', async (t) => { - const { startWorkflow } = helpers(t); - const worker = await createWorkerWithDefaults(t, { activities }); - const client = t.context.env.client; - try { - const fns = [ - workflows.http, - workflows.cancelFakeProgress, - workflows.childWorkflowInvoke, - workflows.activityFailures, - ]; - const handles = await Promise.all(fns.map((fn) => startWorkflow(fn))); - // Wait for the workflows to complete first - await worker.runUntil(Promise.all(handles.map((h) => h.result()))); - // Test the list API too while we're at it - const workflowIds = handles.map(({ workflowId }) => `'${workflowId}'`); - const histories = client.workflow.list({ query: `WorkflowId IN (${workflowIds.join(', ')})` }).intoHistories(); - const results = Worker.runReplayHistories( - { - workflowsPath: require.resolve('./workflows'), - dataConverter: t.context.env.options.client.dataConverter, - }, - histories - ); - - for await (const result of results) { - t.is(result.error, undefined); - } - } catch (e) { - // Don't report a test failure if the server does not support extended query - if (!(e as Error).message?.includes(`operator 'in' not allowed`)) throw e; - } - t.pass(); - }); -} \ No newline at end of file diff --git a/packages/test/src/test-integration-new.ts b/packages/test/src/test-integration-new.ts new file mode 100644 index 000000000..552905d56 --- /dev/null +++ b/packages/test/src/test-integration-new.ts @@ -0,0 +1,1759 @@ +/* eslint @typescript-eslint/no-non-null-assertion: 0 */ +import asyncRetry from 'async-retry'; +import { v4 as uuid4 } from 'uuid'; +import dedent from 'dedent'; +import { ExecutionContext } from 'ava'; +import * as iface from '@temporalio/proto'; +import { + ActivityFailure, + ChildWorkflowFailure, + QueryNotRegisteredError, + WorkflowContinuedAsNewError, + WorkflowFailedError, +} from '@temporalio/client'; +import { + ActivityCancellationType, + ApplicationFailure, + CancelledFailure, + defaultFailureConverter, + defaultPayloadConverter, + LoadedDataConverter, + Payload, + RetryState, + searchAttributePayloadConverter, + SearchAttributes, + TerminatedFailure, + TimeoutFailure, + TimeoutType, + WorkflowExecution, + WorkflowExecutionAlreadyStartedError, + WorkflowNotFoundError, +} from '@temporalio/common'; +import { msToNumber, tsToMs } from '@temporalio/common/lib/time'; +import pkg from '@temporalio/worker/lib/pkg'; +import { UnsafeWorkflowInfo, WorkflowInfo } from '@temporalio/workflow/lib/interfaces'; +import { decode as payloadDecode, decodeFromPayloadsAtIndex } from '@temporalio/common/lib/internal-non-workflow'; +import { WorkerOptions, WorkflowBundle } from '@temporalio/worker'; + +import { TestWorkflowEnvironment } from '@temporalio/testing'; +import { ConnectionInjectorInterceptor } from './activities/interceptors'; +import { + configurableHelpers, + createTestEnvironment, + createTestWorkflowBundle, + makeConfigurableEnvironmentTest, +} from './helpers-integration'; +import * as activities from './activities'; +import { + ByteSkewerPayloadCodec, + cleanOptionalStackTrace, + registerDefaultCustomSearchAttributes, + u8, + Worker, +} from './helpers'; +import { decode } from '@temporalio/common/lib/encoding'; +import { CancellationScope, condition, defineQuery, deprecatePatch, executeChild, isCancellation, patched, proxyActivities, setHandler, sleep, startChild, workflowInfo } from '@temporalio/workflow'; +import * as workflows from "./workflows" +import { withZeroesHTTPServer } from './zeroes-http-server'; + +// TODO(thomas): re-export shared workflows (or long workflows) +// - review the files where these workflows are shared +export * from './workflows'; + +const { EVENT_TYPE_TIMER_STARTED, EVENT_TYPE_TIMER_FIRED, EVENT_TYPE_TIMER_CANCELED } = + iface.temporal.api.enums.v1.EventType; + +const timerEventTypes = new Set([EVENT_TYPE_TIMER_STARTED, EVENT_TYPE_TIMER_FIRED, EVENT_TYPE_TIMER_CANCELED]); +const CHANGE_MARKER_NAME = 'core_patch'; + +interface TestConfig { + loadedDataConverter: LoadedDataConverter; + env: TestWorkflowEnvironment; + createWorkerWithDefaults: (t: ExecutionContext, opts?: Partial) => Promise; +} +interface TestContext { + workflowBundle: WorkflowBundle; + configs: TestConfig[]; +} + +const codecs = [undefined, new ByteSkewerPayloadCodec()]; + +const test = makeConfigurableEnvironmentTest({ + createTestContext: async (t: ExecutionContext) => { + const configs: TestConfig[] = []; + codecs.map(async (codec) => { + const dataConverter = { payloadCodecs: codec ? [codec] : [] }; + const loadedDataConverter = { + payloadConverter: defaultPayloadConverter, + payloadCodecs: codec ? [codec] : [], + failureConverter: defaultFailureConverter, + }; + + const env = await createTestEnvironment({ + client: { dataConverter }, + }); + await registerDefaultCustomSearchAttributes(env.connection); + + configs.push({ + loadedDataConverter, + env, + createWorkerWithDefaults(t: ExecutionContext, opts?: Partial): Promise { + return configurableHelpers(t, t.context.workflowBundle, env).createWorker({ + dataConverter, + interceptors: { + activity: [() => ({ inbound: new ConnectionInjectorInterceptor(env.connection, loadedDataConverter) })], + }, + ...opts, + }); + }, + }); + }); + return { + workflowBundle: await createTestWorkflowBundle(__filename), + configs, + }; + }, + teardown: async (testContext: TestContext) => { + for (const config of testContext.configs) { + await config.env.teardown(); + } + }, +}); + +const configMacro = test.macro( + async ( + t: ExecutionContext, + testFn: (t: ExecutionContext, config: TestConfig) => Promise | unknown + ) => { + const testPromises = t.context.configs.map(async (config) => { + // TODO(thomas): ideally, we'd like to add an annotation to the test name to indicate what codec it used + await testFn(t, config); + }); + await Promise.all(testPromises); + } +); + +test('Workflow not found results in task retry', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { taskQueue } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const client = env.client; + const handle = await client.workflow.start('not-found', { + taskQueue, + workflowId: uuid4(), + }); + + await worker.runUntil(async () => { + await asyncRetry( + async () => { + const history = await handle.fetchHistory(); + if ( + !history?.events?.some( + ({ workflowTaskFailedEventAttributes }) => + workflowTaskFailedEventAttributes?.failure?.message === + "Failed to initialize workflow of type 'not-found': no such function is exported by the workflow bundle" + ) + ) { + throw new Error('Cannot find workflow task failed event'); + } + }, + { + retries: 60, + maxTimeout: 1000, + } + ); + }); + + t.pass(); +}); + +export async function argsAndReturn(greeting: string, _skip: undefined, arr: Uint8Array): Promise { + if (!(arr instanceof Uint8Array)) { + throw ApplicationFailure.nonRetryable('Uint8Array not wrapped'); + } + const name = decode(arr); + return `${greeting}, ${name}`; +} + +test('args-and-return', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { executeWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const res = await worker.runUntil( + executeWorkflow(argsAndReturn, { + args: ['Hello', undefined, u8('world!')], + }) + ); + t.is(res, 'Hello, world!'); +}); + +export async function urlEcho(url: string): Promise { + const parsedURL = new URL(url); + const searchParams = new URLSearchParams({ counter: '1' }); + parsedURL.search = searchParams.toString(); + return parsedURL.toString(); +} + +test('url-whatwg', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { executeWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const res = await worker.runUntil( + executeWorkflow(urlEcho, { + args: ['http://foo.com'], + }) + ); + t.is(res, 'http://foo.com/?counter=1'); +}); + +test('cancel-fake-progress', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { executeWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + + const worker = await createWorkerWithDefaults(t, { + activities, + }); + await worker.runUntil(executeWorkflow(workflows.cancelFakeProgress)); + t.pass(); +}); + +export async function cancellableHTTPRequest(url: string): Promise { + const { cancellableFetch } = proxyActivities({ + startToCloseTimeout: '20s', + heartbeatTimeout: '3s', + cancellationType: ActivityCancellationType.WAIT_CANCELLATION_COMPLETED, + }); + let activityStarted = false; + setHandler(workflows.activityStartedSignal, () => void (activityStarted = true)); + try { + await CancellationScope.cancellable(async () => { + const promise = cancellableFetch(url, true); + await condition(() => activityStarted); + CancellationScope.current().cancel(); + await promise; + }); + } catch (err) { + if (!isCancellation(err)) { + throw err; + } + } +} + +// TODO(thomas): fix, withZeroesHTTPServer uses node:http +/* +test('cancel-http-request', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { executeWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t, { activities }); + await withZeroesHTTPServer(async (port) => { + const url = `http://127.0.0.1:${port}`; + await worker.runUntil(executeWorkflow(cancellableHTTPRequest, { + args: [url], + })); + }); + t.pass(); +}); +*/ + +export async function activityFailure(useApplicationFailure: boolean): Promise { + const { throwAnError } = proxyActivities({ + startToCloseTimeout: '5s', + retry: { initialInterval: '1s', maximumAttempts: 1 }, + }); + if (useApplicationFailure) { + await throwAnError(true, 'Fail me'); + } else { + await throwAnError(false, 'Fail me'); + } +} + +test('activity-failure with Error', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { executeWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t, { activities }); + const err: WorkflowFailedError | undefined = await t.throwsAsync( + worker.runUntil( + executeWorkflow(activityFailure, { + args: [false], + }) + ), + { + instanceOf: WorkflowFailedError, + } + ); + t.is(err?.message, 'Workflow execution failed'); + if (!(err?.cause instanceof ActivityFailure)) { + t.fail('Expected err.cause to be an instance of ActivityFailure'); + return; + } + if (!(err.cause.cause instanceof ApplicationFailure)) { + t.fail('Expected err.cause.cause to be an instance of ApplicationFailure'); + return; + } + t.is(err.cause.cause.message, 'Fail me'); + t.is( + cleanOptionalStackTrace(err.cause.cause.stack), + dedent` + Error: Fail me + at throwAnError (test/src/activities/index.ts) + at ConnectionInjectorInterceptor.execute (test/src/activities/interceptors.ts) + ` + ); +}); + +test('activity-failure with ApplicationFailure', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { executeWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t, { activities }); + const err: WorkflowFailedError | undefined = await t.throwsAsync( + worker.runUntil( + executeWorkflow(activityFailure, { + args: [true], + }) + ), + { + instanceOf: WorkflowFailedError, + } + ); + t.is(err?.message, 'Workflow execution failed'); + if (!(err?.cause instanceof ActivityFailure)) { + t.fail('Expected err.cause to be an instance of ActivityFailure'); + return; + } + if (!(err.cause.cause instanceof ApplicationFailure)) { + t.fail('Expected err.cause.cause to be an instance of ApplicationFailure'); + return; + } + t.is(err.cause.cause.message, 'Fail me'); + t.is(err.cause.cause.type, 'Error'); + t.deepEqual(err.cause.cause.details, ['details', 123, false]); + t.is( + cleanOptionalStackTrace(err.cause.cause.stack), + dedent` + ApplicationFailure: Fail me + at Function.nonRetryable (common/src/failure.ts) + at throwAnError (test/src/activities/index.ts) + at ConnectionInjectorInterceptor.execute (test/src/activities/interceptors.ts) + ` + ); +}); + +export async function childWorkflowInvoke(): Promise<{ + workflowId: string; + runId: string; + execResult: string; + result: string; +}> { + const child = await startChild(workflows.successString, {}); + const execResult = await executeChild(workflows.successString, {}); + return { workflowId: child.workflowId, runId: child.firstExecutionRunId, result: await child.result(), execResult }; +} + +test('child-workflow-invoke', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const handle = await startWorkflow(childWorkflowInvoke); + const { workflowId, runId, execResult, result } = await worker.runUntil(handle.result()); + t.is(execResult, 'success'); + t.is(result, 'success'); + const client = env.client; + const child = client.workflow.getHandle(workflowId, runId); + t.is(await child.result(), 'success'); +}); + +export async function childWorkflowFailure(): Promise { + await executeChild(workflows.throwAsync); +} + +test('child-workflow-failure', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { executeWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + await worker.runUntil(async () => { + const err: WorkflowFailedError | undefined = await t.throwsAsync(executeWorkflow(childWorkflowFailure), { + instanceOf: WorkflowFailedError, + }); + + if (!(err?.cause instanceof ChildWorkflowFailure)) { + return t.fail('Expected err.cause to be an instance of ChildWorkflowFailure'); + } + if (!(err.cause.cause instanceof ApplicationFailure)) { + return t.fail('Expected err.cause.cause to be an instance of ApplicationFailure'); + } + t.is(err.cause.cause.message, 'failure'); + t.is( + cleanOptionalStackTrace(err.cause.cause.stack), + dedent` + ApplicationFailure: failure + at Function.nonRetryable (common/src/failure.ts) + at throwAsync (test/src/workflows/throw-async.ts) + ` + ); + }); +}); + +const childExecutionQuery = defineQuery('childExecution');1 +export async function childWorkflowTermination(): Promise { + let workflowExecution: WorkflowExecution | undefined = undefined; + setHandler(childExecutionQuery, () => workflowExecution); + + const child = await startChild(workflows.unblockOrCancel, {}); + workflowExecution = { workflowId: child.workflowId, runId: child.firstExecutionRunId }; + await child.result(); +} + +test('child-workflow-termination', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const handle = await startWorkflow(childWorkflowTermination); + const client = env.client; + + let childExecution: WorkflowExecution | undefined = undefined; + + await worker.runUntil(async () => { + const err: WorkflowFailedError | undefined = await t.throwsAsync( + async () => { + while (childExecution === undefined) { + childExecution = await handle.query(childExecutionQuery); + } + const child = client.workflow.getHandle(childExecution.workflowId!, childExecution.runId!); + await child.terminate(); + await handle.result(); + }, + { + instanceOf: WorkflowFailedError, + } + ); + + if (!(err?.cause instanceof ChildWorkflowFailure)) { + return t.fail('Expected err.cause to be an instance of ChildWorkflowFailure'); + } + t.is(err.cause.retryState, RetryState.NON_RETRYABLE_FAILURE); + if (!(err.cause.cause instanceof TerminatedFailure)) { + return t.fail('Expected err.cause.cause to be an instance of TerminatedFailure'); + } + }); +}); + +export async function childWorkflowTimeout(): Promise { + await executeChild(workflows.unblockOrCancel, { + workflowExecutionTimeout: '10ms', + retry: { maximumAttempts: 1 }, + }); +} + +test('child-workflow-timeout', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { executeWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const err: WorkflowFailedError | undefined = await t.throwsAsync( + worker.runUntil(executeWorkflow(childWorkflowTimeout)), + { + instanceOf: WorkflowFailedError, + } + ); + + if (!(err?.cause instanceof ChildWorkflowFailure)) { + return t.fail('Expected err.cause to be an instance of ChildWorkflowFailure'); + } + t.is(err.cause.retryState, RetryState.TIMEOUT); + if (!(err.cause.cause instanceof TimeoutFailure)) { + return t.fail('Expected err.cause.cause to be an instance of TimeoutFailure'); + } + t.is(err.cause.cause.timeoutType, TimeoutType.START_TO_CLOSE); +}); + +export async function childWorkflowStartFail(): Promise { + const child = await startChild(workflows.successString, { + workflowIdReusePolicy: 'REJECT_DUPLICATE', + }); + await child.result(); + + try { + await startChild(workflows.successString, { + taskQueue: 'test', + workflowId: child.workflowId, // duplicate + workflowIdReusePolicy: 'REJECT_DUPLICATE', + }); + throw new Error('Managed to start a Workflow with duplicate workflowId'); + } catch (err) { + if (!(err instanceof WorkflowExecutionAlreadyStartedError)) { + throw new Error(`Got invalid error: ${err}`); + } + } +} + +test('child-workflow-start-fail', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { executeWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + await worker.runUntil(executeWorkflow(childWorkflowStartFail)); + // Assertions in workflow code + t.pass(); +}); + +test('child-workflow-cancel', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { executeWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + await worker.runUntil(executeWorkflow(workflows.childWorkflowCancel)); + // Assertions in workflow code + t.pass(); +}); + +test('child-workflow-signals', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { executeWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + await worker.runUntil(executeWorkflow(workflows.childWorkflowSignals)); + // Assertions in workflow code + t.pass(); +}); + +test('query not found', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const handle = await startWorkflow(workflows.unblockOrCancel); + await worker.runUntil(async () => { + await handle.signal(workflows.unblockSignal); + await handle.result(); + await t.throwsAsync(handle.query('not found'), { + instanceOf: QueryNotRegisteredError, + message: + 'Workflow did not register a handler for not found. Registered queries: [__stack_trace __enhanced_stack_trace __temporal_workflow_metadata isBlocked]', + }); + }); +}); + +test('query and unblock', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const handle = await startWorkflow(workflows.unblockOrCancel); + await worker.runUntil(async () => { + t.true(await handle.query(workflows.isBlockedQuery)); + await handle.signal(workflows.unblockSignal); + await handle.result(); + t.false(await handle.query(workflows.isBlockedQuery)); + }); +}); + +test('interrupt-signal', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const handle = await startWorkflow(workflows.interruptableWorkflow); + await worker.runUntil(async () => { + await handle.signal(workflows.interruptSignal, 'just because'); + const err: WorkflowFailedError | undefined = await t.throwsAsync(handle.result(), { + instanceOf: WorkflowFailedError, + }); + if (!(err?.cause instanceof ApplicationFailure)) { + return t.fail('Expected err.cause to be an instance of ApplicationFailure'); + } + t.is(err.cause.message, 'just because'); + }); +}); + +export async function failSignalWorkflow(): Promise { + setHandler(workflows.failSignal, () => { + throw ApplicationFailure.nonRetryable('Signal failed'); + }); + // Don't complete to allow Workflow to be interrupted by fail() signal + await sleep(10000); +} + +test('fail-signal', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const handle = await startWorkflow(failSignalWorkflow); + await worker.runUntil(async () => { + await handle.signal(workflows.failSignal); + const err: WorkflowFailedError | undefined = await t.throwsAsync(handle.result(), { + instanceOf: WorkflowFailedError, + }); + if (!(err?.cause instanceof ApplicationFailure)) { + return t.fail('Expected err.cause to be an instance of ApplicationFailure'); + } + t.is(err.cause.message, 'Signal failed'); + }); +}); + +export async function asyncFailSignalWorkflow(): Promise { + setHandler(workflows.failSignal, async () => { + await sleep(100); + throw ApplicationFailure.nonRetryable('Signal failed'); + }); + // Don't complete to allow Workflow to be interrupted by fail() signal + await sleep(100000); +} + +test('async-fail-signal', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const handle = await startWorkflow(asyncFailSignalWorkflow); + await handle.signal(workflows.failSignal); + await worker.runUntil(async () => { + const err: WorkflowFailedError | undefined = await t.throwsAsync(handle.result(), { + instanceOf: WorkflowFailedError, + }); + if (!(err?.cause instanceof ApplicationFailure)) { + return t.fail('Expected err.cause to be an instance of ApplicationFailure'); + } + t.is(err.cause.message, 'Signal failed'); + }); +}); + +export async function http(): Promise { + const { httpGet } = proxyActivities({ + startToCloseTimeout: '1 minute', + }); + return await httpGet('https://temporal.io'); +} + +test('http', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { executeWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t, { activities }); + const res = await worker.runUntil(executeWorkflow(http)); + t.deepEqual(res, await activities.httpGet('https://temporal.io')); +}); + +test('sleep', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const handle = await startWorkflow(workflows.sleeper); + const res = await worker.runUntil(handle.result()); + t.is(res, undefined); + const history = await handle.fetchHistory(); + const timerEvents = history.events!.filter(({ eventType }) => timerEventTypes.has(eventType!)); + t.is(timerEvents.length, 2); + t.is(timerEvents[0].timerStartedEventAttributes!.timerId, '1'); + t.is(tsToMs(timerEvents[0].timerStartedEventAttributes!.startToFireTimeout), 100); + t.is(timerEvents[1].timerFiredEventAttributes!.timerId, '1'); +}); + +/** + * Demonstrates the basics of cancellation scopes. + * Used in the documentation site. + */ +// @@@SNIPSTART typescript-cancel-a-timer-from-workflow +export async function cancelTimer(): Promise { + // Timers and Activities are automatically cancelled when their containing scope is cancelled. + try { + await CancellationScope.cancellable(async () => { + const promise = sleep(1); // <-- Will be cancelled because it is attached to this closure's scope + CancellationScope.current().cancel(); + await promise; // <-- Promise must be awaited in order for `cancellable` to throw + }); + } catch (e) { + if (e instanceof CancelledFailure) { + console.log('Timer cancelled πŸ‘'); + } else { + throw e; // <-- Fail the workflow + } + } +} +// @@@SNIPEND + +test('cancel-timer-immediately', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const handle = await startWorkflow(cancelTimer); + const res = await worker.runUntil(handle.result()); + t.is(res, undefined); + const history = await handle.fetchHistory(); + const timerEvents = history.events!.filter(({ eventType }) => timerEventTypes.has(eventType!)); + // Timer is cancelled before it is scheduled + t.is(timerEvents.length, 0); +}); + +export async function cancelTimerWithDelay(): Promise { + const scope = new CancellationScope(); + const promise = scope.run(() => sleep(10000)); + await sleep(1).then(() => scope.cancel()); + try { + await promise; + } catch (e) { + if (e instanceof CancelledFailure) { + console.log('Timer cancelled πŸ‘'); + } else { + throw e; + } + } +} + +test('cancel-timer-with-delay', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const handle = await startWorkflow(cancelTimerWithDelay); + const res = await worker.runUntil(handle.result()); + t.is(res, undefined); + const history = await handle.fetchHistory(); + const timerEvents = history.events!.filter(({ eventType }) => timerEventTypes.has(eventType!)); + t.is(timerEvents.length, 4); + t.is(timerEvents[0].timerStartedEventAttributes!.timerId, '1'); + t.is(tsToMs(timerEvents[0].timerStartedEventAttributes!.startToFireTimeout), 10000); + t.is(timerEvents[1].timerStartedEventAttributes!.timerId, '2'); + t.is(tsToMs(timerEvents[1].timerStartedEventAttributes!.startToFireTimeout), 1); + t.is(timerEvents[2].timerFiredEventAttributes!.timerId, '2'); + t.is(timerEvents[3].timerCanceledEventAttributes!.timerId, '1'); +}); + +export async function patchedWorkflow(): Promise { + if (patched('my-change-id')) { + console.log('has change'); + } else { + console.log('no change'); + } + await sleep(100); + if (patched('my-change-id')) { + console.log('has change 2'); + } else { + console.log('no change 2'); + } +} + +test('patched', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const handle = await startWorkflow(patchedWorkflow); + const res = await worker.runUntil(handle.result()); + t.is(res, undefined); + const history = await handle.fetchHistory(); + const hasChangeEvents = history.events!.filter( + ({ eventType }) => eventType === iface.temporal.api.enums.v1.EventType.EVENT_TYPE_MARKER_RECORDED + ); + // There will only be one marker despite there being 2 hasChange calls because they have the + // same ID and core will only record one marker per id. + t.is(hasChangeEvents.length, 1); + t.is(hasChangeEvents[0].markerRecordedEventAttributes!.markerName, CHANGE_MARKER_NAME); +}); + +export async function deprecatePatchWorkflow(): Promise { + deprecatePatch('my-change-id'); + console.log('has change'); +} + +test('deprecate-patch', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const handle = await startWorkflow(deprecatePatchWorkflow); + const res = await worker.runUntil(handle.result()); + t.is(res, undefined); + const history = await handle.fetchHistory(); + const hasChangeEvents = history.events!.filter( + ({ eventType }) => eventType === iface.temporal.api.enums.v1.EventType.EVENT_TYPE_MARKER_RECORDED + ); + t.is(hasChangeEvents.length, 1); + t.is(hasChangeEvents[0].markerRecordedEventAttributes!.markerName, CHANGE_MARKER_NAME); +}); + +test('Worker default ServerOptions are generated correctly', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const handle = await startWorkflow(argsAndReturn, { + args: ['hey', undefined, Buffer.from('abc')], + }); + await worker.runUntil(handle.result()); + const history = await handle.fetchHistory(); + const events = history.events!.filter( + ({ eventType }) => eventType === iface.temporal.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_TASK_COMPLETED + ); + t.is(events.length, 1); + const [event] = events; + t.regex(event.workflowTaskCompletedEventAttributes!.identity!, /\d+@.+/); + let binid = event.workflowTaskCompletedEventAttributes!.binaryChecksum!; + if (binid === '') { + binid = event.workflowTaskCompletedEventAttributes!.workerVersion!.buildId!; + } + t.regex(binid, /@temporalio\/worker@\d+\.\d+\.\d+/); +}); + +test('WorkflowHandle.describe result is wrapped', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const date = new Date(); + const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const handle = await startWorkflow(argsAndReturn, { + args: ['hey', undefined, Buffer.from('abc')], + searchAttributes: { + CustomKeywordField: ['test-value'], + CustomIntField: [1], + CustomDatetimeField: [date], + }, + memo: { + note: 'foo', + }, + }); + await worker.runUntil(handle.result()); + const execution = await handle.describe(); + t.deepEqual(execution.type, 'argsAndReturn'); + t.deepEqual(execution.memo, { note: 'foo' }); + t.true(execution.startTime instanceof Date); + t.deepEqual(execution.searchAttributes!.CustomKeywordField, ['test-value']); + t.deepEqual(execution.searchAttributes!.CustomIntField, [1]); + t.deepEqual(execution.searchAttributes!.CustomDatetimeField, [date]); + const binSum = execution.searchAttributes!.BinaryChecksums as string[]; + if (binSum != null) { + t.regex(binSum[0], /@temporalio\/worker@/); + } else { + t.deepEqual(execution.searchAttributes!.BuildIds, ['unversioned', `unversioned:${worker.options.buildId}`]); + } +}); + +export async function returnSearchAttributes(): Promise { + const sa = workflowInfo().searchAttributes!; // eslint-disable-line @typescript-eslint/no-non-null-assertion + const datetime = (sa.CustomDatetimeField as Array)[0]; + return { + ...sa, + datetimeType: [Object.getPrototypeOf(datetime).constructor.name], + datetimeInstanceofWorks: [datetime instanceof Date], + arrayInstanceofWorks: [sa.CustomIntField instanceof Array], + }; +} + +test('Workflow can read Search Attributes set at start', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const date = new Date(); + const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const handle = await startWorkflow(returnSearchAttributes, { + searchAttributes: { + CustomKeywordField: ['test-value'], + CustomIntField: [1], + CustomDatetimeField: [date], + }, + }); + const res = await worker.runUntil(handle.result()); + t.deepEqual(res, { + CustomKeywordField: ['test-value'], + CustomIntField: [1], + CustomDatetimeField: [date.toISOString()], + datetimeInstanceofWorks: [true], + arrayInstanceofWorks: [true], + datetimeType: ['Date'], + }); +}); + +test('Workflow can upsert Search Attributes', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const date = new Date(); + const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const handle = await startWorkflow(workflows.upsertAndReadSearchAttributes, { + args: [date.getTime()], + }); + const res = await worker.runUntil(handle.result()); + t.deepEqual(res, { + CustomBoolField: [true], + CustomIntField: [], // clear + CustomKeywordField: ['durable code'], + CustomTextField: ['is useful'], + CustomDatetimeField: [date.toISOString()], + CustomDoubleField: [3.14], + }); + const { searchAttributes } = await handle.describe(); + const { BinaryChecksums, BuildIds, ...rest } = searchAttributes; + t.deepEqual(rest, { + CustomBoolField: [true], + CustomKeywordField: ['durable code'], + CustomTextField: ['is useful'], + CustomDatetimeField: [date], + CustomDoubleField: [3.14], + }); + let checksum: any; + if (BinaryChecksums != null) { + t.true(BinaryChecksums.length === 1); + checksum = BinaryChecksums[0]; + } else { + t.true(BuildIds!.length === 2); + t.deepEqual(BuildIds![0], 'unversioned'); + checksum = BuildIds![1]; + } + t.true( + typeof checksum === 'string' && + checksum.includes(`@temporalio/worker@${pkg.version}+`) && + /\+[a-f0-9]{64}$/.test(checksum) // bundle checksum + ); +}); + +export async function returnWorkflowInfo(): Promise { + return workflowInfo(); +} + +test('Workflow can read WorkflowInfo', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { startWorkflow, taskQueue } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const handle = await startWorkflow(returnWorkflowInfo, { + memo: { + nested: { object: true }, + }, + }); + const res = await worker.runUntil(handle.result()); + t.assert(res.historySize > 100); + t.deepEqual(res, { + memo: { + nested: { object: true }, + }, + attempt: 1, + firstExecutionRunId: handle.firstExecutionRunId, + namespace: 'default', + taskTimeoutMs: 10_000, + runId: handle.firstExecutionRunId, + taskQueue, + searchAttributes: {}, + workflowType: 'returnWorkflowInfo', + workflowId: handle.workflowId, + historyLength: 3, + continueAsNewSuggested: false, + // values ignored for the purpose of comparison + historySize: res.historySize, + startTime: res.startTime, + runStartTime: res.runStartTime, + currentBuildId: res.currentBuildId, + // unsafe.now is a function, so doesn't make it through serialization, but .now is required, so we need to cast + unsafe: { isReplaying: false } as UnsafeWorkflowInfo, + }); +}); + +test('WorkflowOptions are passed correctly with defaults', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { startWorkflow, taskQueue } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const handle = await startWorkflow(argsAndReturn, { + args: ['hey', undefined, Buffer.from('def')], + }); + await worker.runUntil(handle.result()); + const execution = await handle.describe(); + t.deepEqual(execution.type, 'argsAndReturn'); + const indexedFields = execution.raw.workflowExecutionInfo!.searchAttributes!.indexedFields!; + const indexedFieldKeys = Object.keys(indexedFields); + + let encodedId: any; + if (indexedFieldKeys.includes('BinaryChecksums')) { + encodedId = indexedFields.BinaryChecksums!; + } else { + encodedId = indexedFields.BuildIds!; + } + t.true(encodedId != null); + + const checksums = searchAttributePayloadConverter.fromPayload(encodedId); + console.log(checksums); + t.true(Array.isArray(checksums)); + t.regex((checksums as string[]).pop()!, /@temporalio\/worker@\d+\.\d+\.\d+/); + t.is(execution.raw.executionConfig?.taskQueue?.name, taskQueue); + t.is( + execution.raw.executionConfig?.taskQueue?.kind, + iface.temporal.api.enums.v1.TaskQueueKind.TASK_QUEUE_KIND_NORMAL + ); + t.is(execution.raw.executionConfig?.workflowRunTimeout, null); + t.is(execution.raw.executionConfig?.workflowExecutionTimeout, null); +}); + +test('WorkflowOptions are passed correctly', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + // Throws because we use a different task queue + const worker = await createWorkerWithDefaults(t); + const options = { + memo: { a: 'b' }, + searchAttributes: { CustomIntField: [3] }, + workflowRunTimeout: '2s', + workflowExecutionTimeout: '3s', + workflowTaskTimeout: '1s', + taskQueue: 'diff-task-queue', + } as const; + const handle = await startWorkflow(workflows.sleeper, options); + async function fromPayload(payload: Payload) { + const payloadCodecs = env.client.options.dataConverter.payloadCodecs ?? []; + const [decodedPayload] = await payloadDecode(payloadCodecs, [payload]); + return defaultPayloadConverter.fromPayload(decodedPayload); + } + await t.throwsAsync(worker.runUntil(handle.result()), { + instanceOf: WorkflowFailedError, + message: 'Workflow execution timed out', + }); + const execution = await handle.describe(); + t.deepEqual( + execution.raw.workflowExecutionInfo?.type, + iface.temporal.api.common.v1.WorkflowType.create({ name: 'sleeper' }) + ); + t.deepEqual(await fromPayload(execution.raw.workflowExecutionInfo!.memo!.fields!.a!), 'b'); + t.deepEqual( + searchAttributePayloadConverter.fromPayload( + execution.raw.workflowExecutionInfo!.searchAttributes!.indexedFields!.CustomIntField! + ), + [3] + ); + t.deepEqual(execution.searchAttributes!.CustomIntField, [3]); + t.is(execution.raw.executionConfig?.taskQueue?.name, 'diff-task-queue'); + t.is( + execution.raw.executionConfig?.taskQueue?.kind, + iface.temporal.api.enums.v1.TaskQueueKind.TASK_QUEUE_KIND_NORMAL + ); + + t.is(tsToMs(execution.raw.executionConfig!.workflowRunTimeout!), msToNumber(options.workflowRunTimeout)); + t.is(tsToMs(execution.raw.executionConfig!.workflowExecutionTimeout!), msToNumber(options.workflowExecutionTimeout)); + t.is(tsToMs(execution.raw.executionConfig!.defaultWorkflowTaskTimeout!), msToNumber(options.workflowTaskTimeout)); +}); + +test('WorkflowHandle.result() throws if terminated', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const handle = await startWorkflow(workflows.sleeper, { + args: [1000000], + }); + await t.throwsAsync( + worker.runUntil(async () => { + await handle.terminate('hasta la vista baby'); + await handle.result(); + }), + { + instanceOf: WorkflowFailedError, + message: 'hasta la vista baby', + } + ); +}); + +test('WorkflowHandle.result() throws if continued as new', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + await worker.runUntil(async () => { + const originalWorkflowHandle = await startWorkflow(workflows.continueAsNewSameWorkflow, { + followRuns: false, + }); + let err = await t.throwsAsync(originalWorkflowHandle.result(), { instanceOf: WorkflowContinuedAsNewError }); + + if (!(err instanceof WorkflowContinuedAsNewError)) return; // Type assertion + const client = env.client; + let continueWorkflowHandle = client.workflow.getHandle( + originalWorkflowHandle.workflowId, + err.newExecutionRunId, + { + followRuns: false, + } + ); + + await continueWorkflowHandle.signal(workflows.continueAsNewSignal); + err = await t.throwsAsync(continueWorkflowHandle.result(), { + instanceOf: WorkflowContinuedAsNewError, + }); + if (!(err instanceof WorkflowContinuedAsNewError)) return; // Type assertion + + continueWorkflowHandle = client.workflow.getHandle( + continueWorkflowHandle.workflowId, + err.newExecutionRunId + ); + await continueWorkflowHandle.result(); + }); +}); + +test('WorkflowHandle.result() follows chain of execution', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { executeWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + await worker.runUntil( + executeWorkflow(workflows.continueAsNewSameWorkflow, { + args: ['execute', 'none'], + }) + ); + t.pass(); +}); + +test('continue-as-new-to-different-workflow', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults, loadedDataConverter } = config; + const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const client = env.client; + await worker.runUntil(async () => { + const originalWorkflowHandle = await startWorkflow(workflows.continueAsNewToDifferentWorkflow, { + followRuns: false, + }); + const err = await t.throwsAsync(originalWorkflowHandle.result(), { instanceOf: WorkflowContinuedAsNewError }); + if (!(err instanceof WorkflowContinuedAsNewError)) return; // Type assertion + const workflow = client.workflow.getHandle( + originalWorkflowHandle.workflowId, + err.newExecutionRunId, + { + followRuns: false, + } + ); + await workflow.result(); + const info = await workflow.describe(); + t.is(info.raw.workflowExecutionInfo?.type?.name, 'sleeper'); + const history = await workflow.fetchHistory(); + const timeSlept = await decodeFromPayloadsAtIndex( + loadedDataConverter, + 0, + history?.events?.[0].workflowExecutionStartedEventAttributes?.input?.payloads + ); + t.is(timeSlept, 1); + }); +}); + +test('continue-as-new-to-same-workflow keeps memo and search attributes', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const handle = await startWorkflow(workflows.continueAsNewSameWorkflow, { + memo: { + note: 'foo', + }, + searchAttributes: { + CustomKeywordField: ['test-value'], + CustomIntField: [1], + }, + followRuns: true, + }); + await worker.runUntil(async () => { + await handle.signal(workflows.continueAsNewSignal); + await handle.result(); + const execution = await handle.describe(); + t.not(execution.runId, handle.firstExecutionRunId); + t.deepEqual(execution.memo, { note: 'foo' }); + t.deepEqual(execution.searchAttributes!.CustomKeywordField, ['test-value']); + t.deepEqual(execution.searchAttributes!.CustomIntField, [1]); + }); +}); + +test( + 'continue-as-new-to-different-workflow keeps memo and search attributes by default', + configMacro, + async (t, config) => { + const { env, createWorkerWithDefaults } = config; + + const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const handle = await startWorkflow(workflows.continueAsNewToDifferentWorkflow, { + followRuns: true, + memo: { + note: 'foo', + }, + searchAttributes: { + CustomKeywordField: ['test-value'], + CustomIntField: [1], + }, + }); + await worker.runUntil(async () => { + await handle.result(); + const info = await handle.describe(); + t.is(info.type, 'sleeper'); + t.not(info.runId, handle.firstExecutionRunId); + t.deepEqual(info.memo, { note: 'foo' }); + t.deepEqual(info.searchAttributes!.CustomKeywordField, ['test-value']); + t.deepEqual(info.searchAttributes!.CustomIntField, [1]); + }); + } +); + +test('continue-as-new-to-different-workflow can set memo and search attributes', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const handle = await startWorkflow(workflows.continueAsNewToDifferentWorkflow, { + args: [ + 1, + { + memo: { + note: 'bar', + }, + searchAttributes: { + CustomKeywordField: ['test-value-2'], + CustomIntField: [3], + }, + }, + ], + followRuns: true, + memo: { + note: 'foo', + }, + searchAttributes: { + CustomKeywordField: ['test-value'], + CustomIntField: [1], + }, + }); + await worker.runUntil(async () => { + await handle.result(); + const info = await handle.describe(); + t.is(info.type, 'sleeper'); + t.not(info.runId, handle.firstExecutionRunId); + t.deepEqual(info.memo, { note: 'bar' }); + t.deepEqual(info.searchAttributes!.CustomKeywordField, ['test-value-2']); + t.deepEqual(info.searchAttributes!.CustomIntField, [3]); + }); +}); + +test('signalWithStart works as intended and returns correct runId', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { taskQueue } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const client = env.client; + const originalWorkflowHandle = await client.workflow.signalWithStart(workflows.interruptableWorkflow, { + taskQueue, + workflowId: uuid4(), + signal: workflows.interruptSignal, + signalArgs: ['interrupted from signalWithStart'], + }); + await worker.runUntil(async () => { + let err: WorkflowFailedError | undefined = await t.throwsAsync(originalWorkflowHandle.result(), { + instanceOf: WorkflowFailedError, + }); + if (!(err?.cause instanceof ApplicationFailure)) { + return t.fail('Expected err.cause to be an instance of ApplicationFailure'); + } + t.is(err.cause.message, 'interrupted from signalWithStart'); + + // Test returned runId + const handle = client.workflow.getHandle( + originalWorkflowHandle.workflowId, + originalWorkflowHandle.signaledRunId + ); + err = await t.throwsAsync(handle.result(), { + instanceOf: WorkflowFailedError, + }); + if (!(err?.cause instanceof ApplicationFailure)) { + return t.fail('Expected err.cause to be an instance of ApplicationFailure'); + } + t.is(err.cause.message, 'interrupted from signalWithStart'); + }); +}); + +test('activity-failures', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { executeWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t, { activities }); + await worker.runUntil(executeWorkflow(workflows.activityFailures)); + t.pass(); +}); + +export async function sleepInvalidDuration(): Promise { + await sleep(0); + await new Promise((resolve) => setTimeout(resolve, -1)); +} + +test('sleepInvalidDuration is caught in Workflow runtime', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + + const { executeWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + await worker.runUntil(executeWorkflow(sleepInvalidDuration)); + t.pass(); +}); + +test('unhandledRejection causes WFT to fail', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const handle = await startWorkflow(workflows.throwUnhandledRejection, { + // throw an exception that our worker can associate with a running workflow + args: [{ crashWorker: false }], + }); + await worker.runUntil( + asyncRetry( + async () => { + const history = await handle.fetchHistory(); + const wftFailedEvent = history.events?.find((ev) => ev.workflowTaskFailedEventAttributes); + if (wftFailedEvent === undefined) { + throw new Error('No WFT failed event'); + } + const failure = wftFailedEvent.workflowTaskFailedEventAttributes?.failure; + if (!failure) { + t.fail(); + return; + } + t.is(failure.message, 'unhandled rejection'); + t.true(failure.stackTrace?.includes(`Error: unhandled rejection`)); + t.is(failure.cause?.message, 'root failure'); + }, + { minTimeout: 300, factor: 1, retries: 100 } + ) + ); + await handle.terminate(); +}); + +export async function throwObject(): Promise { + throw { plainObject: true }; +} + +test('throwObject includes message with our recommendation', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const handle = await startWorkflow(throwObject); + await worker.runUntil( + asyncRetry( + async () => { + const history = await handle.fetchHistory(); + const wftFailedEvent = history.events?.find((ev) => ev.workflowTaskFailedEventAttributes); + if (wftFailedEvent === undefined) { + throw new Error('No WFT failed event'); + } + const failure = wftFailedEvent.workflowTaskFailedEventAttributes?.failure; + if (!failure) { + t.fail(); + return; + } + t.is( + failure.message, + '{"plainObject":true} [A non-Error value was thrown from your code. We recommend throwing Error objects so that we can provide a stack trace]' + ); + }, + { minTimeout: 300, factor: 1, retries: 100 } + ) + ); + await handle.terminate(); +}); + +export async function throwBigInt(): Promise { + throw 42n; +} + +test('throwBigInt includes message with our recommendation', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const handle = await startWorkflow(throwBigInt); + await worker.runUntil( + asyncRetry( + async () => { + const history = await handle.fetchHistory(); + const wftFailedEvent = history.events?.find((ev) => ev.workflowTaskFailedEventAttributes); + if (wftFailedEvent === undefined) { + throw new Error('No WFT failed event'); + } + const failure = wftFailedEvent.workflowTaskFailedEventAttributes?.failure; + if (!failure) { + t.fail(); + return; + } + t.is( + failure.message, + '42 [A non-Error value was thrown from your code. We recommend throwing Error objects so that we can provide a stack trace]' + ); + }, + { minTimeout: 300, factor: 1, retries: 100 } + ) + ); + await handle.terminate(); +}); + +test('Workflow RetryPolicy kicks in with retryable failure', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const handle = await startWorkflow(workflows.throwAsync, { + args: ['retryable'], + retry: { + initialInterval: 1, + maximumInterval: 1, + maximumAttempts: 2, + }, + }); + await worker.runUntil(async () => { + await t.throwsAsync(handle.result()); + // Verify retry happened + const { runId } = await handle.describe(); + t.not(runId, handle.firstExecutionRunId); + }); +}); + +test('Workflow RetryPolicy ignored with nonRetryable failure', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const handle = await startWorkflow(workflows.throwAsync, { + args: ['nonRetryable'], + retry: { + initialInterval: 1, + maximumInterval: 1, + maximumAttempts: 2, + }, + }); + await worker.runUntil(async () => { + await t.throwsAsync(handle.result()); + const res = await handle.describe(); + t.is( + res.raw.workflowExecutionInfo?.status, + iface.temporal.api.enums.v1.WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED + ); + // Verify retry did not happen + const { runId } = await handle.describe(); + t.is(runId, handle.firstExecutionRunId); + }); +}); + +test('WorkflowClient.start fails with WorkflowExecutionAlreadyStartedError', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { startWorkflow, taskQueue } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const client = env.client; + const handle = await startWorkflow(workflows.sleeper, { + args: [10000000], + }); + try { + await worker.runUntil( + t.throwsAsync( + client.workflow.start(workflows.sleeper, { + taskQueue, + workflowId: handle.workflowId, + }), + { + instanceOf: WorkflowExecutionAlreadyStartedError, + message: 'Workflow execution already started', + } + ) + ); + } finally { + await handle.terminate(); + } +}); + +test( + 'WorkflowClient.signalWithStart fails with WorkflowExecutionAlreadyStartedError', + configMacro, + async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const client = env.client; + const handle = await startWorkflow(workflows.sleeper); + await worker.runUntil(async () => { + await handle.result(); + await t.throwsAsync( + client.workflow.signalWithStart(workflows.sleeper, { + taskQueue: 'test', + workflowId: handle.workflowId, + signal: workflows.interruptSignal, + signalArgs: ['interrupted from signalWithStart'], + workflowIdReusePolicy: 'REJECT_DUPLICATE', + }), + { + instanceOf: WorkflowExecutionAlreadyStartedError, + message: 'Workflow execution already started', + } + ); + }); + } +); + +test('Handle from WorkflowClient.start follows only own execution chain', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const client = env.client; + const handleFromThrowerStart = await startWorkflow(workflows.throwAsync); + const handleFromGet = client.workflow.getHandle(handleFromThrowerStart.workflowId); + await worker.runUntil(async () => { + await t.throwsAsync(handleFromGet.result(), { message: /.*/ }); + const handleFromSleeperStart = await client.workflow.start(workflows.sleeper, { + taskQueue: 'test', + workflowId: handleFromThrowerStart.workflowId, + args: [1_000_000], + }); + try { + await t.throwsAsync(handleFromThrowerStart.result(), { message: 'Workflow execution failed' }); + } finally { + await handleFromSleeperStart.terminate(); + } + }); +}); + +test('Handle from WorkflowClient.signalWithStart follows only own execution chain', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { taskQueue } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const client = env.client; + const handleFromThrowerStart = await client.workflow.signalWithStart(workflows.throwAsync, { + taskQueue, + workflowId: uuid4(), + signal: 'unblock', + }); + const handleFromGet = client.workflow.getHandle(handleFromThrowerStart.workflowId); + await worker.runUntil(async () => { + await t.throwsAsync(handleFromGet.result(), { message: /.*/ }); + const handleFromSleeperStart = await client.workflow.start(workflows.sleeper, { + taskQueue, + workflowId: handleFromThrowerStart.workflowId, + args: [1_000_000], + }); + try { + await t.throwsAsync(handleFromThrowerStart.result(), { message: 'Workflow execution failed' }); + } finally { + await handleFromSleeperStart.terminate(); + } + }); +}); + +test('Handle from WorkflowClient.getHandle follows only own execution chain', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { startWorkflow, taskQueue } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const client = env.client; + const handleFromThrowerStart = await startWorkflow(workflows.throwAsync); + const handleFromGet = client.workflow.getHandle(handleFromThrowerStart.workflowId, undefined, { + firstExecutionRunId: handleFromThrowerStart.firstExecutionRunId, + }); + await worker.runUntil(async () => { + await t.throwsAsync(handleFromThrowerStart.result(), { message: /.*/ }); + const handleFromSleeperStart = await client.workflow.start(workflows.sleeper, { + taskQueue, + workflowId: handleFromThrowerStart.workflowId, + args: [1_000_000], + }); + try { + await t.throwsAsync(handleFromGet.result(), { message: 'Workflow execution failed' }); + } finally { + await handleFromSleeperStart.terminate(); + } + }); +}); + +test('Handle from WorkflowClient.start terminates run after continue as new', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const client = env.client; + const handleFromStart = await startWorkflow(workflows.continueAsNewToDifferentWorkflow, { + args: [1_000_000], + }); + const handleFromGet = client.workflow.getHandle(handleFromStart.workflowId, handleFromStart.firstExecutionRunId, { + followRuns: false, + }); + await worker.runUntil(async () => { + await t.throwsAsync(handleFromGet.result(), { instanceOf: WorkflowContinuedAsNewError }); + await handleFromStart.terminate(); + await t.throwsAsync(handleFromStart.result(), { message: 'Workflow execution terminated' }); + }); +}); + +test( + 'Handle from WorkflowClient.getHandle does not terminate run after continue as new if given runId', + configMacro, + async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const client = env.client; + const handleFromStart = await startWorkflow(workflows.continueAsNewToDifferentWorkflow, { + args: [1_000_000], + followRuns: false, + }); + const handleFromGet = client.workflow.getHandle(handleFromStart.workflowId, handleFromStart.firstExecutionRunId); + await worker.runUntil(async () => { + await t.throwsAsync(handleFromStart.result(), { instanceOf: WorkflowContinuedAsNewError }); + try { + await t.throwsAsync(handleFromGet.terminate(), { + instanceOf: WorkflowNotFoundError, + message: 'workflow execution already completed', + }); + } finally { + await client.workflow.getHandle(handleFromStart.workflowId).terminate(); + } + }); + } +); + +test( + 'Runtime does not issue cancellations for activities and timers that throw during validation', + configMacro, + async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { executeWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + await worker.runUntil(executeWorkflow(workflows.cancelScopeOnFailedValidation)); + t.pass(); + } +); + +// TODO(thomas): fix +/* +if ('promiseHooks' in v8) { + // Skip in old node versions + test('Stack trace query returns stack that makes sense', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + + const { executeWorkflow, createWorker } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t, { activities }); + const rawStacks = await worker.runUntil(executeWorkflow(workflows.stackTracer)); + + const [stack1, stack2] = rawStacks.map((r) => + r + .split('\n\n') + .map((s) => cleanOptionalStackTrace(`\n${s}`)) + .join('\n') + ); + // Can't get the Trigger stack cleaned, this is okay for now + // NOTE: we check endsWith because under certain conditions we might see Promise.race in the trace + t.true( + stack1.endsWith( + ` + at Function.all () + at stackTracer (test/src/workflows/stack-tracer.ts) + + at stackTracer (test/src/workflows/stack-tracer.ts) + + at Promise.then () + at Trigger.then (workflow/src/trigger.ts)` + ), + `Got invalid stack:\n--- clean ---\n${stack1}\n--- raw ---\n${rawStacks[0]}` + ); + t.is( + stack2, + ` + at executeChild (workflow/src/workflow.ts) + at stackTracer (test/src/workflows/stack-tracer.ts) + + at new Promise () + at timerNextHandler (workflow/src/workflow.ts) + at sleep (workflow/src/workflow.ts) + at stackTracer (test/src/workflows/stack-tracer.ts) + + at stackTracer (test/src/workflows/stack-tracer.ts)` + ); + }); + + + test('Enhanced stack trace returns trace that makes sense', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + + const { executeWorkflow, createWorker } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t, { activities }); + const enhancedStack = await worker.runUntil(executeWorkflow(workflows.enhancedStackTracer)); + + const stacks = enhancedStack.stacks.map((s) => ({ + locations: s.locations.map((l) => ({ + ...l, + ...(l.file_path + ? { file_path: l.file_path.replace(path.resolve(__dirname, '../../../'), '').replace(/\\/g, '/') } + : undefined), + })), + })); + t.is(enhancedStack.sdk.name, 'typescript'); + t.is(enhancedStack.sdk.version, pkg.version); // Expect workflow and worker versions to match + t.deepEqual(stacks, [ + { + locations: [ + { + function_name: 'Function.all', + internal_code: false, + }, + { + file_path: '/packages/test/src/workflows/stack-tracer.ts', + function_name: 'enhancedStackTracer', + line: 32, + column: 35, + internal_code: false, + }, + ], + }, + { + locations: [ + { + file_path: '/packages/test/src/workflows/stack-tracer.ts', + function_name: 'enhancedStackTracer', + line: 32, + column: 35, + internal_code: false, + }, + ], + }, + { + locations: [ + { + function_name: 'Promise.then', + internal_code: false, + }, + { + file_path: '/packages/workflow/src/trigger.ts', + function_name: 'Trigger.then', + line: 47, + column: 24, + internal_code: false, + }, + ], + }, + ]); + const expectedSources = ['../src/workflows/stack-tracer.ts', '../../workflow/src/trigger.ts'].map((p) => [ + path.resolve(__dirname, p), + [{ content: readFileSync(path.resolve(__dirname, p), 'utf8'), line_offset: 0 }], + ]); + t.deepEqual(Object.entries(enhancedStack.sources), expectedSources); + }); +} +*/ + +const mutateWorkflowStateQuery = defineQuery('mutateWorkflowState'); +export async function queryAndCondition(): Promise { + let mutated = false; + // Not a valid query, used to verify that condition isn't triggered for query jobs + setHandler(mutateWorkflowStateQuery, () => void (mutated = true)); + await condition(() => mutated); +} + +test('Query does not cause condition to be triggered', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + + const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + const handle = await startWorkflow(queryAndCondition); + await worker.runUntil(handle.query(mutateWorkflowStateQuery)); + await handle.terminate(); + // Worker did not crash + t.pass(); +}); + +/** + * NOTE: this test uses the `IN` operator API which requires advanced visibility as of server 1.18. + * It will silently succeed on servers that only support standard visibility (can't dynamically skip a test). + */ +test('Download and replay multiple executions with client list method', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + + const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t, { activities }); + const client = env.client; + try { + const fns = [ + http, + workflows.cancelFakeProgress, + childWorkflowInvoke, + workflows.activityFailures, + ]; + const handles = await Promise.all(fns.map((fn) => startWorkflow(fn))); + // Wait for the workflows to complete first + await worker.runUntil(Promise.all(handles.map((h) => h.result()))); + // Test the list API too while we're at it + const workflowIds = handles.map(({ workflowId }) => `'${workflowId}'`); + const histories = client.workflow.list({ query: `WorkflowId IN (${workflowIds.join(', ')})` }).intoHistories(); + const results = Worker.runReplayHistories( + { + workflowsPath: require.resolve('./workflows'), + dataConverter: env.options.client.dataConverter, + }, + histories + ); + + for await (const result of results) { + t.is(result.error, undefined); + } + } catch (e) { + // Don't report a test failure if the server does not support extended query + if (!(e as Error).message?.includes(`operator 'in' not allowed`)) throw e; + } + t.pass(); +}); \ No newline at end of file diff --git a/packages/test/src/test-integration-old.ts b/packages/test/src/test-integration-old.ts deleted file mode 100644 index 613193aaf..000000000 --- a/packages/test/src/test-integration-old.ts +++ /dev/null @@ -1,13 +0,0 @@ -/** - * This file has been given the suffix -old because it uses an older style of - * integration testing. New code should follow the style of integration tests in - * the files without this suffix. - */ - -// TODO(thomas): change file name -import { RUN_INTEGRATION_TESTS } from './helpers'; -import { runIntegrationTests } from './integration-tests-new'; - -if (RUN_INTEGRATION_TESTS) { - runIntegrationTests(); -} diff --git a/packages/test/src/test-local-activities.ts b/packages/test/src/test-local-activities.ts index 43d2e73d4..2266523fe 100644 --- a/packages/test/src/test-local-activities.ts +++ b/packages/test/src/test-local-activities.ts @@ -1,21 +1,18 @@ import { firstValueFrom, Subject } from 'rxjs'; import { Context as ActivityContext } from '@temporalio/activity'; -import { - ApplicationFailure, - defaultPayloadConverter, - WorkflowFailedError, -} from '@temporalio/client'; +import { ApplicationFailure, defaultPayloadConverter, WorkflowFailedError } from '@temporalio/client'; import { LocalActivityOptions } from '@temporalio/common'; import { msToNumber } from '@temporalio/common/lib/time'; import { temporal } from '@temporalio/proto'; import * as workflow from '@temporalio/workflow'; import { Worker } from './helpers'; import { helpers, makeTestFunction } from './helpers-integration'; +import { throwAnError } from './activities'; const test = makeTestFunction({ workflowsPath: __filename, workflowInterceptorModules: [__filename], -}) +}); export async function runOneLocalActivity(s: string): Promise { return await workflow.proxyLocalActivities({ startToCloseTimeout: '1m' }).echo(s); @@ -149,17 +146,13 @@ test.serial('Parallel local activities work end to end', async (t) => { }); export async function throwAnErrorFromLocalActivity(message: string): Promise { - await workflow.proxyLocalActivities({ startToCloseTimeout: '1m' }).throwAnError(message); + await workflow.proxyLocalActivities({ startToCloseTimeout: '1m' }).throwAnError(true, message); } test.serial('Local activity error is propagated properly to the Workflow', async (t) => { const { executeWorkflow, createWorker } = helpers(t); const worker = await createWorker({ - activities: { - async throwAnError(message: string): Promise { - throw ApplicationFailure.nonRetryable(message, 'Error', 'details', 123, false); - }, - }, + activities: { throwAnError }, }); await worker.runUntil(async () => { const err: WorkflowFailedError | undefined = await t.throwsAsync( @@ -296,15 +289,7 @@ export async function throwAnExplicitNonRetryableErrorFromLocalActivity(message: test.serial('Local activity does not retry if error is in nonRetryableErrorTypes', async (t) => { const { executeWorkflow, createWorker } = helpers(t); const worker = await createWorker({ - activities: { - async throwAnError(useApplicationFailure: boolean, message: string): Promise { - if (useApplicationFailure) { - throw ApplicationFailure.nonRetryable(message, 'Error', 'details', 123, false); - } else { - throw new Error(message); - } - }, - }, + activities: { throwAnError }, }); await worker.runUntil(async () => { const err: WorkflowFailedError | undefined = await t.throwsAsync( diff --git a/packages/test/src/workflows/activity-failure.ts b/packages/test/src/workflows/activity-failure.ts deleted file mode 100644 index ebebe5f1f..000000000 --- a/packages/test/src/workflows/activity-failure.ts +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Tests that ActivityFailure is propagated correctly to client - */ -import { proxyActivities } from '@temporalio/workflow'; -import type * as activities from '../activities'; - -const { throwAnError } = proxyActivities({ - startToCloseTimeout: '5s', - retry: { initialInterval: '1s', maximumAttempts: 1 }, -}); - -export interface ActivityFailureWorkflowOptions { - useApplicationFailure: boolean; -} - -export async function activityFailure({ useApplicationFailure }: ActivityFailureWorkflowOptions): Promise { - if (useApplicationFailure) { - await throwAnError(true, 'Fail me'); - } else { - await throwAnError(false, 'Fail me'); - } -} diff --git a/packages/test/src/workflows/args-and-return.ts b/packages/test/src/workflows/args-and-return.ts deleted file mode 100644 index d126dccaa..000000000 --- a/packages/test/src/workflows/args-and-return.ts +++ /dev/null @@ -1,10 +0,0 @@ -import { decode } from '@temporalio/common/lib/encoding'; -import { ApplicationFailure } from '@temporalio/workflow'; - -export async function argsAndReturn(greeting: string, _skip: undefined, arr: Uint8Array): Promise { - if (!(arr instanceof Uint8Array)) { - throw ApplicationFailure.nonRetryable('Uint8Array not wrapped'); - } - const name = decode(arr); - return `${greeting}, ${name}`; -} diff --git a/packages/test/src/workflows/async-fail-signal.ts b/packages/test/src/workflows/async-fail-signal.ts deleted file mode 100644 index 93e5055cc..000000000 --- a/packages/test/src/workflows/async-fail-signal.ts +++ /dev/null @@ -1,11 +0,0 @@ -import { setHandler, sleep, ApplicationFailure } from '@temporalio/workflow'; -import { failSignal } from './definitions'; - -export async function asyncFailSignalWorkflow(): Promise { - setHandler(failSignal, async () => { - await sleep(100); - throw ApplicationFailure.nonRetryable('Signal failed'); - }); - // Don't complete to allow Workflow to be interrupted by fail() signal - await sleep(100000); -} diff --git a/packages/test/src/workflows/cancel-http-request.ts b/packages/test/src/workflows/cancel-http-request.ts deleted file mode 100644 index 846206db4..000000000 --- a/packages/test/src/workflows/cancel-http-request.ts +++ /dev/null @@ -1,33 +0,0 @@ -import { - ActivityCancellationType, - proxyActivities, - CancellationScope, - isCancellation, - setHandler, - condition, -} from '@temporalio/workflow'; -import type * as activities from '../activities'; -import { activityStartedSignal } from './definitions'; - -const { cancellableFetch } = proxyActivities({ - startToCloseTimeout: '20s', - heartbeatTimeout: '3s', - cancellationType: ActivityCancellationType.WAIT_CANCELLATION_COMPLETED, -}); - -export async function cancellableHTTPRequest(url: string): Promise { - let activityStarted = false; - setHandler(activityStartedSignal, () => void (activityStarted = true)); - try { - await CancellationScope.cancellable(async () => { - const promise = cancellableFetch(url, true); - await condition(() => activityStarted); - CancellationScope.current().cancel(); - await promise; - }); - } catch (err) { - if (!isCancellation(err)) { - throw err; - } - } -} diff --git a/packages/test/src/workflows/cancel-timer-immediately.ts b/packages/test/src/workflows/cancel-timer-immediately.ts deleted file mode 100644 index 2bb103435..000000000 --- a/packages/test/src/workflows/cancel-timer-immediately.ts +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Demonstrates the basics of cancellation scopes. - * Used in the documentation site. - */ -// @@@SNIPSTART typescript-cancel-a-timer-from-workflow -import { CancelledFailure, CancellationScope, sleep } from '@temporalio/workflow'; - -export async function cancelTimer(): Promise { - // Timers and Activities are automatically cancelled when their containing scope is cancelled. - try { - await CancellationScope.cancellable(async () => { - const promise = sleep(1); // <-- Will be cancelled because it is attached to this closure's scope - CancellationScope.current().cancel(); - await promise; // <-- Promise must be awaited in order for `cancellable` to throw - }); - } catch (e) { - if (e instanceof CancelledFailure) { - console.log('Timer cancelled πŸ‘'); - } else { - throw e; // <-- Fail the workflow - } - } -} -// @@@SNIPEND diff --git a/packages/test/src/workflows/cancel-timer-with-delay.ts b/packages/test/src/workflows/cancel-timer-with-delay.ts deleted file mode 100644 index 5115907d6..000000000 --- a/packages/test/src/workflows/cancel-timer-with-delay.ts +++ /dev/null @@ -1,16 +0,0 @@ -import { CancelledFailure, CancellationScope, sleep } from '@temporalio/workflow'; - -export async function cancelTimerWithDelay(): Promise { - const scope = new CancellationScope(); - const promise = scope.run(() => sleep(10000)); - await sleep(1).then(() => scope.cancel()); - try { - await promise; - } catch (e) { - if (e instanceof CancelledFailure) { - console.log('Timer cancelled πŸ‘'); - } else { - throw e; - } - } -} diff --git a/packages/test/src/workflows/child-workflow-failure.ts b/packages/test/src/workflows/child-workflow-failure.ts deleted file mode 100644 index 7aca05589..000000000 --- a/packages/test/src/workflows/child-workflow-failure.ts +++ /dev/null @@ -1,11 +0,0 @@ -/** - * Tests child workflow failure from the parent workflow perspective - * @module - */ - -import { executeChild } from '@temporalio/workflow'; -import { throwAsync } from './throw-async'; - -export async function childWorkflowFailure(): Promise { - await executeChild(throwAsync); -} diff --git a/packages/test/src/workflows/child-workflow-invoke.ts b/packages/test/src/workflows/child-workflow-invoke.ts deleted file mode 100644 index 57a61e589..000000000 --- a/packages/test/src/workflows/child-workflow-invoke.ts +++ /dev/null @@ -1,18 +0,0 @@ -/** - * Tests the happy path of starting and awaiting a child workflow - * @module - */ - -import { startChild, executeChild } from '@temporalio/workflow'; -import { successString } from './success-string'; - -export async function childWorkflowInvoke(): Promise<{ - workflowId: string; - runId: string; - execResult: string; - result: string; -}> { - const child = await startChild(successString, {}); - const execResult = await executeChild(successString, {}); - return { workflowId: child.workflowId, runId: child.firstExecutionRunId, result: await child.result(), execResult }; -} diff --git a/packages/test/src/workflows/child-workflow-start-fail.ts b/packages/test/src/workflows/child-workflow-start-fail.ts deleted file mode 100644 index d353fa645..000000000 --- a/packages/test/src/workflows/child-workflow-start-fail.ts +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Tests child workflow start failures - * @module - */ - -import { startChild } from '@temporalio/workflow'; -import { WorkflowExecutionAlreadyStartedError } from '@temporalio/common'; -import { successString } from './success-string'; - -export async function childWorkflowStartFail(): Promise { - const child = await startChild(successString, { - workflowIdReusePolicy: 'REJECT_DUPLICATE', - }); - await child.result(); - - try { - await startChild(successString, { - taskQueue: 'test', - workflowId: child.workflowId, // duplicate - workflowIdReusePolicy: 'REJECT_DUPLICATE', - }); - throw new Error('Managed to start a Workflow with duplicate workflowId'); - } catch (err) { - if (!(err instanceof WorkflowExecutionAlreadyStartedError)) { - throw new Error(`Got invalid error: ${err}`); - } - } -} diff --git a/packages/test/src/workflows/child-workflow-termination.ts b/packages/test/src/workflows/child-workflow-termination.ts deleted file mode 100644 index 9718276a1..000000000 --- a/packages/test/src/workflows/child-workflow-termination.ts +++ /dev/null @@ -1,19 +0,0 @@ -/** - * Tests child workflow termination from the parent workflow perspective - * @module - */ - -import { WorkflowExecution } from '@temporalio/common'; -import { startChild, defineQuery, setHandler } from '@temporalio/workflow'; -import { unblockOrCancel } from './unblock-or-cancel'; - -export const childExecutionQuery = defineQuery('childExecution'); - -export async function childWorkflowTermination(): Promise { - let workflowExecution: WorkflowExecution | undefined = undefined; - setHandler(childExecutionQuery, () => workflowExecution); - - const child = await startChild(unblockOrCancel, {}); - workflowExecution = { workflowId: child.workflowId, runId: child.firstExecutionRunId }; - await child.result(); -} diff --git a/packages/test/src/workflows/child-workflow-timeout.ts b/packages/test/src/workflows/child-workflow-timeout.ts deleted file mode 100644 index 24accf529..000000000 --- a/packages/test/src/workflows/child-workflow-timeout.ts +++ /dev/null @@ -1,14 +0,0 @@ -/** - * Tests child workflow timeout from the parent workflow perspective - * @module - */ - -import { executeChild } from '@temporalio/workflow'; -import { unblockOrCancel } from './unblock-or-cancel'; - -export async function childWorkflowTimeout(): Promise { - await executeChild(unblockOrCancel, { - workflowExecutionTimeout: '10ms', - retry: { maximumAttempts: 1 }, - }); -} diff --git a/packages/test/src/workflows/deprecate-patch.ts b/packages/test/src/workflows/deprecate-patch.ts deleted file mode 100644 index 9ea46e797..000000000 --- a/packages/test/src/workflows/deprecate-patch.ts +++ /dev/null @@ -1,6 +0,0 @@ -import { deprecatePatch } from '@temporalio/workflow'; - -export async function deprecatePatchWorkflow(): Promise { - deprecatePatch('my-change-id'); - console.log('has change'); -} diff --git a/packages/test/src/workflows/fail-signal.ts b/packages/test/src/workflows/fail-signal.ts deleted file mode 100644 index 45f3e0920..000000000 --- a/packages/test/src/workflows/fail-signal.ts +++ /dev/null @@ -1,10 +0,0 @@ -import { setHandler, sleep, ApplicationFailure } from '@temporalio/workflow'; -import { failSignal } from './definitions'; - -export async function failSignalWorkflow(): Promise { - setHandler(failSignal, () => { - throw ApplicationFailure.nonRetryable('Signal failed'); - }); - // Don't complete to allow Workflow to be interrupted by fail() signal - await sleep(100000); -} diff --git a/packages/test/src/workflows/http.ts b/packages/test/src/workflows/http.ts deleted file mode 100644 index f88e6cede..000000000 --- a/packages/test/src/workflows/http.ts +++ /dev/null @@ -1,10 +0,0 @@ -import { proxyActivities } from '@temporalio/workflow'; -import type * as activities from '../activities'; - -const { httpGet } = proxyActivities({ - startToCloseTimeout: '1 minute', -}); - -export async function http(): Promise { - return await httpGet('https://temporal.io'); -} diff --git a/packages/test/src/workflows/index.ts b/packages/test/src/workflows/index.ts index 45c5c1ff5..33d69bc6e 100644 --- a/packages/test/src/workflows/index.ts +++ b/packages/test/src/workflows/index.ts @@ -1,3 +1,4 @@ +// TODO(thomas): use filenames to organize into workflows export * from './activity-failure'; export * from './activity-failures'; export * from './args-and-return'; diff --git a/packages/test/src/workflows/patched.ts b/packages/test/src/workflows/patched.ts deleted file mode 100644 index 9c7e9b242..000000000 --- a/packages/test/src/workflows/patched.ts +++ /dev/null @@ -1,15 +0,0 @@ -import { patched, sleep } from '@temporalio/workflow'; - -export async function patchedWorkflow(): Promise { - if (patched('my-change-id')) { - console.log('has change'); - } else { - console.log('no change'); - } - await sleep(100); - if (patched('my-change-id')) { - console.log('has change 2'); - } else { - console.log('no change 2'); - } -} diff --git a/packages/test/src/workflows/query-and-condition.ts b/packages/test/src/workflows/query-and-condition.ts deleted file mode 100644 index 50c0f6f43..000000000 --- a/packages/test/src/workflows/query-and-condition.ts +++ /dev/null @@ -1,10 +0,0 @@ -import * as wf from '@temporalio/workflow'; - -export const mutateWorkflowStateQuery = wf.defineQuery('mutateWorkflowState'); - -export async function queryAndCondition(): Promise { - let mutated = false; - // Not a valid query, used to verify that condition isn't triggered for query jobs - wf.setHandler(mutateWorkflowStateQuery, () => void (mutated = true)); - await wf.condition(() => mutated); -} diff --git a/packages/test/src/workflows/return-search-attributes.ts b/packages/test/src/workflows/return-search-attributes.ts deleted file mode 100644 index 431b768fa..000000000 --- a/packages/test/src/workflows/return-search-attributes.ts +++ /dev/null @@ -1,12 +0,0 @@ -import { SearchAttributes, workflowInfo } from '@temporalio/workflow'; - -export async function returnSearchAttributes(): Promise { - const sa = workflowInfo().searchAttributes!; // eslint-disable-line @typescript-eslint/no-non-null-assertion - const datetime = (sa.CustomDatetimeField as Array)[0]; - return { - ...sa, - datetimeType: [Object.getPrototypeOf(datetime).constructor.name], - datetimeInstanceofWorks: [datetime instanceof Date], - arrayInstanceofWorks: [sa.CustomIntField instanceof Array], - }; -} diff --git a/packages/test/src/workflows/return-workflow-info.ts b/packages/test/src/workflows/return-workflow-info.ts deleted file mode 100644 index a14039630..000000000 --- a/packages/test/src/workflows/return-workflow-info.ts +++ /dev/null @@ -1,5 +0,0 @@ -import { WorkflowInfo, workflowInfo } from '@temporalio/workflow'; - -export async function returnWorkflowInfo(): Promise { - return workflowInfo(); -} diff --git a/packages/test/src/workflows/sleep-invalid-duration.ts b/packages/test/src/workflows/sleep-invalid-duration.ts deleted file mode 100644 index f112521b0..000000000 --- a/packages/test/src/workflows/sleep-invalid-duration.ts +++ /dev/null @@ -1,6 +0,0 @@ -import { sleep } from '@temporalio/workflow'; - -export async function sleepInvalidDuration(): Promise { - await sleep(0); - await new Promise((resolve) => setTimeout(resolve, -1)); -} diff --git a/packages/test/src/workflows/success-string.ts b/packages/test/src/workflows/success-string.ts index 7e872c165..fd6fb2480 100644 --- a/packages/test/src/workflows/success-string.ts +++ b/packages/test/src/workflows/success-string.ts @@ -1,3 +1,4 @@ +// TODO(thomas): worth removing? export async function successString(): Promise { return 'success'; } diff --git a/packages/test/src/workflows/throw-big-int.ts b/packages/test/src/workflows/throw-big-int.ts deleted file mode 100644 index b26a2acc5..000000000 --- a/packages/test/src/workflows/throw-big-int.ts +++ /dev/null @@ -1,3 +0,0 @@ -export async function throwBigInt(): Promise { - throw 42n; -} diff --git a/packages/test/src/workflows/throw-object.ts b/packages/test/src/workflows/throw-object.ts deleted file mode 100644 index 95ec112fc..000000000 --- a/packages/test/src/workflows/throw-object.ts +++ /dev/null @@ -1,3 +0,0 @@ -export async function throwObject(): Promise { - throw { plainObject: true }; -} diff --git a/packages/test/src/workflows/url-whatwg.ts b/packages/test/src/workflows/url-whatwg.ts deleted file mode 100644 index b3d21b941..000000000 --- a/packages/test/src/workflows/url-whatwg.ts +++ /dev/null @@ -1,9 +0,0 @@ -// import URL but use directly from global URLSearchParams -import { URL } from 'url'; - -export async function urlEcho(url: string): Promise { - const parsedURL = new URL(url); - const searchParams = new URLSearchParams({ counter: '1' }); - parsedURL.search = searchParams.toString(); - return parsedURL.toString(); -}