From d6e27384da50d6db478ffe76b3eac2b58845a816 Mon Sep 17 00:00:00 2001 From: James Watkins-Harvey Date: Wed, 14 Aug 2024 13:19:37 -0400 Subject: [PATCH] fix(workflow): process all activation jobs as a single batch (#1488) --- docs/activation-in-debug-mode.mermaid | 19 +- docs/activation.mermaid | 16 +- ...update_after_workflow_returns_pre1488.json | 115 ++++ packages/test/src/helpers-integration.ts | 15 + packages/test/src/helpers.ts | 14 + packages/test/src/test-integration-update.ts | 202 +++++- packages/test/src/test-otel.ts | 11 +- packages/test/src/test-replay.ts | 16 +- packages/test/src/test-workflows.ts | 614 +++++++++++++++--- .../workflows/condition-completion-race.ts | 5 +- packages/test/src/workflows/index.ts | 2 + .../src/workflows/signal-update-ordering.ts | 18 + .../signals-timers-activities-order.ts | 131 ++++ packages/worker/src/utils.ts | 8 - packages/worker/src/worker.ts | 1 + packages/worker/src/workflow/reusable-vm.ts | 4 +- packages/worker/src/workflow/vm-shared.ts | 137 +++- packages/worker/src/workflow/vm.ts | 4 +- packages/workflow/src/flags.ts | 19 + packages/workflow/src/interfaces.ts | 1 + packages/workflow/src/internals.ts | 82 ++- packages/workflow/src/worker-interface.ts | 72 +- 22 files changed, 1245 insertions(+), 261 deletions(-) create mode 100644 packages/test/history_files/complete_update_after_workflow_returns_pre1488.json create mode 100644 packages/test/src/workflows/signal-update-ordering.ts create mode 100644 packages/test/src/workflows/signals-timers-activities-order.ts diff --git a/docs/activation-in-debug-mode.mermaid b/docs/activation-in-debug-mode.mermaid index fcbb0e024..dcaecb19b 100644 --- a/docs/activation-in-debug-mode.mermaid +++ b/docs/activation-in-debug-mode.mermaid @@ -12,18 +12,29 @@ sequenceDiagram end Core->>-MT: Respond with Activation MT->>MT: Decode Payloads - loop patches, signals, updates, completions, queries as jobs - MT->>VM: Activate(jobs) + MT->>+WT: Run Workflow Activation + + WT->>VM: Update Activator (now, WorkflowInfo, SDK flags, patches) + + alt "Single Batch mode" + WT->>VM: Activate(queries) VM->>VM: Run Microtasks - MT->>VM: Try Unblock Conditions + WT->>VM: Try Unblock Conditions + else Legacy "Multi Batches mode" + loop [signals, updates+completions] as jobs + WT->>VM: Activate(jobs) + VM->>VM: Run Microtasks + WT->>VM: Try Unblock Conditions + end end + MT->>VM: Collect Commands MT->>MT: Encode Payloads MT->>+VM: Collect Sink Calls VM-->>-MT: Respond with Sink Calls MT->>MT: Run Sink Functions MT->>Core: Complete Activation - opt Completed Workflow Task + opt Completed Workflow Task Core->>Server: Complete Workflow Task end diff --git a/docs/activation.mermaid b/docs/activation.mermaid index b55e1ae3d..04e8ce07a 100644 --- a/docs/activation.mermaid +++ b/docs/activation.mermaid @@ -14,11 +14,21 @@ sequenceDiagram Core->>-MT: Respond with Activation MT->>MT: Decode Payloads MT->>+WT: Run Workflow Activation - loop patches, signals, updates, completions, queries as jobs - WT->>VM: Activate(jobs) + + WT->>VM: Update Activator (now, WorkflowInfo, SDK flags, patches) + + alt "Single Batch mode" + WT->>VM: Activate(queries) VM->>VM: Run Microtasks WT->>VM: Try Unblock Conditions + else Legacy "Multi Batches mode" + loop [signals, updates+completions] as jobs + WT->>VM: Activate(jobs) + VM->>VM: Run Microtasks + WT->>VM: Try Unblock Conditions + end end + WT->>VM: Collect Commands WT-->>-MT: Respond to Activation MT->>MT: Encode Payloads @@ -28,6 +38,6 @@ sequenceDiagram WT-->>-MT: Respond with Sink Calls MT->>MT: Run Sink Functions MT->>Core: Complete Activation - opt Completed Workflow Task + opt Completed Workflow Task Core->>Server: Complete Workflow Task end diff --git a/packages/test/history_files/complete_update_after_workflow_returns_pre1488.json b/packages/test/history_files/complete_update_after_workflow_returns_pre1488.json new file mode 100644 index 000000000..bad9e73d4 --- /dev/null +++ b/packages/test/history_files/complete_update_after_workflow_returns_pre1488.json @@ -0,0 +1,115 @@ +{ + "events": [ + { + "eventId": "1", + "eventTime": "2024-08-14T03:50:59.998228Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED", + "taskId": "1048642", + "workflowExecutionStartedEventAttributes": { + "workflowType": { + "name": "canCompleteUpdateAfterWorkflowReturns" + }, + "taskQueue": { + "name": "test", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "workflowExecutionTimeout": "0s", + "workflowRunTimeout": "0s", + "workflowTaskTimeout": "10s", + "originalExecutionRunId": "b9d2c3ad-e03e-49e4-857e-1939d9d32f5e", + "identity": "temporal-cli:jwatkins@JamesMBTemporal", + "firstExecutionRunId": "b9d2c3ad-e03e-49e4-857e-1939d9d32f5e", + "attempt": 1, + "firstWorkflowTaskBackoff": "0s", + "header": {}, + "workflowId": "eb5f6727-7fb3-4f48-aba2-1bd7d46823a1" + } + }, + { + "eventId": "2", + "eventTime": "2024-08-14T03:50:59.998393Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1048643", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "test", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "3", + "eventTime": "2024-08-14T03:51:24.737259Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1048648", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "2", + "identity": "13971@JamesMBTemporal", + "requestId": "f8a583b6-d423-45b7-a34d-b3c8e822d10f", + "historySizeBytes": "293", + "workerVersion": { + "buildId": "@temporalio/worker@1.10.1+8983e4c58e21c0f316606d45c034d286695e7f31b7693b88a8ca3c102fce506c" + } + } + }, + { + "eventId": "4", + "eventTime": "2024-08-14T03:51:24.779886Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1048652", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "2", + "startedEventId": "3", + "identity": "13971@JamesMBTemporal", + "workerVersion": { + "buildId": "@temporalio/worker@1.10.1+8983e4c58e21c0f316606d45c034d286695e7f31b7693b88a8ca3c102fce506c" + }, + "sdkMetadata": { + "coreUsedFlags": [2, 1] + }, + "meteringMetadata": {} + } + }, + { + "eventId": "5", + "eventTime": "2024-08-14T03:51:24.779952Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED", + "taskId": "1048653", + "workflowExecutionUpdateAcceptedEventAttributes": { + "protocolInstanceId": "fb28b772-4538-45a4-99f0-550fae0b7668", + "acceptedRequestMessageId": "fb28b772-4538-45a4-99f0-550fae0b7668/request", + "acceptedRequestSequencingEventId": "2", + "acceptedRequest": { + "meta": { + "updateId": "fb28b772-4538-45a4-99f0-550fae0b7668", + "identity": "temporal-cli:jwatkins@JamesMBTemporal" + }, + "input": { + "header": {}, + "name": "doneUpdate" + } + } + } + }, + { + "eventId": "6", + "eventTime": "2024-08-14T03:51:24.779982Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED", + "taskId": "1048654", + "workflowExecutionCompletedEventAttributes": { + "result": { + "payloads": [ + { + "metadata": { + "encoding": "YmluYXJ5L251bGw=" + } + } + ] + }, + "workflowTaskCompletedEventId": "4" + } + } + ] +} diff --git a/packages/test/src/helpers-integration.ts b/packages/test/src/helpers-integration.ts index f627f6fa1..ce02d8525 100644 --- a/packages/test/src/helpers-integration.ts +++ b/packages/test/src/helpers-integration.ts @@ -16,6 +16,7 @@ import { DefaultLogger, LogEntry, LogLevel, + ReplayWorkerOptions, Runtime, WorkerOptions, WorkflowBundle, @@ -23,6 +24,7 @@ import { makeTelemetryFilterString, } from '@temporalio/worker'; import * as workflow from '@temporalio/workflow'; +import { temporal } from '@temporalio/proto'; import { ConnectionInjectorInterceptor } from './activities/interceptors'; import { Worker, @@ -105,6 +107,7 @@ export function makeTestFunction(opts: { export interface Helpers { taskQueue: string; createWorker(opts?: Partial): Promise; + runReplayHistory(opts: Partial, history: temporal.api.history.v1.IHistory): Promise; executeWorkflow Promise>(workflowType: T): Promise>; executeWorkflow( fn: T, @@ -137,6 +140,18 @@ export function helpers(t: ExecutionContext, testEnv: TestWorkflowEnvir ...opts, }); }, + async runReplayHistory( + opts: Partial, + history: temporal.api.history.v1.IHistory + ): Promise { + await Worker.runReplayHistory( + { + workflowBundle: t.context.workflowBundle, + ...opts, + }, + history + ); + }, async executeWorkflow( fn: workflow.Workflow, opts?: Omit diff --git a/packages/test/src/helpers.ts b/packages/test/src/helpers.ts index dd76d8032..3cd944150 100644 --- a/packages/test/src/helpers.ts +++ b/packages/test/src/helpers.ts @@ -1,3 +1,4 @@ +import * as fs from 'fs/promises'; import * as net from 'net'; import path from 'path'; import StackUtils from 'stack-utils'; @@ -128,6 +129,7 @@ export const bundlerOptions = { 'async-retry', 'uuid', 'net', + 'fs/promises', ], }; @@ -293,3 +295,15 @@ export function asSdkLoggerSink( }, }; } + +export async function getHistories(fname: string): Promise { + const isJson = fname.endsWith('json'); + const fpath = path.resolve(__dirname, `../history_files/${fname}`); + if (isJson) { + const hist = await fs.readFile(fpath, 'utf8'); + return JSON.parse(hist); + } else { + const hist = await fs.readFile(fpath); + return iface.temporal.api.history.v1.History.decode(hist); + } +} diff --git a/packages/test/src/test-integration-update.ts b/packages/test/src/test-integration-update.ts index 247da90fa..332469f4e 100644 --- a/packages/test/src/test-integration-update.ts +++ b/packages/test/src/test-integration-update.ts @@ -1,10 +1,14 @@ -import { WorkflowUpdateStage, WorkflowUpdateRPCTimeoutOrCancelledError } from '@temporalio/client'; +import { WorkflowUpdateStage, WorkflowUpdateRPCTimeoutOrCancelledError, WorkflowFailedError } from '@temporalio/client'; import * as wf from '@temporalio/workflow'; +import { temporal } from '@temporalio/proto'; import { helpers, makeTestFunction } from './helpers-integration'; +import { signalUpdateOrderingWorkflow } from './workflows/signal-update-ordering'; +import { signalsActivitiesTimersPromiseOrdering } from './workflows/signals-timers-activities-order'; +import { getHistories, waitUntil } from './helpers'; // Use a reduced server long-poll expiration timeout, in order to confirm that client // polling/retry strategies result in the expected behavior -const LONG_POLL_EXPIRATION_INTERVAL_SECONDS = 1.0; +const LONG_POLL_EXPIRATION_INTERVAL_SECONDS = 5.0; const test = makeTestFunction({ workflowsPath: __filename, @@ -25,7 +29,7 @@ export async function workflowWithUpdates(): Promise { const state: string[] = []; const updateHandler = async (arg: string): Promise => { if (arg === 'wait-for-longer-than-server-long-poll-timeout') { - await wf.sleep(500 + LONG_POLL_EXPIRATION_INTERVAL_SECONDS * 1000); + await wf.sleep(LONG_POLL_EXPIRATION_INTERVAL_SECONDS * 1500); } state.push(arg); return state; @@ -464,7 +468,7 @@ test('startUpdate does not return handle before update has reached requested sta }) .then(() => 'update'); const timeoutPromise = new Promise((f) => - setTimeout(() => f('timeout'), 500 + LONG_POLL_EXPIRATION_INTERVAL_SECONDS * 1000) + setTimeout(() => f('timeout'), LONG_POLL_EXPIRATION_INTERVAL_SECONDS * 1500) ); t.is( await Promise.race([updatePromise, timeoutPromise]), @@ -597,3 +601,193 @@ test('update result poll throws WorkflowUpdateRPCTimeoutOrCancelledError', async }); }); }); + +export { signalUpdateOrderingWorkflow }; + +// Validate that issue #1474 is fixed in 1.11.0+ +test("Pending promises can't unblock between signals and updates", async (t) => { + const { createWorker, startWorkflow, updateHasBeenAdmitted } = helpers(t); + + const handle = await startWorkflow(signalUpdateOrderingWorkflow); + const worker1 = await createWorker({ maxCachedWorkflows: 0 }); + await worker1.runUntil(async () => { + // Wait for the workflow to reach the first condition + await handle.executeUpdate('fooUpdate'); + }); + + const updateId = 'update-id'; + await handle.signal('fooSignal'); + const updateResult = handle.executeUpdate('fooUpdate', { updateId }); + await waitUntil(() => updateHasBeenAdmitted(handle, updateId), 5000); + + const worker2 = await createWorker(); + await worker2.runUntil(async () => { + t.is(await handle.result(), 3); + t.is(await updateResult, 3); + }); +}); + +export { signalsActivitiesTimersPromiseOrdering }; + +// A broader check covering issue #1474, but also other subtle ordering issues caused by the fact +// that signals used to be processed in a distinct phase from other types of jobs. +test('Signals/Updates/Activities/Timers have coherent promise completion ordering', async (t) => { + const { createWorker, startWorkflow, taskQueue, updateHasBeenAdmitted } = helpers(t); + + // We need signal+update+timer completion+activity completion to all happen in the same workflow task. + // To get there, as soon as the activity gets scheduled, we shutdown the workflow worker, then send + // the signal and update while there is no worker alive. When it eventually comes back up, all events + // will be queued up for the next WFT. + + const worker1 = await createWorker({ maxCachedWorkflows: 0 }); + const worker1Promise = worker1.run(); + const killWorker1 = async () => { + try { + worker1.shutdown(); + } catch { + // We may attempt to shutdown the worker multiple times. Ignore errors. + } + await worker1Promise; + }; + + try { + const activityWorker = await createWorker({ + taskQueue: `${taskQueue}-activity`, + activities: { myActivity: killWorker1 }, + workflowBundle: undefined, + workflowsPath: undefined, + }); + await activityWorker.runUntil(async () => { + const handle = await startWorkflow(signalsActivitiesTimersPromiseOrdering); + + // The workflow will schedule the activity, which will shutdown the worker. + // Then this promise will resolves. + await worker1Promise; + + await handle.signal('aaSignal'); + const updateId = 'update-id'; + const updatePromise = handle.executeUpdate('aaUpdate', { updateId }); + + // Timing is important here. Make sure that everything is ready before creating the new worker. + await waitUntil(async () => { + const updateAdmitted = await updateHasBeenAdmitted(handle, updateId); + if (!updateAdmitted) return false; + + const { events } = await handle.fetchHistory(); + return ( + events != null && + events.some((e) => e.eventType === temporal.api.enums.v1.EventType.EVENT_TYPE_ACTIVITY_TASK_COMPLETED) && + events.some((e) => e.eventType === temporal.api.enums.v1.EventType.EVENT_TYPE_TIMER_FIRED) && + events.some((e) => e.eventType === temporal.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED) + ); + }, 5000); + + await ( + await createWorker({}) + ).runUntil(async () => { + t.deepEqual(await handle.result(), [true, true, true, true]); + }); + + await updatePromise; + }); + } finally { + await killWorker1(); + } +}); + +export async function canCompleteUpdateAfterWorkflowReturns(fail: boolean = false): Promise { + let gotUpdate = false; + let mainReturned = false; + + wf.setHandler(wf.defineUpdate('doneUpdate'), async () => { + gotUpdate = true; + await wf.condition(() => mainReturned); + return 'completed'; + }); + + await wf.condition(() => gotUpdate); + mainReturned = true; + if (fail) throw wf.ApplicationFailure.nonRetryable('Intentional failure'); +} + +test('Can complete update after workflow returns', async (t) => { + const { createWorker, startWorkflow } = helpers(t); + + const worker = await createWorker(); + await worker.runUntil(async () => { + const handle = await startWorkflow(canCompleteUpdateAfterWorkflowReturns); + const updateHandler = await handle.executeUpdate(wf.defineUpdate('doneUpdate')); + await handle.result(); + + await t.is(updateHandler, 'completed'); + }); +}); + +test('Can complete update after Workflow fails', async (t) => { + const { createWorker, startWorkflow } = helpers(t); + + const worker = await createWorker(); + await worker.runUntil(async () => { + const handle = await startWorkflow(canCompleteUpdateAfterWorkflowReturns, { args: [true] }); + const updateHandler = await handle.executeUpdate(wf.defineUpdate('doneUpdate')); + await t.throwsAsync(handle.result(), { instanceOf: WorkflowFailedError }); + await t.is(updateHandler, 'completed'); + }); +}); + +/** + * The {@link canCompleteUpdateAfterWorkflowReturns} workflow above features an update handler that + * return safter the main workflow functions has returned. It will (assuming an update is sent in + * the first WFT) generate a raw command sequence (before sending to core) of: + * + * [UpdateAccepted, CompleteWorkflowExecution, UpdateCompleted]. + * + * Prior to https://github.com/temporalio/sdk-typescript/pull/1488, TS SDK ignored any command + * produced after a completion command, therefore truncating this command sequence to: + * + * [UpdateAccepted, CompleteWorkflowExecution]. + * + * Starting with #1488, TS SDK now performs no truncation, and Core reorders the sequence to: + * + * [UpdateAccepted, UpdateCompleted, CompleteWorkflowExecution]. + * + * This test takes a history generated using pre-#1488 SDK code, and replays it. That history + * contains the following events: + * + * 1 WorkflowExecutionStarted + * 2 WorkflowTaskScheduled + * 3 WorkflowTaskStarted + * 4 WorkflowTaskCompleted + * 5 WorkflowExecutionUpdateAccepted + * 6 WorkflowExecutionCompleted + * + * Note that the history lacks a `WorkflowExecutionUpdateCompleted` event. + * + * If Core's logic (which involves a flag) incorrectly allowed this history to be replayed using + * Core's post-#1488 implementation, then a non-determinism error would result. Specifically, Core + * would, at some point during replay, do the following: + * + * - Receive [UpdateAccepted, CompleteWorkflowExecution, UpdateCompleted] from lang; + * - Change that to `[UpdateAccepted, UpdateCompleted, CompleteWorkflowExecution]`; + * - Create an `UpdateMachine` instance (the `WorkflowTaskMachine` instance already exists). + * - Continue to consume history events. + * + * Event 5, `WorkflowExecutionUpdateAccepted`, would apply to the `UpdateMachine` associated with + * the `UpdateAccepted` command, but event 6, `WorkflowExecutionCompleted` would not, since Core is + * expecting an event that can be applied to the `UpdateMachine` corresponding to `UpdateCompleted`. + * If we modify Core to incorrectly apply its new logic then we do see that: + * + * [TMPRL1100] Nondeterminism error: Update machine does not handle this event: HistoryEvent(id: 6, WorkflowExecutionCompleted) + * + * The test passes because Core in fact (because the history lacks the flag) uses its old logic and + * changes the command sequence from `[UpdateAccepted, CompleteWorkflowExecution, UpdateCompleted]` + * to `[UpdateAccepted, CompleteWorkflowExecution]`, i.e. truncating commands emitted after the + * first completion command like TS SDK used to do, so that events 5 and 6 can be applied to the + * corresponding state machines. + */ +test('Can complete update after workflow returns - pre-1.11.0 compatibility', async (t) => { + const { runReplayHistory } = helpers(t); + const hist = await getHistories('complete_update_after_workflow_returns_pre1488.json'); + await runReplayHistory({}, hist); + t.pass(); +}); diff --git a/packages/test/src/test-otel.ts b/packages/test/src/test-otel.ts index 01b8c185c..f38b7013e 100644 --- a/packages/test/src/test-otel.ts +++ b/packages/test/src/test-otel.ts @@ -62,7 +62,12 @@ test.serial('Runtime.install() accepts metrics.otel.url without headers', async } }); -test.serial('Exporting OTEL metrics from Core works', async (t) => { +// FIXME: Core's OTLP exporter has become extremely noisy when exporting metrics but not getting a +// proper grpc response from the collector. I'm skipping this test until we can reimplement this using +// a more a more convincing mock collector. This has also coincided with one case where of CI integration +// tests hanged, though I don't know at this point etither that's related or just a coincidence. +// https://github.com/temporalio/sdk-typescript/issues/1495 +test.serial.skip('Exporting OTEL metrics from Core works', async (t) => { let resolveCapturedRequest = (_req: http2.Http2ServerRequest) => undefined as void; const capturedRequest = new Promise((r) => (resolveCapturedRequest = r)); await withHttp2Server(async (port: number) => { @@ -110,7 +115,9 @@ test.serial('Exporting OTEL metrics from Core works', async (t) => { }); if (RUN_INTEGRATION_TESTS) { - test.serial('Otel interceptor spans are connected and complete', async (t) => { + // FIXME: See comment above. + // https://github.com/temporalio/sdk-typescript/issues/1495 + test.serial.skip('Otel interceptor spans are connected and complete', async (t) => { const spans = Array(); const staticResource = new opentelemetry.resources.Resource({ diff --git a/packages/test/src/test-replay.ts b/packages/test/src/test-replay.ts index d14dcfa11..7ba9740af 100644 --- a/packages/test/src/test-replay.ts +++ b/packages/test/src/test-replay.ts @@ -1,12 +1,10 @@ /* eslint-disable @typescript-eslint/no-non-null-assertion */ -import * as fs from 'node:fs'; -import path from 'node:path'; import anyTest, { TestFn } from 'ava'; import { temporal } from '@temporalio/proto'; import { bundleWorkflowCode, ReplayError, WorkflowBundle } from '@temporalio/worker'; import { DeterminismViolationError } from '@temporalio/workflow'; -import { Worker } from './helpers'; +import { getHistories, Worker } from './helpers'; async function gen2array(gen: AsyncIterable): Promise { const out: T[] = []; @@ -20,18 +18,6 @@ export interface Context { bundle: WorkflowBundle; } -async function getHistories(fname: string): Promise { - const isJson = fname.endsWith('json'); - const fpath = path.resolve(__dirname, `../history_files/${fname}`); - if (isJson) { - const hist = await fs.promises.readFile(fpath, 'utf8'); - return JSON.parse(hist); - } else { - const hist = await fs.promises.readFile(fpath); - return temporal.api.history.v1.History.decode(hist); - } -} - function historator(histories: Array) { return (async function* () { for (const history of histories) { diff --git a/packages/test/src/test-workflows.ts b/packages/test/src/test-workflows.ts index 5c90840c2..f989eee78 100644 --- a/packages/test/src/test-workflows.ts +++ b/packages/test/src/test-workflows.ts @@ -122,6 +122,7 @@ async function createWorkflow( randomnessSeed: Long.fromInt(1337).toBytes(), now: startTime, patches: [], + sdkFlags: [], showStackTraceSources: true, })) as VMWorkflow; return workflow; @@ -129,7 +130,24 @@ async function createWorkflow( async function activate(t: ExecutionContext, activation: coresdk.workflow_activation.IWorkflowActivation) { const { workflow, runId } = t.context; - const completion = await workflow.activate(activation); + + // Core guarantees the following jobs ordering: patches -> signals+update -> others + // Tests are likely to fail if we artifically pass an activation that does not follow that order + const jobs: coresdk.workflow_activation.IWorkflowActivationJob[] = activation.jobs ?? []; + function getPriority(job: coresdk.workflow_activation.IWorkflowActivationJob) { + if (job.notifyHasPatch) return 0; + if (job.signalWorkflow || job.doUpdate) return 1; + return 2; + } + jobs.reduce((prevPriority: number, currJob) => { + const currPriority = getPriority(currJob); + if (prevPriority > currPriority) { + throw new Error('Jobs are not correctly sorted'); + } + return currPriority; + }, 0); + + const completion = await workflow.activate(coresdk.workflow_activation.WorkflowActivation.fromObject(activation)); t.deepEqual(completion.runId, runId); return completion; } @@ -265,9 +283,13 @@ async function makeSignalWorkflow( args: any[], timestamp: number = Date.now() ): Promise { - return makeActivation(timestamp, { + return makeActivation(timestamp, makeSignalWorkflowJob(signalName, args)); +} + +function makeSignalWorkflowJob(signalName: string, args: any[]): coresdk.workflow_activation.IWorkflowActivationJob { + return { signalWorkflow: { signalName, input: toPayloads(defaultPayloadConverter, ...args) }, - }); + }; } function makeCompleteWorkflowExecution(result?: Payload): coresdk.workflow_commands.IWorkflowCommand { @@ -709,20 +731,23 @@ test('interruptableWorkflow', async (t) => { compareCompletion( t, req, - makeSuccess([ - makeFailWorkflowExecution( - 'just because', - // The stack trace is weird here and might confuse users, it might be a JS limitation - // since the Error stack trace is generated in the constructor. - dedent` + makeSuccess( + [ + makeFailWorkflowExecution( + 'just because', + // The stack trace is weird here and might confuse users, it might be a JS limitation + // since the Error stack trace is generated in the constructor. + dedent` ApplicationFailure: just because at Function.retryable (common/src/failure.ts) at test/src/workflows/interrupt-signal.ts `, - 'Error', - false - ), - ]) + 'Error', + false + ), + ], + [SdkFlags.ProcessWorkflowActivationJobsAsSingleBatch] + ) ); } }); @@ -738,17 +763,20 @@ test('failSignalWorkflow', async (t) => { compareCompletion( t, req, - makeSuccess([ - makeFailWorkflowExecution( - 'Signal failed', - dedent` + makeSuccess( + [ + makeFailWorkflowExecution( + 'Signal failed', + dedent` ApplicationFailure: Signal failed at Function.nonRetryable (common/src/failure.ts) at test/src/workflows/fail-signal.ts `, - 'Error' - ), - ]) + 'Error' + ), + ], + [SdkFlags.ProcessWorkflowActivationJobsAsSingleBatch] + ) ); } }); @@ -761,23 +789,33 @@ test('asyncFailSignalWorkflow', async (t) => { } { const req = await activate(t, await makeSignalWorkflow('fail', [])); - compareCompletion(t, req, makeSuccess([makeStartTimerCommand({ seq: 2, startToFireTimeout: msToTs(100) })])); + compareCompletion( + t, + req, + makeSuccess( + [makeStartTimerCommand({ seq: 2, startToFireTimeout: msToTs(100) })], + [SdkFlags.ProcessWorkflowActivationJobsAsSingleBatch] + ) + ); } { const req = cleanWorkflowFailureStackTrace(await activate(t, makeFireTimer(2))); compareCompletion( t, req, - makeSuccess([ - makeFailWorkflowExecution( - 'Signal failed', - dedent` + makeSuccess( + [ + makeFailWorkflowExecution( + 'Signal failed', + dedent` ApplicationFailure: Signal failed at Function.nonRetryable (common/src/failure.ts) at test/src/workflows/async-fail-signal.ts`, - 'Error' - ), - ]) + 'Error' + ), + ], + [SdkFlags.ProcessWorkflowActivationJobsAsSingleBatch] + ) ); } }); @@ -888,19 +926,22 @@ test('unblock - unblockOrCancel', async (t) => { } { const completion = await activate(t, await makeSignalWorkflow('unblock', [])); - compareCompletion(t, completion, makeSuccess()); + compareCompletion(t, completion, makeSuccess(undefined, [SdkFlags.ProcessWorkflowActivationJobsAsSingleBatch])); } { const completion = await activate(t, makeQueryWorkflow('2', 'isBlocked', [])); compareCompletion( t, completion, - makeSuccess([ - makeRespondToQueryCommand({ - queryId: '2', - succeeded: { response: defaultPayloadConverter.toPayload(false) }, - }), - ]) + makeSuccess( + [ + makeRespondToQueryCommand({ + queryId: '2', + succeeded: { response: defaultPayloadConverter.toPayload(false) }, + }), + ], + [SdkFlags.ProcessWorkflowActivationJobsAsSingleBatch] + ) ); } }); @@ -1741,7 +1782,7 @@ test('replay-with-marker patchedWorkflow', async (t) => { runId: 'test-runId', timestamp: msToTs(Date.now()), isReplaying: true, - jobs: [makeStartWorkflowJob(workflowType), makeNotifyHasPatchJob('my-change-id')], + jobs: [makeNotifyHasPatchJob('my-change-id'), makeStartWorkflowJob(workflowType)], }; const completion = await activate(t, act); compareCompletion( @@ -1805,11 +1846,15 @@ test('failUnlessSignaledBeforeStart', async (t) => { const { workflowType } = t.context; const completion = await activate( t, - makeActivation(undefined, makeStartWorkflowJob(workflowType), { - signalWorkflow: { signalName: 'someShallPass' }, - }) + makeActivation( + undefined, + { + signalWorkflow: { signalName: 'someShallPass' }, + }, + makeStartWorkflowJob(workflowType) + ) ); - compareCompletion(t, completion, makeSuccess()); + compareCompletion(t, completion, makeSuccess(undefined, [SdkFlags.ProcessWorkflowActivationJobsAsSingleBatch])); }); test('conditionWaiter', async (t) => { @@ -1853,7 +1898,14 @@ test('conditionRacer', async (t) => { makeFireTimerJob(1) ) ); - compareCompletion(t, completion, makeSuccess([{ cancelTimer: { seq: 1 } }])); + compareCompletion( + t, + completion, + makeSuccess( + [makeCompleteWorkflowExecution(defaultPayloadConverter.toPayload(true))], + [SdkFlags.ProcessWorkflowActivationJobsAsSingleBatch] + ) + ); } }); @@ -1883,14 +1935,17 @@ test('signalHandlersCanBeCleared', async (t) => { } ) ); - compareCompletion(t, completion, makeSuccess([])); + compareCompletion(t, completion, makeSuccess([], [SdkFlags.ProcessWorkflowActivationJobsAsSingleBatch])); } { const completion = await activate(t, makeFireTimer(1)); compareCompletion( t, completion, - makeSuccess([makeStartTimerCommand({ seq: 2, startToFireTimeout: msToTs('1ms') })]) + makeSuccess( + [makeStartTimerCommand({ seq: 2, startToFireTimeout: msToTs('1ms') })], + [SdkFlags.ProcessWorkflowActivationJobsAsSingleBatch] + ) ); } { @@ -1898,7 +1953,10 @@ test('signalHandlersCanBeCleared', async (t) => { compareCompletion( t, completion, - makeSuccess([makeStartTimerCommand({ seq: 3, startToFireTimeout: msToTs('1ms') })]) + makeSuccess( + [makeStartTimerCommand({ seq: 3, startToFireTimeout: msToTs('1ms') })], + [SdkFlags.ProcessWorkflowActivationJobsAsSingleBatch] + ) ); } { @@ -1906,7 +1964,10 @@ test('signalHandlersCanBeCleared', async (t) => { compareCompletion( t, completion, - makeSuccess([makeCompleteWorkflowExecution(defaultPayloadConverter.toPayload(111))]) + makeSuccess( + [makeCompleteWorkflowExecution(defaultPayloadConverter.toPayload(111))], + [SdkFlags.ProcessWorkflowActivationJobsAsSingleBatch] + ) ); } }); @@ -1923,7 +1984,7 @@ test('waitOnUser', async (t) => { } { const completion = await activate(t, await makeSignalWorkflow('completeUserInteraction', [])); - compareCompletion(t, completion, makeSuccess()); + compareCompletion(t, completion, makeSuccess(undefined, [SdkFlags.ProcessWorkflowActivationJobsAsSingleBatch])); } }); @@ -1955,15 +2016,19 @@ test('scopeCancelledWhileWaitingOnExternalWorkflowCancellation', async (t) => { test('query not found - successString', async (t) => { const { workflowType } = t.context; { - const completion = await activate( + const completion = await activate(t, makeActivation(undefined, makeStartWorkflowJob(workflowType))); + compareCompletion( t, - makeActivation(undefined, makeStartWorkflowJob(workflowType), makeQueryWorkflowJob('qid', 'not-found')) + completion, + makeSuccess([makeCompleteWorkflowExecution(defaultPayloadConverter.toPayload('success'))]) ); + } + { + const completion = await activate(t, makeActivation(undefined, makeQueryWorkflowJob('qid', 'not-found'))); compareCompletion( t, completion, makeSuccess([ - makeCompleteWorkflowExecution(defaultPayloadConverter.toPayload('success')), makeRespondToQueryCommand({ queryId: 'qid', failed: { @@ -1990,14 +2055,14 @@ test('Buffered signals are dispatched to correct handler and in correct order - t, makeActivation( undefined, - makeStartWorkflowJob(workflowType), { signalWorkflow: { signalName: 'non-existant', input: toPayloads(defaultPayloadConverter, 1) } }, { signalWorkflow: { signalName: 'signalA', input: toPayloads(defaultPayloadConverter, 2) } }, { signalWorkflow: { signalName: 'signalA', input: toPayloads(defaultPayloadConverter, 3) } }, { signalWorkflow: { signalName: 'signalC', input: toPayloads(defaultPayloadConverter, 4) } }, { signalWorkflow: { signalName: 'signalB', input: toPayloads(defaultPayloadConverter, 5) } }, { signalWorkflow: { signalName: 'non-existant', input: toPayloads(defaultPayloadConverter, 6) } }, - { signalWorkflow: { signalName: 'signalB', input: toPayloads(defaultPayloadConverter, 7) } } + { signalWorkflow: { signalName: 'signalB', input: toPayloads(defaultPayloadConverter, 7) } }, + makeStartWorkflowJob(workflowType) ) ); @@ -2012,19 +2077,22 @@ test('Buffered signals are dispatched to correct handler and in correct order - compareCompletion( t, completion, - makeSuccess([ - makeCompleteWorkflowExecution( - defaultPayloadConverter.toPayload([ - { handler: 'signalA', args: [2] }, - { handler: 'signalB', args: [5] }, - { handler: 'signalB', args: [7] }, - { handler: 'default', signalName: 'non-existant', args: [1] }, - { handler: 'default', signalName: 'signalA', args: [3] }, - { handler: 'default', signalName: 'signalC', args: [4] }, - { handler: 'default', signalName: 'non-existant', args: [6] }, - ] as ProcessedSignal[]) - ), - ]) + makeSuccess( + [ + makeCompleteWorkflowExecution( + defaultPayloadConverter.toPayload([ + { handler: 'signalA', args: [2] }, + { handler: 'signalB', args: [5] }, + { handler: 'signalB', args: [7] }, + { handler: 'default', signalName: 'non-existant', args: [1] }, + { handler: 'default', signalName: 'signalA', args: [3] }, + { handler: 'default', signalName: 'signalC', args: [4] }, + { handler: 'default', signalName: 'non-existant', args: [6] }, + ] as ProcessedSignal[]) + ), + ], + [SdkFlags.ProcessWorkflowActivationJobsAsSingleBatch] + ) ); } }); @@ -2036,32 +2104,428 @@ test('Buffered signals dispatch is reentrant - signalsOrdering2', async (t) => t, makeActivation( undefined, - makeStartWorkflowJob(workflowType), { signalWorkflow: { signalName: 'non-existant', input: toPayloads(defaultPayloadConverter, 1) } }, { signalWorkflow: { signalName: 'signalA', input: toPayloads(defaultPayloadConverter, 2) } }, { signalWorkflow: { signalName: 'signalA', input: toPayloads(defaultPayloadConverter, 3) } }, { signalWorkflow: { signalName: 'signalB', input: toPayloads(defaultPayloadConverter, 4) } }, { signalWorkflow: { signalName: 'signalB', input: toPayloads(defaultPayloadConverter, 5) } }, { signalWorkflow: { signalName: 'signalC', input: toPayloads(defaultPayloadConverter, 6) } }, - { signalWorkflow: { signalName: 'signalC', input: toPayloads(defaultPayloadConverter, 7) } } + { signalWorkflow: { signalName: 'signalC', input: toPayloads(defaultPayloadConverter, 7) } }, + makeStartWorkflowJob(workflowType) + ) + ); + compareCompletion( + t, + completion, + makeSuccess( + [ + makeCompleteWorkflowExecution( + defaultPayloadConverter.toPayload([ + { handler: 'signalA', args: [2] }, + { handler: 'signalB', args: [4] }, + { handler: 'signalC', args: [6] }, + { handler: 'default', signalName: 'non-existant', args: [1] }, + { handler: 'signalA', args: [3] }, + { handler: 'signalB', args: [5] }, + { handler: 'signalC', args: [7] }, + ] as ProcessedSignal[]) + ), + ], + [SdkFlags.ProcessWorkflowActivationJobsAsSingleBatch] ) ); + } +}); + +// Validate that issue #1474 is fixed in 1.11.0+ +test("Pending promises can't unblock between signals and updates - 1.11.0+ - signalUpdateOrderingWorkflow", async (t) => { + const { workflowType } = t.context; + { + const completion = await activate(t, { + ...makeActivation( + undefined, + { + doUpdate: { name: 'fooUpdate', protocolInstanceId: '1', runValidator: false, id: 'first' }, + }, + makeStartWorkflowJob(workflowType) + ), + isReplaying: false, + }); compareCompletion( t, completion, makeSuccess([ + { updateResponse: { protocolInstanceId: '1', accepted: {} } }, + { updateResponse: { protocolInstanceId: '1', completed: defaultPayloadConverter.toPayload(1) } }, + ]) + ); + } + + { + const completion = await activate(t, { + ...makeActivation( + undefined, + { signalWorkflow: { signalName: 'fooSignal', input: [] } }, + { doUpdate: { name: 'fooUpdate', protocolInstanceId: '2', id: 'second' } } + ), + isReplaying: false, + }); + compareCompletion( + t, + completion, + makeSuccess( + [ + { updateResponse: { protocolInstanceId: '2', accepted: {} } }, + { updateResponse: { protocolInstanceId: '2', completed: defaultPayloadConverter.toPayload(3) } }, + { completeWorkflowExecution: { result: defaultPayloadConverter.toPayload(3) } }, + ], + [SdkFlags.ProcessWorkflowActivationJobsAsSingleBatch] + ) + ); + } +}); + +// Validate that issue #1474 legacy behavior is maintained when replaying from pre-1.11.0 history +test("Pending promises can't unblock between signals and updates - pre-1.11.0 - signalUpdateOrderingWorkflow", async (t) => { + const { workflowType } = t.context; + { + const completion = await activate(t, { + ...makeActivation( + undefined, + { + doUpdate: { name: 'fooUpdate', protocolInstanceId: '1', runValidator: false, id: 'first' }, + }, + makeStartWorkflowJob(workflowType) + ), + isReplaying: true, + }); + compareCompletion( + t, + completion, + makeSuccess([ + { updateResponse: { protocolInstanceId: '1', accepted: {} } }, + { updateResponse: { protocolInstanceId: '1', completed: defaultPayloadConverter.toPayload(1) } }, + ]) + ); + } + + { + const completion = await activate(t, { + ...makeActivation( + undefined, + { signalWorkflow: { signalName: 'fooSignal', input: [] } }, + { doUpdate: { name: 'fooUpdate', protocolInstanceId: '2', id: 'second' } } + ), + isReplaying: true, + }); + compareCompletion( + t, + completion, + makeSuccess([ + { completeWorkflowExecution: { result: defaultPayloadConverter.toPayload(2) } }, + { updateResponse: { protocolInstanceId: '2', accepted: {} } }, + { updateResponse: { protocolInstanceId: '2', completed: defaultPayloadConverter.toPayload(3) } }, + ]) + ); + } +}); + +test('Signals/Updates/Activities/Timers have coherent promise completion ordering (no signal) - pre-1.11.0 compatibility - signalsActivitiesTimersPromiseOrdering', async (t) => { + const { workflowType } = t.context; + { + const completion = await activate(t, { + ...makeActivation(undefined, makeStartWorkflowJob(workflowType)), + isReplaying: true, + }); + compareCompletion( + t, + completion, + makeSuccess([ + makeStartTimerCommand({ seq: 1, startToFireTimeout: msToTs(100) }), + makeScheduleActivityCommand({ + seq: 1, + activityId: '1', + activityType: 'myActivity', + scheduleToCloseTimeout: msToTs('1s'), + taskQueue: 'test-activity', + doNotEagerlyExecute: false, + versioningIntent: coresdk.common.VersioningIntent.UNSPECIFIED, + }), + ]) + ); + } + { + const completion = await activate(t, { + ...makeActivation( + undefined, + { doUpdate: { id: 'first', name: 'aaUpdate', protocolInstanceId: '1' } }, + makeFireTimerJob(1), + makeResolveActivityJob(1, { completed: {} }) + ), + isReplaying: true, + }); + compareCompletion( + t, + completion, + makeSuccess([ + { updateResponse: { protocolInstanceId: '1', accepted: {} } }, + { updateResponse: { protocolInstanceId: '1', completed: defaultPayloadConverter.toPayload(undefined) } }, + makeCompleteWorkflowExecution(defaultPayloadConverter.toPayload([false, true, true, true])), + ]) + ); + } +}); + +test('Signals/Updates/Activities/Timers have coherent promise completion ordering (w/ signals) - pre-1.11.0 compatibility - signalsActivitiesTimersPromiseOrdering', async (t) => { + const { workflowType } = t.context; + { + const completion = await activate(t, { + ...makeActivation(undefined, makeStartWorkflowJob(workflowType)), + isReplaying: true, + }); + compareCompletion( + t, + completion, + makeSuccess([ + makeStartTimerCommand({ seq: 1, startToFireTimeout: msToTs(100) }), + makeScheduleActivityCommand({ + seq: 1, + activityId: '1', + activityType: 'myActivity', + scheduleToCloseTimeout: msToTs('1s'), + taskQueue: 'test-activity', + doNotEagerlyExecute: false, + versioningIntent: coresdk.common.VersioningIntent.UNSPECIFIED, + }), + ]) + ); + } + { + const completion = await activate(t, { + ...makeActivation( + undefined, + makeSignalWorkflowJob('aaSignal', []), + { doUpdate: { id: 'first', name: 'aaUpdate', protocolInstanceId: '1' } }, + makeFireTimerJob(1), + makeResolveActivityJob(1, { completed: {} }) + ), + isReplaying: true, + }); + compareCompletion( + t, + completion, + makeSuccess([ + // Note the missing update responses here; this is due to #1474. The fact that the activity + // and timer completions have not been observed before the workflow completed is a related but + // distinct issue. But are resolved by the ProcessWorkflowActivationJobsAsSingleBatch fix. + makeCompleteWorkflowExecution(defaultPayloadConverter.toPayload([true, false, false, false])), + { updateResponse: { protocolInstanceId: '1', accepted: {} } }, + { updateResponse: { protocolInstanceId: '1', completed: defaultPayloadConverter.toPayload(undefined) } }, + ]) + ); + } +}); + +test('Signals/Updates/Activities/Timers have coherent promise completion ordering (w/ signals) - signalsActivitiesTimersPromiseOrdering', async (t) => { + const { workflowType } = t.context; + { + const completion = await activate(t, { + ...makeActivation(undefined, makeStartWorkflowJob(workflowType)), + }); + compareCompletion( + t, + completion, + makeSuccess([ + makeStartTimerCommand({ seq: 1, startToFireTimeout: msToTs(100) }), + makeScheduleActivityCommand({ + seq: 1, + activityId: '1', + activityType: 'myActivity', + scheduleToCloseTimeout: msToTs('1s'), + taskQueue: 'test-activity', + doNotEagerlyExecute: false, + versioningIntent: coresdk.common.VersioningIntent.UNSPECIFIED, + }), + ]) + ); + } + { + const completion = await activate(t, { + ...makeActivation( + undefined, + makeSignalWorkflowJob('aaSignal', []), + { doUpdate: { id: 'first', name: 'aaUpdate', protocolInstanceId: '1' } }, + makeFireTimerJob(1), + makeResolveActivityJob(1, { completed: {} }) + ), + }); + compareCompletion( + t, + completion, + makeSuccess( + [ + { updateResponse: { protocolInstanceId: '1', accepted: {} } }, + { updateResponse: { protocolInstanceId: '1', completed: defaultPayloadConverter.toPayload(undefined) } }, + makeCompleteWorkflowExecution(defaultPayloadConverter.toPayload([true, true, true, true])), + ], + [SdkFlags.ProcessWorkflowActivationJobsAsSingleBatch] + ) + ); + } +}); + +test('Signals/Updates/Activities/Timers - Trace promises completion order - pre-1.11.0 compatibility - signalsActivitiesTimersPromiseOrderingTracer', async (t) => { + const { workflowType } = t.context; + { + const completion = await activate(t, { + ...makeActivation(undefined, makeStartWorkflowJob(workflowType)), + isReplaying: true, + }); + compareCompletion( + t, + completion, + makeSuccess([ + makeStartTimerCommand({ seq: 1, startToFireTimeout: msToTs(1) }), + makeScheduleActivityCommand({ + seq: 1, + activityId: '1', + activityType: 'myActivity', + scheduleToCloseTimeout: msToTs('1s'), + taskQueue: 'test', + doNotEagerlyExecute: false, + versioningIntent: coresdk.common.VersioningIntent.UNSPECIFIED, + }), + ]) + ); + } + { + const completion = await activate(t, { + ...makeActivation( + undefined, + makeSignalWorkflowJob('aaSignal', ['signal1']), + { + doUpdate: { + id: 'first', + name: 'aaUpdate', + protocolInstanceId: '1', + input: toPayloads(defaultPayloadConverter, ['update1']), + }, + }, + makeSignalWorkflowJob('aaSignal', ['signal2']), + { + doUpdate: { + id: 'second', + name: 'aaUpdate', + protocolInstanceId: '2', + input: toPayloads(defaultPayloadConverter, ['update2']), + }, + }, + makeFireTimerJob(1), + makeResolveActivityJob(1, { completed: {} }) + ), + isReplaying: true, + }); + compareCompletion( + t, + completion, + makeSuccess([ + { updateResponse: { protocolInstanceId: '1', accepted: {} } }, + { updateResponse: { protocolInstanceId: '2', accepted: {} } }, + { updateResponse: { protocolInstanceId: '1', completed: defaultPayloadConverter.toPayload(undefined) } }, + { updateResponse: { protocolInstanceId: '2', completed: defaultPayloadConverter.toPayload(undefined) } }, makeCompleteWorkflowExecution( - defaultPayloadConverter.toPayload([ - { handler: 'signalA', args: [2] }, - { handler: 'signalB', args: [4] }, - { handler: 'signalC', args: [6] }, - { handler: 'default', signalName: 'non-existant', args: [1] }, - { handler: 'signalA', args: [3] }, - { handler: 'signalB', args: [5] }, - { handler: 'signalC', args: [7] }, - ] as ProcessedSignal[]) + defaultPayloadConverter.toPayload( + [ + // Signals first (sync part, then microtasks) + 'signal1.sync, signal2.sync', + 'signal1.1, signal2.1, signal1.2, signal2.2, signal1.3, signal2.3, signal1.4, signal2.4', + + // Then update (sync part first), then microtasks for update+timers+activities + 'update1.sync, update2.sync', + 'update1.1, update2.1, timer.1, activity.1', + 'update1.2, update2.2, timer.2, activity.2', + 'update1.3, update2.3, timer.3, activity.3', + 'update1.4, update2.4, timer.4, activity.4', + ].flatMap((x) => x.split(', ')) + ) ), ]) ); } }); + +test('Signals/Updates/Activities/Timers - Trace promises completion order - 1.11.0+ - signalsActivitiesTimersPromiseOrderingTracer', async (t) => { + const { workflowType } = t.context; + { + const completion = await activate(t, { + ...makeActivation(undefined, makeStartWorkflowJob(workflowType)), + }); + compareCompletion( + t, + completion, + makeSuccess([ + makeStartTimerCommand({ seq: 1, startToFireTimeout: msToTs(1) }), + makeScheduleActivityCommand({ + seq: 1, + activityId: '1', + activityType: 'myActivity', + scheduleToCloseTimeout: msToTs('1s'), + taskQueue: 'test', + doNotEagerlyExecute: false, + versioningIntent: coresdk.common.VersioningIntent.UNSPECIFIED, + }), + ]) + ); + } + { + const completion = await activate(t, { + ...makeActivation( + undefined, + makeSignalWorkflowJob('aaSignal', ['signal1']), + { + doUpdate: { + id: 'first', + name: 'aaUpdate', + protocolInstanceId: '1', + input: toPayloads(defaultPayloadConverter, ['update1']), + }, + }, + makeSignalWorkflowJob('aaSignal', ['signal2']), + { + doUpdate: { + id: 'second', + name: 'aaUpdate', + protocolInstanceId: '2', + input: toPayloads(defaultPayloadConverter, ['update2']), + }, + }, + makeFireTimerJob(1), + makeResolveActivityJob(1, { completed: {} }) + ), + }); + compareCompletion( + t, + completion, + makeSuccess( + [ + { updateResponse: { protocolInstanceId: '1', accepted: {} } }, + { updateResponse: { protocolInstanceId: '2', accepted: {} } }, + { updateResponse: { protocolInstanceId: '1', completed: defaultPayloadConverter.toPayload(undefined) } }, + { updateResponse: { protocolInstanceId: '2', completed: defaultPayloadConverter.toPayload(undefined) } }, + makeCompleteWorkflowExecution( + defaultPayloadConverter.toPayload( + [ + 'signal1.sync, update1.sync, signal2.sync, update2.sync', + 'signal1.1, update1.1, signal2.1, update2.1, timer.1, activity.1', + 'signal1.2, update1.2, signal2.2, update2.2, timer.2, activity.2', + 'signal1.3, update1.3, signal2.3, update2.3, timer.3, activity.3', + 'signal1.4, update1.4, signal2.4, update2.4, timer.4, activity.4', + ].flatMap((x) => x.split(', ')) + ) + ), + ], + [SdkFlags.ProcessWorkflowActivationJobsAsSingleBatch] + ) + ); + } +}); diff --git a/packages/test/src/workflows/condition-completion-race.ts b/packages/test/src/workflows/condition-completion-race.ts index 6a60efebe..d3c4810e2 100644 --- a/packages/test/src/workflows/condition-completion-race.ts +++ b/packages/test/src/workflows/condition-completion-race.ts @@ -6,9 +6,8 @@ import { condition, setHandler } from '@temporalio/workflow'; import { unblockSignal } from './definitions'; -export async function conditionRacer(): Promise { +export async function conditionRacer(): Promise { let blocked = true; setHandler(unblockSignal, () => void (blocked = false)); - await condition(() => !blocked, '1s'); - await condition(() => blocked); + return await condition(() => !blocked, '1s'); } diff --git a/packages/test/src/workflows/index.ts b/packages/test/src/workflows/index.ts index b02728f8b..8e6da3d7f 100644 --- a/packages/test/src/workflows/index.ts +++ b/packages/test/src/workflows/index.ts @@ -74,6 +74,8 @@ export * from './signal-handlers-clear'; export * from './signal-target'; export * from './signals-are-always-processed'; export * from './signals-ordering'; +export * from './signal-update-ordering'; +export * from './signals-timers-activities-order'; export * from './sinks'; export * from './sleep'; export * from './sleep-invalid-duration'; diff --git a/packages/test/src/workflows/signal-update-ordering.ts b/packages/test/src/workflows/signal-update-ordering.ts new file mode 100644 index 000000000..75ed31d3f --- /dev/null +++ b/packages/test/src/workflows/signal-update-ordering.ts @@ -0,0 +1,18 @@ +import * as wf from '@temporalio/workflow'; + +export const fooSignal = wf.defineSignal('fooSignal'); +export const fooUpdate = wf.defineUpdate('fooUpdate'); + +// Repro for https://github.com/temporalio/sdk-typescript/issues/1474 +export async function signalUpdateOrderingWorkflow(): Promise { + let numFoos = 0; + wf.setHandler(fooSignal, () => { + numFoos++; + }); + wf.setHandler(fooUpdate, () => { + numFoos++; + return numFoos; + }); + await wf.condition(() => numFoos > 1); + return numFoos; +} diff --git a/packages/test/src/workflows/signals-timers-activities-order.ts b/packages/test/src/workflows/signals-timers-activities-order.ts new file mode 100644 index 000000000..e879448fa --- /dev/null +++ b/packages/test/src/workflows/signals-timers-activities-order.ts @@ -0,0 +1,131 @@ +import * as wf from '@temporalio/workflow'; + +const aaSignal = wf.defineSignal<[string?]>('aaSignal'); +const aaUpdate = wf.defineUpdate('aaUpdate'); + +/** + * This workflow demonstrates a subtle anomaly in the ordering of async operations in + * pre-ProcessWfActivationJobsAsSingleBatch, and confirms that ordering is now coherent after + * introduction of that flag. + * + * Assuming the workflow receives the activity completion and timer completion in the same workflow + * task, both `activityCompleted` and `timerCompleted` will be true once the workflow completes; this + * is a desirable behavior. However, before 1.11.0, if signal also came in during that same activation, + * then the workflow would complete with`activityCompleted` and `timerCompleted` both `false`, *even + * though they really did completed*. The fact that all conditions and microtasks resulting from + * incoming signals will be settled before any other type of event, while other events will all other + * type of events will execute interleaved with each other, is something non-obvious to users. + * + * This can be a problem in many practical use cases, e.g. notably with some form of Entity workflows + * (the workflow might CAN with a state that doesn't reflect the fact that some activity task has + * completed) or saga workflows (workflow may fail to properly undo the activity because if it didn't + * knew that it had completed). + * + * Note that there is no safe and easy way for users to resolve this issue by themselves in pre-1.11.0, + * and neither would calling `wf.allHandlersFinished()` help. + */ +export async function signalsActivitiesTimersPromiseOrdering(): Promise { + let activityCompleted = false; + let timerCompleted = false; + let gotSignal = false; + let gotUpdate = false; + + wf.setHandler(aaSignal, async () => { + gotSignal = true; + }); + + wf.setHandler(aaUpdate, async () => { + gotUpdate = true; + }); + + (async () => { + await wf.sleep(100); + timerCompleted = true; + })().catch((_err) => { + /* ignore */ + }); + + (async () => { + await wf.scheduleActivity('myActivity', [], { + scheduleToCloseTimeout: '1s', + taskQueue: `${wf.workflowInfo().taskQueue}-activity`, + }); + activityCompleted = true; + })().catch((_err) => { + /* ignore */ + }); + + await wf.condition(() => gotSignal || gotUpdate || timerCompleted || activityCompleted); + return [gotSignal, gotUpdate, timerCompleted, activityCompleted]; +} + +/** + * This workflow demonstrates the same anomaly as the `conditionsActivitiesTimersPromiseOrdering` + * workflow above, but provides a trace of the order of events, making it more suitable for debugging. + */ +export async function signalsActivitiesTimersPromiseOrderingTracer(): Promise { + const events: string[] = []; + + wf.setHandler(aaSignal, async (s?: string) => { + // Signal handlers, compared to activities and timers, get a first chance to run code + // synchronously; that plus the fact that they are always processed before any other + // type of event, means that these signal handlers will each get to push two entries to + // `events` before any other event gets to push their first event. This is a bit + // surprising, but totally correct given the current execution model. + events.push(`${s}.sync`); + await Promise.resolve(); + + events.push(`${s}.1`); + await Promise.resolve(); + events.push(`${s}.2`); + + await wf.condition(() => true); + events.push(`${s}.3`); + await Promise.resolve(); + events.push(`${s}.4`); + }); + + wf.setHandler(aaUpdate, async (s?: string) => { + // Similar as signal handlers; see comment above. + events.push(`${s}.sync`); + await Promise.resolve(); + + events.push(`${s}.1`); + await Promise.resolve(); + events.push(`${s}.2`); + + await wf.condition(() => true); + events.push(`${s}.3`); + await Promise.resolve(); + events.push(`${s}.4`); + }); + + const timer = (async () => { + await wf.sleep(1); + + events.push(`timer.1`); + await Promise.resolve(); + events.push(`timer.2`); + + await wf.condition(() => true); + events.push(`timer.3`); + await Promise.resolve(); + events.push(`timer.4`); + })(); + + const activity = (async () => { + await wf.scheduleActivity('myActivity', [], { scheduleToCloseTimeout: '1s' }); + + events.push(`activity.1`); + await Promise.resolve(); + events.push(`activity.2`); + + await wf.condition(() => true); + events.push(`activity.3`); + await Promise.resolve(); + events.push(`activity.4`); + })(); + + await Promise.all([timer, activity]); + return events; +} diff --git a/packages/worker/src/utils.ts b/packages/worker/src/utils.ts index 6454ffa30..62b8198e0 100644 --- a/packages/worker/src/utils.ts +++ b/packages/worker/src/utils.ts @@ -3,14 +3,6 @@ import { IllegalStateError, ParentWorkflowInfo } from '@temporalio/workflow'; export const MiB = 1024 ** 2; -// ts-prune-ignore-next (no idea why ts-prune is complaining all of a sudden) -export function partition(arr: T[], predicate: (x: T) => boolean): [T[], T[]] { - const truthy = Array(); - const falsy = Array(); - arr.forEach((v) => (predicate(v) ? truthy : falsy).push(v)); - return [truthy, falsy]; -} - export function toMB(bytes: number, fractionDigits = 2): string { return (bytes / 1024 / 1024).toFixed(fractionDigits); } diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index f3553d96a..6b314c73e 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -1262,6 +1262,7 @@ export class Worker { randomnessSeed: randomnessSeed.toBytes(), now: tsToMs(activation.timestamp), patches, + sdkFlags: activation.availableInternalFlags ?? [], showStackTraceSources: this.options.showStackTraceSources, }); diff --git a/packages/worker/src/workflow/reusable-vm.ts b/packages/worker/src/workflow/reusable-vm.ts index e36141cd1..286e280ad 100644 --- a/packages/worker/src/workflow/reusable-vm.ts +++ b/packages/worker/src/workflow/reusable-vm.ts @@ -151,7 +151,7 @@ export class ReusableVMWorkflowCreator implements WorkflowCreator { }); const activator = bag.__TEMPORAL_ACTIVATOR__ as any; - const newVM = new ReusableVMWorkflow(options.info, context, activator, workflowModule, isolateExecutionTimeoutMs); + const newVM = new ReusableVMWorkflow(options.info.runId, context, activator, workflowModule); ReusableVMWorkflowCreator.workflowByRunId.set(options.info.runId, newVM); return newVM; } @@ -189,6 +189,6 @@ type WorkflowModule = typeof internals; */ export class ReusableVMWorkflow extends BaseVMWorkflow { public async dispose(): Promise { - ReusableVMWorkflowCreator.workflowByRunId.delete(this.info.runId); + ReusableVMWorkflowCreator.workflowByRunId.delete(this.runId); } } diff --git a/packages/worker/src/workflow/vm-shared.ts b/packages/worker/src/workflow/vm-shared.ts index ea3aa157e..6edecec1b 100644 --- a/packages/worker/src/workflow/vm-shared.ts +++ b/packages/worker/src/workflow/vm-shared.ts @@ -2,12 +2,13 @@ import v8 from 'node:v8'; import vm from 'node:vm'; import { SourceMapConsumer } from 'source-map'; import { cutoffStackTrace, IllegalStateError } from '@temporalio/common'; +import { tsToMs } from '@temporalio/common/lib/time'; import { coresdk } from '@temporalio/proto'; -import type { WorkflowInfo, StackTraceFileLocation } from '@temporalio/workflow'; +import type { StackTraceFileLocation } from '@temporalio/workflow'; import { type SinkCall } from '@temporalio/workflow/lib/sinks'; import * as internals from '@temporalio/workflow/lib/worker-interface'; import { Activator } from '@temporalio/workflow/lib/internals'; -import { partition } from '../utils'; +import { SdkFlags } from '@temporalio/workflow/lib/flags'; import { Workflow } from './interface'; import { WorkflowBundleWithSourceMapAndFilename } from './workflow-worker-thread/input'; @@ -270,78 +271,135 @@ export const globalHandlers = new GlobalHandlers(); export type WorkflowModule = typeof internals; /** - * A Workflow implementation using Node.js' built-in `vm` module + * A Workflow implementation using Node.js' built-in `vm` module. */ export abstract class BaseVMWorkflow implements Workflow { unhandledRejection: unknown; constructor( - public readonly info: WorkflowInfo, + readonly runId: string, protected context: vm.Context | undefined, protected activator: Activator, - readonly workflowModule: WorkflowModule, - public readonly isolateExecutionTimeoutMs: number + readonly workflowModule: WorkflowModule ) {} /** * Send request to the Workflow runtime's worker-interface */ async getAndResetSinkCalls(): Promise { - return this.workflowModule.getAndResetSinkCalls(); + return this.activator.getAndResetSinkCalls(); } /** * Send request to the Workflow runtime's worker-interface - * - * The Workflow is activated in batches to ensure correct order of activation - * job application. */ public async activate( activation: coresdk.workflow_activation.IWorkflowActivation ): Promise { - if (this.context === undefined) { - throw new IllegalStateError('Workflow isolate context uninitialized'); - } - if (!activation.jobs) { - throw new Error('Expected workflow activation jobs to be defined'); + if (this.context === undefined) throw new IllegalStateError('Workflow isolate context uninitialized'); + activation = coresdk.workflow_activation.WorkflowActivation.fromObject(activation); + if (!activation.jobs) throw new TypeError('Expected workflow activation jobs to be defined'); + + // Queries are particular in many ways, and Core guarantees that a single activation will not + // contain both queries and other jobs. So let's handle them separately. + const [queries, nonQueries] = partition(activation.jobs, ({ queryWorkflow }) => queryWorkflow != null); + if (queries.length > 0) { + if (nonQueries.length > 0) throw new TypeError('Got both queries and other jobs in a single activation'); + return this.activateQueries(activation); } - // Job processing order - // 1. patch notifications - // 2. signals - // 3. anything left except for queries - // 4. queries + // Update the activator's state in preparation for a non-query activation. + // This is done early, so that we can then rely on the activator while processing the activation. + if (activation.timestamp == null) + throw new TypeError('Expected activation.timestamp to be set for non-query activation'); + this.activator.now = tsToMs(activation.timestamp); + this.activator.mutateWorkflowInfo((info) => ({ + ...info, + historyLength: activation.historyLength as number, + // Exact truncation for multi-petabyte histories + // historySize === 0 means WFT was generated by pre-1.20.0 server, and the history size is unknown + historySize: activation.historySizeBytes?.toNumber() ?? 0, + continueAsNewSuggested: activation.continueAsNewSuggested ?? false, + currentBuildId: activation.buildIdForCurrentTask ?? undefined, + unsafe: { + ...info.unsafe, + isReplaying: activation.isReplaying ?? false, + }, + })); + this.activator.addKnownFlags(activation.availableInternalFlags ?? []); + + const hasSignals = activation.jobs.some(({ signalWorkflow }) => signalWorkflow != null); + const doSingleBatch = !hasSignals || this.activator.hasFlag(SdkFlags.ProcessWorkflowActivationJobsAsSingleBatch); + const [patches, nonPatches] = partition(activation.jobs, ({ notifyHasPatch }) => notifyHasPatch != null); - const [signals, nonSignals] = partition(nonPatches, ({ signalWorkflow }) => signalWorkflow != null); - const [queries, rest] = partition(nonSignals, ({ queryWorkflow }) => queryWorkflow != null); - let batchIndex = 0; - - // Loop and invoke each batch and wait for microtasks to complete. - // This is done outside of the isolate because when we used isolated-vm we couldn't wait for microtasks from inside the isolate, not relevant anymore. - for (const jobs of [patches, signals, rest, queries]) { - if (jobs.length === 0) { - continue; - } + for (const { notifyHasPatch } of patches) { + if (notifyHasPatch == null) throw new TypeError('Expected notifyHasPatch to be set'); + this.activator.notifyHasPatch(notifyHasPatch); + } + + if (doSingleBatch) { + // updateRandomSeed require the same special handling as patches (before anything else, and don't + // unblock conditions after each job). Unfortunately, prior to ProcessWorkflowActivationJobsAsSingleBatch, + // they were handled as regular jobs, making it unsafe to properly handle that job above, with patches. + const [updateRandomSeed, rest] = partition(activation.jobs, ({ updateRandomSeed }) => updateRandomSeed != null); + if (updateRandomSeed.length > 0) + this.activator.updateRandomSeed(updateRandomSeed[updateRandomSeed.length - 1].updateRandomSeed!); + this.workflowModule.activate( - coresdk.workflow_activation.WorkflowActivation.fromObject({ ...activation, jobs }), - batchIndex++ + coresdk.workflow_activation.WorkflowActivation.fromObject({ ...activation, jobs: rest }), + 0 ); - if (internals.shouldUnblockConditions(jobs[0])) { - this.tryUnblockConditions(); + this.tryUnblockConditionsAndMicrotasks(); + } else { + const [signals, nonSignals] = partition( + nonPatches, + // Move signals to a first batch; all the rest goes in a second batch. + ({ signalWorkflow }) => signalWorkflow != null + ); + + // Loop and invoke each batch, waiting for microtasks to complete after each batch. + let batchIndex = 0; + for (const jobs of [signals, nonSignals]) { + if (jobs.length === 0) continue; + this.workflowModule.activate( + coresdk.workflow_activation.WorkflowActivation.fromObject({ ...activation, jobs }), + batchIndex++ + ); + this.tryUnblockConditionsAndMicrotasks(); } } + const completion = this.workflowModule.concludeActivation(); + // Give unhandledRejection handler a chance to be triggered. await new Promise(setImmediate); if (this.unhandledRejection) { return { runId: this.activator.info.runId, + // FIXME: Calling `activator.errorToFailure()` directly from outside the VM is unsafe, as it + // depends on the `failureConverter` and `payloadConverter`, which may be customized and + // therefore aren't guaranteed not to access `global` or to cause scheduling microtasks. + // Admitingly, the risk is very low, so we're leaving it as is for now. failed: { failure: this.activator.errorToFailure(this.unhandledRejection) }, }; } return completion; } + private activateQueries( + activation: coresdk.workflow_activation.IWorkflowActivation + ): coresdk.workflow_completion.IWorkflowActivationCompletion { + this.activator.mutateWorkflowInfo((info) => ({ + ...info, + unsafe: { + ...info.unsafe, + isReplaying: true, + }, + })); + this.workflowModule.activate(activation, 0); + return this.workflowModule.concludeActivation(); + } + /** * If called (by an external unhandledRejection handler), activations will fail with provided error. */ @@ -350,12 +408,12 @@ export abstract class BaseVMWorkflow implements Workflow { } /** - * Call into the Workflow context to attempt to unblock any blocked conditions. + * Call into the Workflow context to attempt to unblock any blocked conditions and microtasks. * * This is performed in a loop allowing microtasks to be processed between * each iteration until there are no more conditions to unblock. */ - protected tryUnblockConditions(): void { + protected tryUnblockConditionsAndMicrotasks(): void { for (;;) { const numUnblocked = this.workflowModule.tryUnblockConditions(); if (numUnblocked === 0) break; @@ -367,3 +425,10 @@ export abstract class BaseVMWorkflow implements Workflow { */ public abstract dispose(): Promise; } + +function partition(arr: T[], predicate: (x: T) => boolean): [T[], T[]] { + const truthy = Array(); + const falsy = Array(); + arr.forEach((v) => (predicate(v) ? truthy : falsy).push(v)); + return [truthy, falsy]; +} diff --git a/packages/worker/src/workflow/vm.ts b/packages/worker/src/workflow/vm.ts index a1a876c67..25e6c98f9 100644 --- a/packages/worker/src/workflow/vm.ts +++ b/packages/worker/src/workflow/vm.ts @@ -68,7 +68,7 @@ export class VMWorkflowCreator implements WorkflowCreator { }); const activator = context.__TEMPORAL_ACTIVATOR__ as any; - const newVM = new VMWorkflow(options.info, context, activator, workflowModule, isolateExecutionTimeoutMs); + const newVM = new VMWorkflow(options.info.runId, context, activator, workflowModule); VMWorkflowCreator.workflowByRunId.set(options.info.runId, newVM); return newVM; } @@ -124,7 +124,7 @@ export class VMWorkflowCreator implements WorkflowCreator { export class VMWorkflow extends BaseVMWorkflow { public async dispose(): Promise { this.workflowModule.dispose(); - VMWorkflowCreator.workflowByRunId.delete(this.info.runId); + VMWorkflowCreator.workflowByRunId.delete(this.runId); delete this.context; } } diff --git a/packages/workflow/src/flags.ts b/packages/workflow/src/flags.ts index 9c1d8ce17..73ce44ad8 100644 --- a/packages/workflow/src/flags.ts +++ b/packages/workflow/src/flags.ts @@ -17,6 +17,25 @@ export const SdkFlags = { * @since Introduced in 1.10.2/1.11.0. */ NonCancellableScopesAreShieldedFromPropagation: defineFlag(1, true), + + /** + * Prior to 1.11.0, when processing a Workflow activation, the SDK would execute `notifyHasPatch` + * and `signalWorkflow` jobs in distinct phases, before other types of jobs. The primary reason + * behind that multi-phase algorithm was to avoid the possibility that a Workflow execution might + * complete before all incoming signals have been dispatched (at least to the point that the + * _synchronous_ part of the handler function has been executed). + * + * This flag replaces that multi-phase algorithm with a simpler one where jobs are simply sorted as + * `(signals and updates) -> others`, but without processing them as distinct batches (i.e. without + * leaving/reentering the VM context between each group, which automatically triggers the execution + * of all outstanding microtasks). That single-phase approach resolves a number of quirks of the + * former algorithm, and yet still satisfies to the original requirement of ensuring that every + * `signalWorkflow` jobs - and now `doUpdate` jobs as well - have been given a proper chance to + * execute before the Workflow main function might completes. + * + * @since Introduced in 1.11.0. This change is not rollback-safe. + */ + ProcessWorkflowActivationJobsAsSingleBatch: defineFlag(2, true), } as const; function defineFlag(id: number, def: boolean): SdkFlag { diff --git a/packages/workflow/src/interfaces.ts b/packages/workflow/src/interfaces.ts index 10c30a13c..304a188bf 100644 --- a/packages/workflow/src/interfaces.ts +++ b/packages/workflow/src/interfaces.ts @@ -453,6 +453,7 @@ export interface WorkflowCreateOptions { randomnessSeed: number[]; now: number; patches: string[]; + sdkFlags: number[]; showStackTraceSources: boolean; } diff --git a/packages/workflow/src/internals.ts b/packages/workflow/src/internals.ts index 2dcb90774..a0d50182d 100644 --- a/packages/workflow/src/internals.ts +++ b/packages/workflow/src/internals.ts @@ -101,6 +101,19 @@ interface MessageHandlerExecution { * Keeps all of the Workflow runtime state like pending completions for activities and timers. * * Implements handlers for all workflow activation jobs. + * + * Note that most methods in this class are meant to be called only from within the VM. + * + * However, a few methods may be called directly from outside the VM (essentially from `vm-shared.ts`). + * These methods are specifically marked with a comment and require careful consideration, as the + * execution context may not properly reflect that of the target workflow execution (e.g.: with Reusable + * VMs, the `global` may not have been swapped to those of that workflow execution; the active microtask + * queue may be that of the thread/process, rather than the queue of that VM context; etc). Consequently, + * methods that are meant to be called from outside of the VM must not do any of the following: + * + * - Access any global variable; + * - Create Promise objects, use async/await, or otherwise schedule microtasks; + * - Call user-defined functions, including any form of interceptor. */ export class Activator implements ActivationHandler { /** @@ -129,15 +142,6 @@ export class Activator implements ActivationHandler { */ readonly bufferedSignals = Array(); - /** - * Holds buffered query calls until a handler is registered. - * - * **IMPORTANT** queries are only buffered until workflow is started. - * This is required because async interceptors might block workflow function invocation - * which delays query handler registration. - */ - protected readonly bufferedQueries = Array(); - /** * Mapping of update name to handler and validator */ @@ -292,13 +296,6 @@ export class Activator implements ActivationHandler { */ protected cancelled = false; - /** - * This is tracked to allow buffering queries until a workflow function is called. - * TODO(bergundy): I don't think this makes sense since queries run last in an activation and must be responded to in - * the same activation. - */ - protected workflowFunctionWasCalled = false; - /** * The next (incremental) sequence to assign when generating completable commands */ @@ -315,6 +312,7 @@ export class Activator implements ActivationHandler { /** * This is set every time the workflow executes an activation + * May be accessed and modified from outside the VM. */ now: number; @@ -325,6 +323,7 @@ export class Activator implements ActivationHandler { /** * Information about the current Workflow + * May be accessed from outside the VM. */ public info: WorkflowInfo; @@ -366,6 +365,7 @@ export class Activator implements ActivationHandler { showStackTraceSources, sourceMap, getTimeOfDay, + sdkFlags, randomnessSeed, patches, registeredActivityNames, @@ -378,13 +378,15 @@ export class Activator implements ActivationHandler { this.random = alea(randomnessSeed); this.registeredActivityNames = registeredActivityNames; - if (info.unsafe.isReplaying) { - for (const patchId of patches) { - this.notifyHasPatch({ patchId }); - } + this.addKnownFlags(sdkFlags); + for (const patchId of patches) { + this.notifyHasPatch({ patchId }); } } + /** + * May be invoked from outside the VM. + */ mutateWorkflowInfo(fn: (info: WorkflowInfo) => WorkflowInfo): void { this.info = fn(this.info); } @@ -411,6 +413,9 @@ export class Activator implements ActivationHandler { return [...stacks].map(([_, stack]) => stack); } + /** + * May be invoked from outside the VM. + */ getAndResetSinkCalls(): SinkCall[] { const { sinkCalls } = this; this.sinkCalls = []; @@ -423,8 +428,6 @@ export class Activator implements ActivationHandler { * Prevents commands from being added after Workflow completion. */ pushCommand(cmd: coresdk.workflow_commands.IWorkflowCommand, complete = false): void { - // Only query responses may be sent after completion - if (this.completed && !cmd.respondToQuery) return; this.commands.push(cmd); if (complete) { this.completed = true; @@ -443,19 +446,7 @@ export class Activator implements ActivationHandler { if (workflow === undefined) { throw new IllegalStateError('Workflow uninitialized'); } - let promise: Promise; - try { - promise = workflow(...args); - } finally { - // Queries must be handled even if there was an exception when invoking the Workflow function. - this.workflowFunctionWasCalled = true; - // Empty the buffer - const buffer = this.bufferedQueries.splice(0); - for (const activation of buffer) { - this.queryWorkflow(activation); - } - } - return await promise; + return await workflow(...args); } public startWorkflow(activation: coresdk.workflow_activation.IStartWorkflow): void { @@ -586,11 +577,6 @@ export class Activator implements ActivationHandler { } public queryWorkflow(activation: coresdk.workflow_activation.IQueryWorkflow): void { - if (!this.workflowFunctionWasCalled) { - this.bufferedQueries.push(activation); - return; - } - const { queryType, queryId, headers } = activation; if (!(queryType && queryId)) { throw new TypeError('Missing query activation attributes'); @@ -845,9 +831,9 @@ export class Activator implements ActivationHandler { } public notifyHasPatch(activation: coresdk.workflow_activation.INotifyHasPatch): void { - if (!activation.patchId) { - throw new TypeError('Notify has patch missing patch name'); - } + if (!this.info.unsafe.isReplaying) + throw new IllegalStateError('Unexpected notifyHasPatch job on non-replay activation'); + if (!activation.patchId) throw new TypeError('notifyHasPatch missing patch id'); this.knownPresentPatches.add(activation.patchId); } @@ -867,7 +853,10 @@ export class Activator implements ActivationHandler { return usePatch; } - // Called early while handling an activation to register known flags + /** + * Called early while handling an activation to register known flags. + * May be invoked from outside the VM. + */ public addKnownFlags(flags: number[]): void { for (const flag of flags) { assertValidFlag(flag); @@ -875,6 +864,11 @@ export class Activator implements ActivationHandler { } } + /** + * Check if a flag is known to the Workflow Execution; if not, enable the flag if workflow + * is not replaying and the flag is configured to be enabled by default. + * May be invoked from outside the VM. + */ public hasFlag(flag: SdkFlag): boolean { if (this.knownFlags.has(flag.id)) { return true; diff --git a/packages/workflow/src/worker-interface.ts b/packages/workflow/src/worker-interface.ts index 2bd6ee57f..ec89cd2af 100644 --- a/packages/workflow/src/worker-interface.ts +++ b/packages/workflow/src/worker-interface.ts @@ -4,7 +4,6 @@ * @module */ import { IllegalStateError } from '@temporalio/common'; -import { tsToMs } from '@temporalio/common/lib/time'; import { composeInterceptors } from '@temporalio/common/lib/interceptors'; import { coresdk } from '@temporalio/proto'; import { disableStorage } from './cancellation-scope'; @@ -13,7 +12,6 @@ import { WorkflowInterceptorsFactory } from './interceptors'; import { WorkflowCreateOptionsInternal } from './interfaces'; import { Activator } from './internals'; import { setActivatorUntyped, getActivator } from './global-attributes'; -import { type SinkCall } from './sinks'; // Export the type for use on the "worker" side export { PromiseStackStore } from './internals'; @@ -111,66 +109,26 @@ function fixPrototypes(obj: X): X { /** * Run a chunk of activation jobs - * @returns a boolean indicating whether job was processed or ignored */ -export function activate(activation: coresdk.workflow_activation.WorkflowActivation, batchIndex: number): void { +export function activate(activation: coresdk.workflow_activation.IWorkflowActivation, batchIndex: number): void { const activator = getActivator(); - const intercept = composeInterceptors(activator.interceptors.internals, 'activate', ({ activation, batchIndex }) => { - if (batchIndex === 0) { - if (!activation.jobs) { - throw new TypeError('Got activation with no jobs'); - } - if (activation.timestamp != null) { - // timestamp will not be updated for activation that contain only queries - activator.now = tsToMs(activation.timestamp); - } - activator.addKnownFlags(activation.availableInternalFlags ?? []); - - // The Rust Core ensures that these activation fields are not null - activator.mutateWorkflowInfo((info) => ({ - ...info, - historyLength: activation.historyLength as number, - // Exact truncation for multi-petabyte histories - // historySize === 0 means WFT was generated by pre-1.20.0 server, and the history size is unknown - historySize: activation.historySizeBytes?.toNumber() || 0, - continueAsNewSuggested: activation.continueAsNewSuggested ?? false, - currentBuildId: activation.buildIdForCurrentTask ?? undefined, - unsafe: { - ...info.unsafe, - isReplaying: activation.isReplaying ?? false, - }, - })); - } - + const intercept = composeInterceptors(activator.interceptors.internals, 'activate', ({ activation }) => { // Cast from the interface to the class which has the `variant` attribute. // This is safe because we know that activation is a proto class. const jobs = activation.jobs as coresdk.workflow_activation.WorkflowActivationJob[]; for (const job of jobs) { - if (job.variant === undefined) { - throw new TypeError('Expected job.variant to be defined'); - } + if (job.variant === undefined) throw new TypeError('Expected job.variant to be defined'); const variant = job[job.variant]; - if (!variant) { - throw new TypeError(`Expected job.${job.variant} to be set`); - } - // The only job that can be executed on a completed workflow is a query. - // We might get other jobs after completion for instance when a single - // activation contains multiple jobs and the first one completes the workflow. - if (activator.completed && job.variant !== 'queryWorkflow') { - return; - } + if (!variant) throw new TypeError(`Expected job.${job.variant} to be set`); + activator[job.variant](variant as any /* TS can't infer this type */); - if (shouldUnblockConditions(job)) { - tryUnblockConditions(); - } + + if (job.variant !== 'queryWorkflow') tryUnblockConditions(); } }); - intercept({ - activation, - batchIndex, - }); + intercept({ activation, batchIndex }); } /** @@ -183,7 +141,6 @@ export function concludeActivation(): coresdk.workflow_completion.IWorkflowActiv const activator = getActivator(); activator.rejectBufferedUpdates(); const intercept = composeInterceptors(activator.interceptors.internals, 'concludeActivation', (input) => input); - const { info } = activator; const activationCompletion = activator.concludeActivation(); const { commands } = intercept({ commands: activationCompletion.commands }); if (activator.completed) { @@ -191,15 +148,11 @@ export function concludeActivation(): coresdk.workflow_completion.IWorkflowActiv } return { - runId: info.runId, + runId: activator.info.runId, successful: { ...activationCompletion, commands }, }; } -export function getAndResetSinkCalls(): SinkCall[] { - return getActivator().getAndResetSinkCalls(); -} - /** * Loop through all blocked conditions, evaluate and unblock if possible. * @@ -224,13 +177,6 @@ export function tryUnblockConditions(): number { return numUnblocked; } -/** - * Predicate used to prevent triggering conditions for non-query and non-patch jobs. - */ -export function shouldUnblockConditions(job: coresdk.workflow_activation.IWorkflowActivationJob): boolean { - return !job.queryWorkflow && !job.notifyHasPatch; -} - export function dispose(): void { const dispose = composeInterceptors(getActivator().interceptors.internals, 'dispose', async () => { disableStorage();