Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(workflow): Add historySizeInBytes and continueAsNewSuggested to WorkflowInfo (#695) #1223

Merged
merged 14 commits into from
Sep 6, 2023
Merged
2 changes: 2 additions & 0 deletions packages/client/src/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ export async function executionInfoFromRaw<T>(
},
// Safe to convert to number, max history length is 50k, which is much less than Number.MAX_SAFE_INTEGER
historyLength: raw.historyLength!.toNumber(),
// Exact truncation for multi-petabyte histories
historySizeBytes: raw.historySizeBytes?.toNumber() || undefined,
startTime: tsToDate(raw.startTime!),
executionTime: optionalTsToDate(raw.executionTime),
closeTime: optionalTsToDate(raw.closeTime),
Expand Down
2 changes: 2 additions & 0 deletions packages/client/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ export interface WorkflowExecutionInfo {
taskQueue: string;
status: { code: proto.temporal.api.enums.v1.WorkflowExecutionStatus; name: WorkflowExecutionStatusName };
historyLength: number;
// Only available in server version 1.20 or later
mjameswh marked this conversation as resolved.
Show resolved Hide resolved
historySizeBytes?: number;
mjameswh marked this conversation as resolved.
Show resolved Hide resolved
startTime: Date;
executionTime?: Date;
closeTime?: Date;
Expand Down
3 changes: 1 addition & 2 deletions packages/common/src/time.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
// eslint-disable-next-line import/no-named-as-default
import Long from 'long';
import Long from 'long'; // eslint-disable-line import/no-named-as-default
import ms, { StringValue } from 'ms';
import type { google } from '@temporalio/proto';
import { ValueError } from './errors';
Expand Down
4 changes: 4 additions & 0 deletions packages/test/src/integration-tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,7 @@ export function runIntegrationTests(codec?: PayloadCodec): void {
},
});
const result = await workflow.result();
t.assert(typeof result.historySizeBytes === 'number' && result.historySizeBytes > 300);
t.deepEqual(result, {
memo: {
nested: { object: true },
Expand All @@ -694,6 +695,9 @@ export function runIntegrationTests(codec?: PayloadCodec): void {
workflowType: 'returnWorkflowInfo',
workflowId,
historyLength: 3,
continueAsNewSuggested: false,
// values ignored for the purpose of comparison
historySizeBytes: result.historySizeBytes,
startTime: result.startTime,
runStartTime: result.runStartTime,
// unsafe.now is a function, so doesn't make it through serialization, but .now is required, so we need to cast
Expand Down
46 changes: 46 additions & 0 deletions packages/test/src/test-integration-workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,52 @@ test('Condition 0 patch sets a timer', async (t) => {
t.false(await worker.runUntil(executeWorkflow(conditionTimeout0)));
});

export async function historySizeBytesGrows(): Promise<[number | undefined, number | undefined]> {
const before = workflow.workflowInfo().historySizeBytes;
await workflow.sleep(1);
const after = workflow.workflowInfo().historySizeBytes;
return [before, after];
}

test('HistorySizeBytes grows with new WFT', async (t) => {
const { createWorker, executeWorkflow } = helpers(t);
const worker = await createWorker();
const [before, after] = await worker.runUntil(executeWorkflow(historySizeBytesGrows));
t.true(after && before && after > before && before > 100);
});

test('HistorySizeBytes is visible in WorkflowExecutionInfo', async (t) => {
const { createWorker, startWorkflow } = helpers(t);
const worker = await createWorker();
const handle = await startWorkflow(historySizeBytesGrows);

await worker.runUntil(handle.result());
const historySizeBytes = (await handle.describe()).historySizeBytes;
t.true(historySizeBytes && historySizeBytes > 100);
});

export async function suggestedCAN(): Promise<boolean> {
const maxEvents = 40_000;
const batchSize = 100;
if (workflow.workflowInfo().continueAsNewSuggested) {
return false;
}
while (workflow.workflowInfo().historyLength < maxEvents) {
await Promise.all(new Array(batchSize).fill(undefined).map((_) => workflow.sleep(1)));
if (workflow.workflowInfo().continueAsNewSuggested) {
return true;
}
}
return false;
}

test('ContinueAsNew is suggested', async (t) => {
const { createWorker, executeWorkflow } = helpers(t);
const worker = await createWorker();
const flaggedCAN = await worker.runUntil(executeWorkflow(suggestedCAN));
t.true(flaggedCAN);
});

test('Activity initialInterval is not getting rounded', async (t) => {
const { createWorker, startWorkflow } = helpers(t);
const worker = await createWorker({
Expand Down
19 changes: 11 additions & 8 deletions packages/test/src/test-sinks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,9 @@ if (RUN_INTEGRATION_TESTS) {
fn: string;
}

const dummyDate = new Date(2000, 1, 0, 0, 0, 0);
function fixWorkflowInfoDates(input: WorkflowInfo): WorkflowInfo {
delete (input.unsafe as any).now;
return {
...input,
startTime: dummyDate,
runStartTime: dummyDate,
};
return input;
}

const recordedCalls: RecordedCall[] = [];
Expand Down Expand Up @@ -112,6 +107,11 @@ if (RUN_INTEGRATION_TESTS) {
await wf.result();
return wf;
});

// Capture volatile values that are hard to predict
const { historySizeBytes, startTime, runStartTime } = recordedCalls[0].info;
t.assert(typeof historySizeBytes === 'number' && historySizeBytes > 300);

const info: WorkflowInfo = {
namespace: 'default',
firstExecutionRunId: wf.firstExecutionRunId,
Expand All @@ -134,8 +134,11 @@ if (RUN_INTEGRATION_TESTS) {
parent: undefined,
searchAttributes: {},
historyLength: 3,
startTime: dummyDate,
runStartTime: dummyDate,
continueAsNewSuggested: false,
// values ignored for the purpose of comparison
historySizeBytes,
startTime,
runStartTime,
// unsafe.now() doesn't make it through serialization, but .now is required, so we need to cast
unsafe: {
isReplaying: false,
Expand Down
10 changes: 10 additions & 0 deletions packages/test/src/test-workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ async function createWorkflow(
taskQueue: 'test',
searchAttributes: {},
historyLength: 3,
historySizeBytes: 300,
continueAsNewSuggested: false,
unsafe: { isReplaying: false, now: Date.now },
startTime: new Date(),
runStartTime: new Date(),
Expand Down Expand Up @@ -353,6 +355,14 @@ test('successString', async (t) => {
compareCompletion(t, req, makeSuccess([makeCompleteWorkflowExecution(defaultPayloadConverter.toPayload('success'))]));
});

test('continueAsNewSuggested', async (t) => {
mjameswh marked this conversation as resolved.
Show resolved Hide resolved
mjameswh marked this conversation as resolved.
Show resolved Hide resolved
const { workflowType } = t.context;
const activation = makeStartWorkflow(workflowType);
activation.continueAsNewSuggested = true;
const req = await activate(t, activation);
compareCompletion(t, req, makeSuccess([makeCompleteWorkflowExecution(defaultPayloadConverter.toPayload(true))]));
});

function cleanWorkflowFailureStackTrace(
req: coresdk.workflow_completion.IWorkflowActivationCompletion,
commandIndex = 0
Expand Down
5 changes: 5 additions & 0 deletions packages/test/src/workflows/continue-as-new-suggested.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { workflowInfo } from '@temporalio/workflow';

export async function continueAsNewSuggested(): Promise<boolean> {
return workflowInfo().continueAsNewSuggested;
}
1 change: 1 addition & 0 deletions packages/test/src/workflows/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export * from './condition';
export * from './condition-completion-race';
export * from './condition-timeout-0';
export * from './continue-as-new-same-workflow';
export * from './continue-as-new-suggested';
export * from './continue-as-new-to-different-workflow';
export * from './core-issue-589';
export * from './date';
Expand Down
5 changes: 5 additions & 0 deletions packages/worker/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1157,6 +1157,8 @@ export class Worker {
searchAttributes,
} = startWorkflow;

// Exact truncation for multi-petabyte histories
const historySizeBytes = activation.historySizeBytes.toNumber();
const workflowInfo: WorkflowInfo = {
workflowId,
runId: activation.runId,
Expand Down Expand Up @@ -1193,6 +1195,9 @@ export class Worker {
// 0 is the default, and not a valid value, since crons are at least a minute apart
cronScheduleToScheduleInterval: optionalTsToMs(cronScheduleToScheduleInterval) || undefined,
historyLength: activation.historyLength,
// A zero value means that it was not set by the server
historySizeBytes: historySizeBytes ? historySizeBytes : undefined,
continueAsNewSuggested: activation.continueAsNewSuggested,
unsafe: {
now: () => Date.now(), // re-set in initRuntime
isReplaying: activation.isReplaying,
Expand Down
21 changes: 21 additions & 0 deletions packages/workflow/src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,27 @@ export interface WorkflowInfo {
*/
historyLength: number;

/**
* Size of Workflow history in bytes until the current Workflow Task.
*
* This value changes during the lifetime of an Execution.
*
* This value is only available in server versions > 1.20
*
* You may safely use this information to decide when to {@link continueAsNew}.
*/
historySizeBytes?: number;

/**
* A hint provided by the current WorkflowTaskStarted event recommending whether to
* {@link continueAsNew}.
*
* This value changes during the lifetime of an Execution.
*
* Supported only on Temporal Server 1.20+, always `false` on older servers.
*/
continueAsNewSuggested: boolean;
mjameswh marked this conversation as resolved.
Show resolved Hide resolved

/**
* Task queue this Workflow is executing on
*/
Expand Down
11 changes: 7 additions & 4 deletions packages/workflow/src/worker-interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,14 @@ export function activate(activation: coresdk.workflow_activation.WorkflowActivat
// timestamp will not be updated for activation that contain only queries
activator.now = tsToMs(activation.timestamp);
}
if (activation.historyLength == null) {
throw new TypeError('Got activation with no historyLength');
}

// The Rust Core ensures that these activation fields are not null
activator.info.unsafe.isReplaying = activation.isReplaying ?? false;
activator.info.historyLength = activation.historyLength;
activator.info.historyLength = activation.historyLength as number;
// Exact truncation for multi-petabyte histories
// historySizeBytes === 0 means WFT was generated by pre-1.20.0 server, and the history size is unknown
activator.info.historySizeBytes = activation.historySizeBytes?.toNumber() || undefined;
activator.info.continueAsNewSuggested = activation.continueAsNewSuggested ?? false;
}

// Cast from the interface to the class which has the `variant` attribute.
Expand Down
Loading