Skip to content

Commit

Permalink
feat(workflow): Add historySizeInBytes and continueAsNewSuggested to …
Browse files Browse the repository at this point in the history
…WorkflowInfo (#695) (#1223)

## What was changed
<!-- Describe what has changed in this PR -->
Makes visible to workflows the new info fields historySizeInBytes and
continueAsNewSuggested

## Why?
Makes it easier for workflow code to decide when to continueAsNew()
before history becomes too large

## Checklist
<!--- add/delete as needed --->

1. Closes <!-- add issue number here -->
#695 
2. How was this tested:
A new system test and unit test added

3. Any docs updates needed?
<!--- update README if applicable
      or point out where to update docs.temporal.io -->
jsdoc of the new fields
  • Loading branch information
antlai-temporal authored Sep 6, 2023
1 parent be98c23 commit aab4149
Show file tree
Hide file tree
Showing 12 changed files with 119 additions and 14 deletions.
3 changes: 3 additions & 0 deletions packages/client/src/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ export async function executionInfoFromRaw<T>(
},
// Safe to convert to number, max history length is 50k, which is much less than Number.MAX_SAFE_INTEGER
historyLength: raw.historyLength!.toNumber(),
// Exact truncation for multi-petabyte histories
// historySize === 0 means WFT was generated by pre-1.20.0 server, and the history size is unknown
historySize: raw.historySizeBytes?.toNumber() || undefined,
startTime: tsToDate(raw.startTime!),
executionTime: optionalTsToDate(raw.executionTime),
closeTime: optionalTsToDate(raw.closeTime),
Expand Down
6 changes: 6 additions & 0 deletions packages/client/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ export interface WorkflowExecutionInfo {
taskQueue: string;
status: { code: proto.temporal.api.enums.v1.WorkflowExecutionStatus; name: WorkflowExecutionStatusName };
historyLength: number;
/**
 * Size of Workflow history in bytes.
 *
 * This value is only available in server versions >= 1.20
 */
historySize?: number;
startTime: Date;
executionTime?: Date;
closeTime?: Date;
Expand Down
3 changes: 1 addition & 2 deletions packages/common/src/time.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
// eslint-disable-next-line import/no-named-as-default
import Long from 'long';
import Long from 'long'; // eslint-disable-line import/no-named-as-default
import ms, { StringValue } from 'ms';
import type { google } from '@temporalio/proto';
import { ValueError } from './errors';
Expand Down
4 changes: 4 additions & 0 deletions packages/test/src/integration-tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,7 @@ export function runIntegrationTests(codec?: PayloadCodec): void {
},
});
const result = await workflow.result();
t.assert(result.historySize > 300);
t.deepEqual(result, {
memo: {
nested: { object: true },
Expand All @@ -694,6 +695,9 @@ export function runIntegrationTests(codec?: PayloadCodec): void {
workflowType: 'returnWorkflowInfo',
workflowId,
historyLength: 3,
continueAsNewSuggested: false,
// values ignored for the purpose of comparison
historySize: result.historySize,
startTime: result.startTime,
runStartTime: result.runStartTime,
// unsafe.now is a function, so doesn't make it through serialization, but .now is required, so we need to cast
Expand Down
46 changes: 46 additions & 0 deletions packages/test/src/test-integration-workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,52 @@ test('Condition 0 patch sets a timer', async (t) => {
t.false(await worker.runUntil(executeWorkflow(conditionTimeout0)));
});

export async function historySizeGrows(): Promise<[number, number]> {
const before = workflow.workflowInfo().historySize;
await workflow.sleep(1);
const after = workflow.workflowInfo().historySize;
return [before, after];
}

test('HistorySize grows with new WFT', async (t) => {
const { createWorker, executeWorkflow } = helpers(t);
const worker = await createWorker();
const [before, after] = await worker.runUntil(executeWorkflow(historySizeGrows));
t.true(after > before && before > 100);
});

test('HistorySize is visible in WorkflowExecutionInfo', async (t) => {
const { createWorker, startWorkflow } = helpers(t);
const worker = await createWorker();
const handle = await startWorkflow(historySizeGrows);

await worker.runUntil(handle.result());
const historySize = (await handle.describe()).historySize;
t.true(historySize && historySize > 100);
});

export async function suggestedCAN(): Promise<boolean> {
const maxEvents = 40_000;
const batchSize = 100;
if (workflow.workflowInfo().continueAsNewSuggested) {
return false;
}
while (workflow.workflowInfo().historyLength < maxEvents) {
await Promise.all(new Array(batchSize).fill(undefined).map((_) => workflow.sleep(1)));
if (workflow.workflowInfo().continueAsNewSuggested) {
return true;
}
}
return false;
}

test('ContinueAsNew is suggested', async (t) => {
const { createWorker, executeWorkflow } = helpers(t);
const worker = await createWorker();
const flaggedCAN = await worker.runUntil(executeWorkflow(suggestedCAN));
t.true(flaggedCAN);
});

test('Activity initialInterval is not getting rounded', async (t) => {
const { createWorker, startWorkflow } = helpers(t);
const worker = await createWorker({
Expand Down
19 changes: 11 additions & 8 deletions packages/test/src/test-sinks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,9 @@ if (RUN_INTEGRATION_TESTS) {
fn: string;
}

const dummyDate = new Date(2000, 1, 0, 0, 0, 0);
function fixWorkflowInfoDates(input: WorkflowInfo): WorkflowInfo {
delete (input.unsafe as any).now;
return {
...input,
startTime: dummyDate,
runStartTime: dummyDate,
};
return input;
}

const recordedCalls: RecordedCall[] = [];
Expand Down Expand Up @@ -112,6 +107,11 @@ if (RUN_INTEGRATION_TESTS) {
await wf.result();
return wf;
});

// Capture volatile values that are hard to predict
const { historySize, startTime, runStartTime } = recordedCalls[0].info;
t.true(historySize > 300);

const info: WorkflowInfo = {
namespace: 'default',
firstExecutionRunId: wf.firstExecutionRunId,
Expand All @@ -134,8 +134,11 @@ if (RUN_INTEGRATION_TESTS) {
parent: undefined,
searchAttributes: {},
historyLength: 3,
startTime: dummyDate,
runStartTime: dummyDate,
continueAsNewSuggested: false,
// values ignored for the purpose of comparison
historySize,
startTime,
runStartTime,
// unsafe.now() doesn't make it through serialization, but .now is required, so we need to cast
unsafe: {
isReplaying: false,
Expand Down
10 changes: 10 additions & 0 deletions packages/test/src/test-workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ async function createWorkflow(
taskQueue: 'test',
searchAttributes: {},
historyLength: 3,
historySize: 300,
continueAsNewSuggested: false,
unsafe: { isReplaying: false, now: Date.now },
startTime: new Date(),
runStartTime: new Date(),
Expand Down Expand Up @@ -353,6 +355,14 @@ test('successString', async (t) => {
compareCompletion(t, req, makeSuccess([makeCompleteWorkflowExecution(defaultPayloadConverter.toPayload('success'))]));
});

test('continueAsNewSuggested', async (t) => {
const { workflowType } = t.context;
const activation = makeStartWorkflow(workflowType);
activation.continueAsNewSuggested = true;
const req = await activate(t, activation);
compareCompletion(t, req, makeSuccess([makeCompleteWorkflowExecution(defaultPayloadConverter.toPayload(true))]));
});

function cleanWorkflowFailureStackTrace(
req: coresdk.workflow_completion.IWorkflowActivationCompletion,
commandIndex = 0
Expand Down
5 changes: 5 additions & 0 deletions packages/test/src/workflows/continue-as-new-suggested.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { workflowInfo } from '@temporalio/workflow';

export async function continueAsNewSuggested(): Promise<boolean> {
return workflowInfo().continueAsNewSuggested;
}
1 change: 1 addition & 0 deletions packages/test/src/workflows/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export * from './condition';
export * from './condition-completion-race';
export * from './condition-timeout-0';
export * from './continue-as-new-same-workflow';
export * from './continue-as-new-suggested';
export * from './continue-as-new-to-different-workflow';
export * from './core-issue-589';
export * from './date';
Expand Down
4 changes: 4 additions & 0 deletions packages/worker/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1193,6 +1193,10 @@ export class Worker {
// 0 is the default, and not a valid value, since crons are at least a minute apart
cronScheduleToScheduleInterval: optionalTsToMs(cronScheduleToScheduleInterval) || undefined,
historyLength: activation.historyLength,
// Exact truncation for multi-petabyte histories
// A zero value means that it was not set by the server
historySize: activation.historySizeBytes.toNumber(),
continueAsNewSuggested: activation.continueAsNewSuggested,
unsafe: {
now: () => Date.now(), // re-set in initRuntime
isReplaying: activation.isReplaying,
Expand Down
21 changes: 21 additions & 0 deletions packages/workflow/src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,27 @@ export interface WorkflowInfo {
*/
historyLength: number;

/**
* Size of Workflow history in bytes until the current Workflow Task.
*
* This value changes during the lifetime of an Execution.
*
* Supported only on Temporal Server 1.20+, always zero on older servers.
*
* You may safely use this information to decide when to {@link continueAsNew}.
*/
historySize: number;

/**
* A hint provided by the current WorkflowTaskStarted event recommending whether to
* {@link continueAsNew}.
*
* This value changes during the lifetime of an Execution.
*
* Supported only on Temporal Server 1.20+, always `false` on older servers.
*/
continueAsNewSuggested: boolean;

/**
* Task queue this Workflow is executing on
*/
Expand Down
11 changes: 7 additions & 4 deletions packages/workflow/src/worker-interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,14 @@ export function activate(activation: coresdk.workflow_activation.WorkflowActivat
// timestamp will not be updated for activation that contain only queries
activator.now = tsToMs(activation.timestamp);
}
if (activation.historyLength == null) {
throw new TypeError('Got activation with no historyLength');
}

// The Rust Core ensures that these activation fields are not null
activator.info.unsafe.isReplaying = activation.isReplaying ?? false;
activator.info.historyLength = activation.historyLength;
activator.info.historyLength = activation.historyLength as number;
// Exact truncation for multi-petabyte histories
// historySize === 0 means WFT was generated by pre-1.20.0 server, and the history size is unknown
activator.info.historySize = activation.historySizeBytes?.toNumber() || 0;
activator.info.continueAsNewSuggested = activation.continueAsNewSuggested ?? false;
}

// Cast from the interface to the class which has the `variant` attribute.
Expand Down

0 comments on commit aab4149

Please sign in to comment.