Skip to content

Commit

Permalink
fix(workflow): process all activation jobs as a single batch (#1488)
Browse files Browse the repository at this point in the history
  • Loading branch information
mjameswh authored Aug 14, 2024
1 parent 56207d8 commit d6e2738
Show file tree
Hide file tree
Showing 22 changed files with 1,245 additions and 261 deletions.
19 changes: 15 additions & 4 deletions docs/activation-in-debug-mode.mermaid
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,29 @@ sequenceDiagram
end
Core->>-MT: Respond with Activation
MT->>MT: Decode Payloads
loop patches, signals, updates, completions, queries as jobs
MT->>VM: Activate(jobs)
MT->>+WT: Run Workflow Activation

WT->>VM: Update Activator (now, WorkflowInfo, SDK flags, patches)

alt "Single Batch mode"
WT->>VM: Activate(queries)
VM->>VM: Run Microtasks
MT->>VM: Try Unblock Conditions
WT->>VM: Try Unblock Conditions
else Legacy "Multi Batches mode"
loop [signals, updates+completions] as jobs
WT->>VM: Activate(jobs)
VM->>VM: Run Microtasks
WT->>VM: Try Unblock Conditions
end
end

MT->>VM: Collect Commands
MT->>MT: Encode Payloads
MT->>+VM: Collect Sink Calls
VM-->>-MT: Respond with Sink Calls
MT->>MT: Run Sink Functions
MT->>Core: Complete Activation
opt Completed Workflow Task
opt Completed Workflow Task
Core->>Server: Complete Workflow Task
end

16 changes: 13 additions & 3 deletions docs/activation.mermaid
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,21 @@ sequenceDiagram
Core->>-MT: Respond with Activation
MT->>MT: Decode Payloads
MT->>+WT: Run Workflow Activation
loop patches, signals, updates, completions, queries as jobs
WT->>VM: Activate(jobs)

WT->>VM: Update Activator (now, WorkflowInfo, SDK flags, patches)

alt "Single Batch mode"
WT->>VM: Activate(queries)
VM->>VM: Run Microtasks
WT->>VM: Try Unblock Conditions
else Legacy "Multi Batches mode"
loop [signals, updates+completions] as jobs
WT->>VM: Activate(jobs)
VM->>VM: Run Microtasks
WT->>VM: Try Unblock Conditions
end
end

WT->>VM: Collect Commands
WT-->>-MT: Respond to Activation
MT->>MT: Encode Payloads
Expand All @@ -28,6 +38,6 @@ sequenceDiagram
WT-->>-MT: Respond with Sink Calls
MT->>MT: Run Sink Functions
MT->>Core: Complete Activation
opt Completed Workflow Task
opt Completed Workflow Task
Core->>Server: Complete Workflow Task
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
{
"events": [
{
"eventId": "1",
"eventTime": "2024-08-14T03:50:59.998228Z",
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED",
"taskId": "1048642",
"workflowExecutionStartedEventAttributes": {
"workflowType": {
"name": "canCompleteUpdateAfterWorkflowReturns"
},
"taskQueue": {
"name": "test",
"kind": "TASK_QUEUE_KIND_NORMAL"
},
"workflowExecutionTimeout": "0s",
"workflowRunTimeout": "0s",
"workflowTaskTimeout": "10s",
"originalExecutionRunId": "b9d2c3ad-e03e-49e4-857e-1939d9d32f5e",
"identity": "temporal-cli:jwatkins@JamesMBTemporal",
"firstExecutionRunId": "b9d2c3ad-e03e-49e4-857e-1939d9d32f5e",
"attempt": 1,
"firstWorkflowTaskBackoff": "0s",
"header": {},
"workflowId": "eb5f6727-7fb3-4f48-aba2-1bd7d46823a1"
}
},
{
"eventId": "2",
"eventTime": "2024-08-14T03:50:59.998393Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
"taskId": "1048643",
"workflowTaskScheduledEventAttributes": {
"taskQueue": {
"name": "test",
"kind": "TASK_QUEUE_KIND_NORMAL"
},
"startToCloseTimeout": "10s",
"attempt": 1
}
},
{
"eventId": "3",
"eventTime": "2024-08-14T03:51:24.737259Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED",
"taskId": "1048648",
"workflowTaskStartedEventAttributes": {
"scheduledEventId": "2",
"identity": "13971@JamesMBTemporal",
"requestId": "f8a583b6-d423-45b7-a34d-b3c8e822d10f",
"historySizeBytes": "293",
"workerVersion": {
"buildId": "@temporalio/[email protected]+8983e4c58e21c0f316606d45c034d286695e7f31b7693b88a8ca3c102fce506c"
}
}
},
{
"eventId": "4",
"eventTime": "2024-08-14T03:51:24.779886Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED",
"taskId": "1048652",
"workflowTaskCompletedEventAttributes": {
"scheduledEventId": "2",
"startedEventId": "3",
"identity": "13971@JamesMBTemporal",
"workerVersion": {
"buildId": "@temporalio/[email protected]+8983e4c58e21c0f316606d45c034d286695e7f31b7693b88a8ca3c102fce506c"
},
"sdkMetadata": {
"coreUsedFlags": [2, 1]
},
"meteringMetadata": {}
}
},
{
"eventId": "5",
"eventTime": "2024-08-14T03:51:24.779952Z",
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED",
"taskId": "1048653",
"workflowExecutionUpdateAcceptedEventAttributes": {
"protocolInstanceId": "fb28b772-4538-45a4-99f0-550fae0b7668",
"acceptedRequestMessageId": "fb28b772-4538-45a4-99f0-550fae0b7668/request",
"acceptedRequestSequencingEventId": "2",
"acceptedRequest": {
"meta": {
"updateId": "fb28b772-4538-45a4-99f0-550fae0b7668",
"identity": "temporal-cli:jwatkins@JamesMBTemporal"
},
"input": {
"header": {},
"name": "doneUpdate"
}
}
}
},
{
"eventId": "6",
"eventTime": "2024-08-14T03:51:24.779982Z",
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED",
"taskId": "1048654",
"workflowExecutionCompletedEventAttributes": {
"result": {
"payloads": [
{
"metadata": {
"encoding": "YmluYXJ5L251bGw="
}
}
]
},
"workflowTaskCompletedEventId": "4"
}
}
]
}
15 changes: 15 additions & 0 deletions packages/test/src/helpers-integration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ import {
DefaultLogger,
LogEntry,
LogLevel,
ReplayWorkerOptions,
Runtime,
WorkerOptions,
WorkflowBundle,
bundleWorkflowCode,
makeTelemetryFilterString,
} from '@temporalio/worker';
import * as workflow from '@temporalio/workflow';
import { temporal } from '@temporalio/proto';
import { ConnectionInjectorInterceptor } from './activities/interceptors';
import {
Worker,
Expand Down Expand Up @@ -105,6 +107,7 @@ export function makeTestFunction(opts: {
export interface Helpers {
taskQueue: string;
createWorker(opts?: Partial<WorkerOptions>): Promise<Worker>;
runReplayHistory(opts: Partial<ReplayWorkerOptions>, history: temporal.api.history.v1.IHistory): Promise<void>;
executeWorkflow<T extends () => Promise<any>>(workflowType: T): Promise<workflow.WorkflowResultType<T>>;
executeWorkflow<T extends workflow.Workflow>(
fn: T,
Expand Down Expand Up @@ -137,6 +140,18 @@ export function helpers(t: ExecutionContext<Context>, testEnv: TestWorkflowEnvir
...opts,
});
},
async runReplayHistory(
opts: Partial<ReplayWorkerOptions>,
history: temporal.api.history.v1.IHistory
): Promise<void> {
await Worker.runReplayHistory(
{
workflowBundle: t.context.workflowBundle,
...opts,
},
history
);
},
async executeWorkflow(
fn: workflow.Workflow,
opts?: Omit<WorkflowStartOptions, 'taskQueue' | 'workflowId'>
Expand Down
14 changes: 14 additions & 0 deletions packages/test/src/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import * as fs from 'fs/promises';
import * as net from 'net';
import path from 'path';
import StackUtils from 'stack-utils';
Expand Down Expand Up @@ -128,6 +129,7 @@ export const bundlerOptions = {
'async-retry',
'uuid',
'net',
'fs/promises',
],
};

Expand Down Expand Up @@ -293,3 +295,15 @@ export function asSdkLoggerSink(
},
};
}

export async function getHistories(fname: string): Promise<iface.temporal.api.history.v1.History> {
const isJson = fname.endsWith('json');
const fpath = path.resolve(__dirname, `../history_files/${fname}`);
if (isJson) {
const hist = await fs.readFile(fpath, 'utf8');
return JSON.parse(hist);
} else {
const hist = await fs.readFile(fpath);
return iface.temporal.api.history.v1.History.decode(hist);
}
}
Loading

0 comments on commit d6e2738

Please sign in to comment.