Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(workflow): Add historySizeInBytes and continueAsNewSuggested to WorkflowInfo (#695) #1223

Merged
merged 14 commits into from
Sep 6, 2023
Merged
2 changes: 2 additions & 0 deletions packages/client/src/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ export async function executionInfoFromRaw<T>(
},
// Safe to convert to number, max history length is 50k, which is much less than Number.MAX_SAFE_INTEGER
historyLength: raw.historyLength!.toNumber(),
// Exact truncation for multi-petabyte histories
historySizeBytes: raw.historySizeBytes!.toNumber(),
mjameswh marked this conversation as resolved.
Show resolved Hide resolved
startTime: tsToDate(raw.startTime!),
executionTime: optionalTsToDate(raw.executionTime),
closeTime: optionalTsToDate(raw.closeTime),
Expand Down
1 change: 1 addition & 0 deletions packages/client/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export interface WorkflowExecutionInfo {
taskQueue: string;
status: { code: proto.temporal.api.enums.v1.WorkflowExecutionStatus; name: WorkflowExecutionStatusName };
historyLength: number;
historySizeBytes: number;
mjameswh marked this conversation as resolved.
Show resolved Hide resolved
startTime: Date;
executionTime?: Date;
closeTime?: Date;
Expand Down
4 changes: 4 additions & 0 deletions packages/test/src/integration-tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,7 @@ export function runIntegrationTests(codec?: PayloadCodec): void {
},
});
const result = await workflow.result();
t.assert(typeof result.historySizeBytes === 'number' && result.historySizeBytes > 300);
t.deepEqual(result, {
memo: {
nested: { object: true },
Expand All @@ -694,6 +695,9 @@ export function runIntegrationTests(codec?: PayloadCodec): void {
workflowType: 'returnWorkflowInfo',
workflowId,
historyLength: 3,
// historySizeBytes changes in every run, e.g., due to variable encoding of process id
historySizeBytes: result.historySizeBytes,
mjameswh marked this conversation as resolved.
Show resolved Hide resolved
continueAsNewSuggested: false,
startTime: result.startTime,
runStartTime: result.runStartTime,
// unsafe.now is a function, so doesn't make it through serialization, but .now is required, so we need to cast
Expand Down
48 changes: 48 additions & 0 deletions packages/test/src/test-integration-workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,54 @@ test('Condition 0 patch sets a timer', async (t) => {
t.false(await worker.runUntil(executeWorkflow(conditionTimeout0)));
});

export async function historySizeBytesGrows(): Promise<[number, number]> {
const before = workflow.workflowInfo().historySizeBytes;
await workflow.sleep(1);
const after = workflow.workflowInfo().historySizeBytes;
return [before, after];
}

test('HistorySizeBytes grows with new WFT', async (t) => {
const { createWorker, executeWorkflow } = helpers(t);
const worker = await createWorker();
const [before, after] = await worker.runUntil(executeWorkflow(historySizeBytesGrows));
t.assert(after > before && before > 100);
});

test('HistorySizeBytes is visible in WorkflowExecutionInfo', async (t) => {
const { createWorker, startWorkflow } = helpers(t);
const worker = await createWorker();
const handle = await startWorkflow(historySizeBytesGrows);

await worker.runUntil(handle.result());
const historySizeBytes = (await handle.describe()).historySizeBytes;
t.assert(historySizeBytes > 100);
});

export async function suggestedCAN(): Promise<boolean> {
const MAX_EVENTS = 40_000;
mjameswh marked this conversation as resolved.
Show resolved Hide resolved
const BATCH_SIZE = 100;
const EVENTS_PER_COMMAND = 2;
for (let i = 0; i < MAX_EVENTS / (BATCH_SIZE * EVENTS_PER_COMMAND); i++) {
mjameswh marked this conversation as resolved.
Show resolved Hide resolved
const batch: Promise<void>[] = [];
mjameswh marked this conversation as resolved.
Show resolved Hide resolved
for (let j = 0; j < BATCH_SIZE; j++) {
batch.push(workflow.sleep(1));
}
await Promise.all(batch);
if (workflow.workflowInfo().continueAsNewSuggested) {
return true;
}
}
return false;
}

test('ContinueAsNew is suggested', async (t) => {
const { createWorker, executeWorkflow } = helpers(t);
const worker = await createWorker();
const flaggedCAN = await worker.runUntil(executeWorkflow(suggestedCAN));
t.assert(flaggedCAN);
});

test('Activity initialInterval is not getting rounded', async (t) => {
const { createWorker, startWorkflow } = helpers(t);
const worker = await createWorker({
Expand Down
5 changes: 5 additions & 0 deletions packages/test/src/test-sinks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ if (RUN_INTEGRATION_TESTS) {
await wf.result();
return wf;
});
// historySizeBytes changes in every run, e.g., due to variable encoding of process id
const expectedHistorySizeBytes = recordedCalls[0].info.historySizeBytes;
t.assert(typeof expectedHistorySizeBytes === 'number' && expectedHistorySizeBytes > 300);
const info: WorkflowInfo = {
namespace: 'default',
firstExecutionRunId: wf.firstExecutionRunId,
Expand All @@ -134,6 +137,8 @@ if (RUN_INTEGRATION_TESTS) {
parent: undefined,
searchAttributes: {},
historyLength: 3,
historySizeBytes: expectedHistorySizeBytes,
mjameswh marked this conversation as resolved.
Show resolved Hide resolved
continueAsNewSuggested: false,
startTime: dummyDate,
runStartTime: dummyDate,
// unsafe.now() doesn't make it through serialization, but .now is required, so we need to cast
Expand Down
11 changes: 11 additions & 0 deletions packages/test/src/test-workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ async function createWorkflow(
taskQueue: 'test',
searchAttributes: {},
historyLength: 3,
// Assuming 100 bytes per entry
mjameswh marked this conversation as resolved.
Show resolved Hide resolved
historySizeBytes: 300,
continueAsNewSuggested: false,
unsafe: { isReplaying: false, now: Date.now },
startTime: new Date(),
runStartTime: new Date(),
Expand Down Expand Up @@ -353,6 +356,14 @@ test('successString', async (t) => {
compareCompletion(t, req, makeSuccess([makeCompleteWorkflowExecution(defaultPayloadConverter.toPayload('success'))]));
});

test('continueAsNewSuggested', async (t) => {
mjameswh marked this conversation as resolved.
Show resolved Hide resolved
mjameswh marked this conversation as resolved.
Show resolved Hide resolved
const { workflowType } = t.context;
const activation = makeStartWorkflow(workflowType);
activation.continueAsNewSuggested = true;
const req = await activate(t, activation);
compareCompletion(t, req, makeSuccess([makeCompleteWorkflowExecution(defaultPayloadConverter.toPayload(true))]));
});

function cleanWorkflowFailureStackTrace(
req: coresdk.workflow_completion.IWorkflowActivationCompletion,
commandIndex = 0
Expand Down
5 changes: 5 additions & 0 deletions packages/test/src/workflows/continue-as-new-suggested.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { workflowInfo } from '@temporalio/workflow';

export async function continueAsNewSuggested(): Promise<boolean> {
return workflowInfo().continueAsNewSuggested;
}
1 change: 1 addition & 0 deletions packages/test/src/workflows/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export * from './condition';
export * from './condition-completion-race';
export * from './condition-timeout-0';
export * from './continue-as-new-same-workflow';
export * from './continue-as-new-suggested';
export * from './continue-as-new-to-different-workflow';
export * from './core-issue-589';
export * from './date';
Expand Down
3 changes: 3 additions & 0 deletions packages/worker/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1193,6 +1193,9 @@ export class Worker {
// 0 is the default, and not a valid value, since crons are at least a minute apart
cronScheduleToScheduleInterval: optionalTsToMs(cronScheduleToScheduleInterval) || undefined,
historyLength: activation.historyLength,
// Exact truncation for multi-petabyte histories
historySizeBytes: activation.historySizeBytes.toNumber(),
continueAsNewSuggested: activation.continueAsNewSuggested,
unsafe: {
now: () => Date.now(), // re-set in initRuntime
isReplaying: activation.isReplaying,
Expand Down
17 changes: 17 additions & 0 deletions packages/workflow/src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,23 @@ export interface WorkflowInfo {
*/
historyLength: number;

/**
* Size of Workflow history in bytes until the current Workflow Task.
*
* This value changes during the lifetime of an Execution.
*
* You may safely use this information to decide when to {@link continueAsNew}.
*/
historySizeBytes: number;
mjameswh marked this conversation as resolved.
Show resolved Hide resolved

/**
* A hint provided by the current WorkflowTaskStarted event recommending whether to
* {@link continueAsNew}.
*
* This value changes during the lifetime of an Execution.
*/
continueAsNewSuggested: boolean;
mjameswh marked this conversation as resolved.
Show resolved Hide resolved

/**
* Task queue this Workflow is executing on
*/
Expand Down
10 changes: 10 additions & 0 deletions packages/workflow/src/worker-interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,18 @@ export function activate(activation: coresdk.workflow_activation.WorkflowActivat
if (activation.historyLength == null) {
throw new TypeError('Got activation with no historyLength');
}
if (activation.historySizeBytes == null) {
mjameswh marked this conversation as resolved.
Show resolved Hide resolved
throw new TypeError('Got activation with no historySizeBytes');
}
if (activation.continueAsNewSuggested == null) {
throw new TypeError('Got activation with no continueAsNewSuggested');
}

activator.info.unsafe.isReplaying = activation.isReplaying ?? false;
activator.info.historyLength = activation.historyLength;
// Exact truncation for multi-petabyte histories
activator.info.historySizeBytes = activation.historySizeBytes.toNumber();
activator.info.continueAsNewSuggested = activation.continueAsNewSuggested;
}

// Cast from the interface to the class which has the `variant` attribute.
Expand Down
Loading