Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(workflow): Add historySizeInBytes and continueAsNewSuggested to WorkflowInfo (#695) #1223

Merged
merged 14 commits into from
Sep 6, 2023
Merged
2 changes: 2 additions & 0 deletions packages/client/src/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ export async function executionInfoFromRaw<T>(
},
// Safe to convert to number, max history length is 50k, which is much less than Number.MAX_SAFE_INTEGER
historyLength: raw.historyLength!.toNumber(),
// Exact truncation for multi-petabyte histories
historySizeBytes: raw.historySizeBytes!.toNumber(),
mjameswh marked this conversation as resolved.
Show resolved Hide resolved
startTime: tsToDate(raw.startTime!),
executionTime: optionalTsToDate(raw.executionTime),
closeTime: optionalTsToDate(raw.closeTime),
Expand Down
1 change: 1 addition & 0 deletions packages/client/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export interface WorkflowExecutionInfo {
taskQueue: string;
status: { code: proto.temporal.api.enums.v1.WorkflowExecutionStatus; name: WorkflowExecutionStatusName };
historyLength: number;
historySizeBytes: number;
mjameswh marked this conversation as resolved.
Show resolved Hide resolved
startTime: Date;
executionTime?: Date;
closeTime?: Date;
Expand Down
4 changes: 4 additions & 0 deletions packages/test/src/integration-tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,7 @@ export function runIntegrationTests(codec?: PayloadCodec): void {
},
});
const result = await workflow.result();
t.assert(typeof result.historySizeBytes === 'number' && result.historySizeBytes > 300);
t.deepEqual(result, {
memo: {
nested: { object: true },
Expand All @@ -694,6 +695,9 @@ export function runIntegrationTests(codec?: PayloadCodec): void {
workflowType: 'returnWorkflowInfo',
workflowId,
historyLength: 3,
// historySizeBytes changes in every run, e.g., due to variable encoding of process id
historySizeBytes: result.historySizeBytes,
mjameswh marked this conversation as resolved.
Show resolved Hide resolved
continueAsNewSuggested: false,
startTime: result.startTime,
runStartTime: result.runStartTime,
// unsafe.now is a function, so doesn't make it through serialization, but .now is required, so we need to cast
Expand Down
43 changes: 43 additions & 0 deletions packages/test/src/test-integration-workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,49 @@ test('Condition 0 patch sets a timer', async (t) => {
t.false(await worker.runUntil(executeWorkflow(conditionTimeout0)));
});

export async function historySizeBytesGrows(): Promise<[number, number]> {
const before = workflow.workflowInfo().historySizeBytes;
await workflow.sleep(1);
const after = workflow.workflowInfo().historySizeBytes;
return [before, after];
}

test('HistorySizeBytes grows with new WFT', async (t) => {
const { createWorker, executeWorkflow } = helpers(t);
const worker = await createWorker();
const [before, after] = await worker.runUntil(executeWorkflow(historySizeBytesGrows));
t.assert(after > before && before > 100);
});

test('HistorySizeBytes is visible in WorkflowExecutionInfo', async (t) => {
const { createWorker, startWorkflow } = helpers(t);
const worker = await createWorker();
const handle = await startWorkflow(historySizeBytesGrows);

await worker.runUntil(handle.result());
const historySizeBytes = (await handle.describe()).historySizeBytes;
t.assert(historySizeBytes > 100);
});

export async function suggestedCAN(): Promise<boolean> {
const MAX_EVENTS = 40_000;
mjameswh marked this conversation as resolved.
Show resolved Hide resolved
const BATCH_SIZE = 100;
while (workflow.workflowInfo().historyLength < MAX_EVENTS) {
await Promise.all(new Array(BATCH_SIZE).fill(undefined).map((_) => workflow.sleep(1)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I'd use an upsert search attributes instead, it's cheaper to run.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is actually pretty fast, and even against the cloud, it will succeed in just 20 round trips (the actual server settings are 4K events, not 40K)

if (workflow.workflowInfo().continueAsNewSuggested) {
return true;
}
}
return false;
}

test('ContinueAsNew is suggested', async (t) => {
const { createWorker, executeWorkflow } = helpers(t);
const worker = await createWorker();
const flaggedCAN = await worker.runUntil(executeWorkflow(suggestedCAN));
t.assert(flaggedCAN);
});

test('Activity initialInterval is not getting rounded', async (t) => {
const { createWorker, startWorkflow } = helpers(t);
const worker = await createWorker({
Expand Down
19 changes: 11 additions & 8 deletions packages/test/src/test-sinks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,9 @@ if (RUN_INTEGRATION_TESTS) {
fn: string;
}

const dummyDate = new Date(2000, 1, 0, 0, 0, 0);
function fixWorkflowInfoDates(input: WorkflowInfo): WorkflowInfo {
delete (input.unsafe as any).now;
return {
...input,
startTime: dummyDate,
runStartTime: dummyDate,
};
return input;
}

const recordedCalls: RecordedCall[] = [];
Expand Down Expand Up @@ -112,6 +107,11 @@ if (RUN_INTEGRATION_TESTS) {
await wf.result();
return wf;
});

// Capture volatile values that are hard to predict
const { historySizeBytes, startTime, runStartTime } = recordedCalls[0].info;
t.assert(typeof historySizeBytes === 'number' && historySizeBytes > 300);

const info: WorkflowInfo = {
namespace: 'default',
firstExecutionRunId: wf.firstExecutionRunId,
Expand All @@ -134,8 +134,11 @@ if (RUN_INTEGRATION_TESTS) {
parent: undefined,
searchAttributes: {},
historyLength: 3,
startTime: dummyDate,
runStartTime: dummyDate,
continueAsNewSuggested: false,
// values ignored for the purpose of comparison
historySizeBytes,
startTime,
runStartTime,
// unsafe.now() doesn't make it through serialization, but .now is required, so we need to cast
unsafe: {
isReplaying: false,
Expand Down
10 changes: 10 additions & 0 deletions packages/test/src/test-workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ async function createWorkflow(
taskQueue: 'test',
searchAttributes: {},
historyLength: 3,
historySizeBytes: 300,
continueAsNewSuggested: false,
unsafe: { isReplaying: false, now: Date.now },
startTime: new Date(),
runStartTime: new Date(),
Expand Down Expand Up @@ -353,6 +355,14 @@ test('successString', async (t) => {
compareCompletion(t, req, makeSuccess([makeCompleteWorkflowExecution(defaultPayloadConverter.toPayload('success'))]));
});

test('continueAsNewSuggested', async (t) => {
mjameswh marked this conversation as resolved.
Show resolved Hide resolved
mjameswh marked this conversation as resolved.
Show resolved Hide resolved
const { workflowType } = t.context;
const activation = makeStartWorkflow(workflowType);
activation.continueAsNewSuggested = true;
const req = await activate(t, activation);
compareCompletion(t, req, makeSuccess([makeCompleteWorkflowExecution(defaultPayloadConverter.toPayload(true))]));
});

function cleanWorkflowFailureStackTrace(
req: coresdk.workflow_completion.IWorkflowActivationCompletion,
commandIndex = 0
Expand Down
5 changes: 5 additions & 0 deletions packages/test/src/workflows/continue-as-new-suggested.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { workflowInfo } from '@temporalio/workflow';

export async function continueAsNewSuggested(): Promise<boolean> {
return workflowInfo().continueAsNewSuggested;
}
1 change: 1 addition & 0 deletions packages/test/src/workflows/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export * from './condition';
export * from './condition-completion-race';
export * from './condition-timeout-0';
export * from './continue-as-new-same-workflow';
export * from './continue-as-new-suggested';
export * from './continue-as-new-to-different-workflow';
export * from './core-issue-589';
export * from './date';
Expand Down
3 changes: 3 additions & 0 deletions packages/worker/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1193,6 +1193,9 @@ export class Worker {
// 0 is the default, and not a valid value, since crons are at least a minute apart
cronScheduleToScheduleInterval: optionalTsToMs(cronScheduleToScheduleInterval) || undefined,
historyLength: activation.historyLength,
// Exact truncation for multi-petabyte histories
historySizeBytes: activation.historySizeBytes.toNumber(),
continueAsNewSuggested: activation.continueAsNewSuggested,
unsafe: {
now: () => Date.now(), // re-set in initRuntime
isReplaying: activation.isReplaying,
Expand Down
17 changes: 17 additions & 0 deletions packages/workflow/src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,23 @@ export interface WorkflowInfo {
*/
historyLength: number;

/**
* Size of Workflow history in bytes until the current Workflow Task.
*
* This value changes during the lifetime of an Execution.
*
* You may safely use this information to decide when to {@link continueAsNew}.
*/
historySizeBytes: number;
mjameswh marked this conversation as resolved.
Show resolved Hide resolved

/**
* A hint provided by the current WorkflowTaskStarted event recommending whether to
* {@link continueAsNew}.
*
* This value changes during the lifetime of an Execution.
*/
continueAsNewSuggested: boolean;
mjameswh marked this conversation as resolved.
Show resolved Hide resolved

/**
* Task queue this Workflow is executing on
*/
Expand Down
10 changes: 10 additions & 0 deletions packages/workflow/src/worker-interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,18 @@ export function activate(activation: coresdk.workflow_activation.WorkflowActivat
if (activation.historyLength == null) {
throw new TypeError('Got activation with no historyLength');
}
if (activation.historySizeBytes == null) {
mjameswh marked this conversation as resolved.
Show resolved Hide resolved
throw new TypeError('Got activation with no historySizeBytes');
}
if (activation.continueAsNewSuggested == null) {
throw new TypeError('Got activation with no continueAsNewSuggested');
}

activator.info.unsafe.isReplaying = activation.isReplaying ?? false;
activator.info.historyLength = activation.historyLength;
// Exact truncation for multi-petabyte histories
activator.info.historySizeBytes = activation.historySizeBytes.toNumber();
activator.info.continueAsNewSuggested = activation.continueAsNewSuggested;
}

// Cast from the interface to the class which has the `variant` attribute.
Expand Down
Loading