diff --git a/packages/client/src/helpers.ts b/packages/client/src/helpers.ts index 9520b420d..9be21e8fb 100644 --- a/packages/client/src/helpers.ts +++ b/packages/client/src/helpers.ts @@ -59,6 +59,9 @@ export async function executionInfoFromRaw( }, // Safe to convert to number, max history length is 50k, which is much less than Number.MAX_SAFE_INTEGER historyLength: raw.historyLength!.toNumber(), + // Exact truncation for multi-petabyte histories + // historySize === 0 means WFT was generated by pre-1.20.0 server, and the history size is unknown + historySize: raw.historySizeBytes?.toNumber() || undefined, startTime: tsToDate(raw.startTime!), executionTime: optionalTsToDate(raw.executionTime), closeTime: optionalTsToDate(raw.closeTime), diff --git a/packages/client/src/types.ts b/packages/client/src/types.ts index e3225e8c3..5308a0247 100644 --- a/packages/client/src/types.ts +++ b/packages/client/src/types.ts @@ -36,6 +36,12 @@ export interface WorkflowExecutionInfo { taskQueue: string; status: { code: proto.temporal.api.enums.v1.WorkflowExecutionStatus; name: WorkflowExecutionStatusName }; historyLength: number; + /** + * Size of Workflow history in bytes. + * + * This value is only available in server versions >= 1.20 + */ + historySize?: number; startTime: Date; executionTime?: Date; closeTime?: Date; diff --git a/packages/common/src/time.ts b/packages/common/src/time.ts index feb6d91fe..77398bab6 100644 --- a/packages/common/src/time.ts +++ b/packages/common/src/time.ts @@ -1,5 +1,4 @@ -// eslint-disable-next-line import/no-named-as-default -import Long from 'long'; +import Long from 'long'; // eslint-disable-line import/no-named-as-default import ms, { StringValue } from 'ms'; import type { google } from '@temporalio/proto'; import { ValueError } from './errors'; diff --git a/packages/test/src/integration-tests.ts b/packages/test/src/integration-tests.ts index 552e31bd6..dc035e575 100644 --- a/packages/test/src/integration-tests.ts +++ b/packages/test/src/integration-tests.ts @@ -680,6 +680,7 @@ export function runIntegrationTests(codec?: PayloadCodec): void { }, }); const result = await workflow.result(); + t.assert(result.historySize > 300); t.deepEqual(result, { memo: { nested: { object: true }, @@ -694,6 +695,9 @@ export function runIntegrationTests(codec?: PayloadCodec): void { workflowType: 'returnWorkflowInfo', workflowId, historyLength: 3, + continueAsNewSuggested: false, + // values ignored for the purpose of comparison + historySize: result.historySize, startTime: result.startTime, runStartTime: result.runStartTime, // unsafe.now is a function, so doesn't make it through serialization, but .now is required, so we need to cast diff --git a/packages/test/src/test-integration-workflows.ts b/packages/test/src/test-integration-workflows.ts index f599907ae..000329a7a 100644 --- a/packages/test/src/test-integration-workflows.ts +++ b/packages/test/src/test-integration-workflows.ts @@ -216,6 +216,52 @@ test('Condition 0 patch sets a timer', async (t) => { t.false(await worker.runUntil(executeWorkflow(conditionTimeout0))); }); +export async function historySizeGrows(): Promise<[number, number]> { + const before = workflow.workflowInfo().historySize; + await workflow.sleep(1); + const after = workflow.workflowInfo().historySize; + return [before, after]; +} + +test('HistorySize grows with new WFT', async (t) => { + const { createWorker, executeWorkflow } = helpers(t); + const worker = await createWorker(); + const [before, after] = await worker.runUntil(executeWorkflow(historySizeGrows)); + t.true(after > before && before > 100); +}); + +test('HistorySize is visible in WorkflowExecutionInfo', async (t) => { + const { createWorker, startWorkflow } = helpers(t); + const worker = await createWorker(); + const handle = await startWorkflow(historySizeGrows); + + await worker.runUntil(handle.result()); + const historySize = (await handle.describe()).historySize; + t.true(historySize && historySize > 100); +}); + +export async function suggestedCAN(): Promise { + const maxEvents = 40_000; + const batchSize = 100; + if (workflow.workflowInfo().continueAsNewSuggested) { + return false; + } + while (workflow.workflowInfo().historyLength < maxEvents) { + await Promise.all(new Array(batchSize).fill(undefined).map((_) => workflow.sleep(1))); + if (workflow.workflowInfo().continueAsNewSuggested) { + return true; + } + } + return false; +} + +test('ContinueAsNew is suggested', async (t) => { + const { createWorker, executeWorkflow } = helpers(t); + const worker = await createWorker(); + const flaggedCAN = await worker.runUntil(executeWorkflow(suggestedCAN)); + t.true(flaggedCAN); +}); + test('Activity initialInterval is not getting rounded', async (t) => { const { createWorker, startWorkflow } = helpers(t); const worker = await createWorker({ diff --git a/packages/test/src/test-sinks.ts b/packages/test/src/test-sinks.ts index 1e6476e40..c76751247 100644 --- a/packages/test/src/test-sinks.ts +++ b/packages/test/src/test-sinks.ts @@ -55,14 +55,9 @@ if (RUN_INTEGRATION_TESTS) { fn: string; } - const dummyDate = new Date(2000, 1, 0, 0, 0, 0); function fixWorkflowInfoDates(input: WorkflowInfo): WorkflowInfo { delete (input.unsafe as any).now; - return { - ...input, - startTime: dummyDate, - runStartTime: dummyDate, - }; + return input; } const recordedCalls: RecordedCall[] = []; @@ -112,6 +107,11 @@ if (RUN_INTEGRATION_TESTS) { await wf.result(); return wf; }); + + // Capture volatile values that are hard to predict + const { historySize, startTime, runStartTime } = recordedCalls[0].info; + t.true(historySize > 300); + const info: WorkflowInfo = { namespace: 'default', firstExecutionRunId: wf.firstExecutionRunId, @@ -134,8 +134,11 @@ if (RUN_INTEGRATION_TESTS) { parent: undefined, searchAttributes: {}, historyLength: 3, - startTime: dummyDate, - runStartTime: dummyDate, + continueAsNewSuggested: false, + // values ignored for the purpose of comparison + historySize, + startTime, + runStartTime, // unsafe.now() doesn't make it through serialization, but .now is required, so we need to cast unsafe: { isReplaying: false, diff --git a/packages/test/src/test-workflows.ts b/packages/test/src/test-workflows.ts index ee075f25d..876896beb 100644 --- a/packages/test/src/test-workflows.ts +++ b/packages/test/src/test-workflows.ts @@ -111,6 +111,8 @@ async function createWorkflow( taskQueue: 'test', searchAttributes: {}, historyLength: 3, + historySize: 300, + continueAsNewSuggested: false, unsafe: { isReplaying: false, now: Date.now }, startTime: new Date(), runStartTime: new Date(), @@ -353,6 +355,14 @@ test('successString', async (t) => { compareCompletion(t, req, makeSuccess([makeCompleteWorkflowExecution(defaultPayloadConverter.toPayload('success'))])); }); +test('continueAsNewSuggested', async (t) => { + const { workflowType } = t.context; + const activation = makeStartWorkflow(workflowType); + activation.continueAsNewSuggested = true; + const req = await activate(t, activation); + compareCompletion(t, req, makeSuccess([makeCompleteWorkflowExecution(defaultPayloadConverter.toPayload(true))])); +}); + function cleanWorkflowFailureStackTrace( req: coresdk.workflow_completion.IWorkflowActivationCompletion, commandIndex = 0 diff --git a/packages/test/src/workflows/continue-as-new-suggested.ts b/packages/test/src/workflows/continue-as-new-suggested.ts new file mode 100644 index 000000000..761e67a79 --- /dev/null +++ b/packages/test/src/workflows/continue-as-new-suggested.ts @@ -0,0 +1,5 @@ +import { workflowInfo } from '@temporalio/workflow'; + +export async function continueAsNewSuggested(): Promise { + return workflowInfo().continueAsNewSuggested; +} diff --git a/packages/test/src/workflows/index.ts b/packages/test/src/workflows/index.ts index 66576597f..e0bfa07f9 100644 --- a/packages/test/src/workflows/index.ts +++ b/packages/test/src/workflows/index.ts @@ -28,6 +28,7 @@ export * from './condition'; export * from './condition-completion-race'; export * from './condition-timeout-0'; export * from './continue-as-new-same-workflow'; +export * from './continue-as-new-suggested'; export * from './continue-as-new-to-different-workflow'; export * from './core-issue-589'; export * from './date'; diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index dd32c0513..1e8a10c4a 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -1193,6 +1193,10 @@ export class Worker { // 0 is the default, and not a valid value, since crons are at least a minute apart cronScheduleToScheduleInterval: optionalTsToMs(cronScheduleToScheduleInterval) || undefined, historyLength: activation.historyLength, + // Exact truncation for multi-petabyte histories + // A zero value means that it was not set by the server + historySize: activation.historySizeBytes.toNumber(), + continueAsNewSuggested: activation.continueAsNewSuggested, unsafe: { now: () => Date.now(), // re-set in initRuntime isReplaying: activation.isReplaying, diff --git a/packages/workflow/src/interfaces.ts b/packages/workflow/src/interfaces.ts index 401d47554..b1335fc38 100644 --- a/packages/workflow/src/interfaces.ts +++ b/packages/workflow/src/interfaces.ts @@ -70,6 +70,27 @@ export interface WorkflowInfo { */ historyLength: number; + /** + * Size of Workflow history in bytes until the current Workflow Task. + * + * This value changes during the lifetime of an Execution. + * + * Supported only on Temporal Server 1.20+, always zero on older servers. + * + * You may safely use this information to decide when to {@link continueAsNew}. + */ + historySize: number; + + /** + * A hint provided by the current WorkflowTaskStarted event recommending whether to + * {@link continueAsNew}. + * + * This value changes during the lifetime of an Execution. + * + * Supported only on Temporal Server 1.20+, always `false` on older servers. + */ + continueAsNewSuggested: boolean; + /** * Task queue this Workflow is executing on */ diff --git a/packages/workflow/src/worker-interface.ts b/packages/workflow/src/worker-interface.ts index a7d9e38d7..268991154 100644 --- a/packages/workflow/src/worker-interface.ts +++ b/packages/workflow/src/worker-interface.ts @@ -186,11 +186,14 @@ export function activate(activation: coresdk.workflow_activation.WorkflowActivat // timestamp will not be updated for activation that contain only queries activator.now = tsToMs(activation.timestamp); } - if (activation.historyLength == null) { - throw new TypeError('Got activation with no historyLength'); - } + + // The Rust Core ensures that these activation fields are not null activator.info.unsafe.isReplaying = activation.isReplaying ?? false; - activator.info.historyLength = activation.historyLength; + activator.info.historyLength = activation.historyLength as number; + // Exact truncation for multi-petabyte histories + // historySize === 0 means WFT was generated by pre-1.20.0 server, and the history size is unknown + activator.info.historySize = activation.historySizeBytes?.toNumber() || 0; + activator.info.continueAsNewSuggested = activation.continueAsNewSuggested ?? false; } // Cast from the interface to the class which has the `variant` attribute.