diff --git a/packages/client/src/helpers.ts b/packages/client/src/helpers.ts index 4682be663..7d553887f 100644 --- a/packages/client/src/helpers.ts +++ b/packages/client/src/helpers.ts @@ -10,7 +10,12 @@ import { Replace } from '@temporalio/common/lib/type-helpers'; import { optionalTsToDate, requiredTsToDate } from '@temporalio/common/lib/time'; import { decodeMapFromPayloads } from '@temporalio/common/lib/internal-non-workflow/codec-helpers'; import { temporal, google } from '@temporalio/proto'; -import { RawWorkflowExecutionInfo, WorkflowExecutionInfo, WorkflowExecutionStatusName } from './types'; +import { + CountWorkflowExecution, + RawWorkflowExecutionInfo, + WorkflowExecutionInfo, + WorkflowExecutionStatusName, +} from './types'; function workflowStatusCodeToName(code: temporal.api.enums.v1.WorkflowExecutionStatus): WorkflowExecutionStatusName { return workflowStatusCodeToNameInternal(code) ?? 'UNKNOWN'; @@ -81,6 +86,22 @@ export async function executionInfoFromRaw( }; } +export function decodeCountWorkflowExecutionsResponse( + raw: temporal.api.workflowservice.v1.ICountWorkflowExecutionsResponse +): CountWorkflowExecution { + return { + // Note: lossy conversion of Long to number + count: raw.count!.toNumber(), + groups: raw.groups!.map((group) => { + return { + // Note: lossy conversion of Long to number + count: group.count!.toNumber(), + groupValues: group.groupValues!.map((value) => searchAttributePayloadConverter.fromPayload(value)), + }; + }), + }; +} + type ErrorDetailsName = `temporal.api.errordetails.v1.${keyof typeof temporal.api.errordetails.v1}`; /** diff --git a/packages/client/src/types.ts b/packages/client/src/types.ts index bfc495031..6dd29ce9d 100644 --- a/packages/client/src/types.ts +++ b/packages/client/src/types.ts @@ -1,5 +1,5 @@ import type * as grpc from '@grpc/grpc-js'; -import type { SearchAttributes } from '@temporalio/common'; +import type { SearchAttributes, SearchAttributeValue } from '@temporalio/common'; import { makeProtoEnumConverters } from '@temporalio/common/lib/internal-workflow'; import * as proto from '@temporalio/proto'; import { Replace } from '@temporalio/common/lib/type-helpers'; @@ -52,6 +52,14 @@ export interface WorkflowExecutionInfo { raw: RawWorkflowExecutionInfo; } +export interface CountWorkflowExecution { + count: number; + groups: { + count: number; + groupValues: SearchAttributeValue[]; + }[]; +} + export type WorkflowExecutionDescription = Replace< WorkflowExecutionInfo, { diff --git a/packages/client/src/workflow-client.ts b/packages/client/src/workflow-client.ts index 3e0fb0150..173d03bad 100644 --- a/packages/client/src/workflow-client.ts +++ b/packages/client/src/workflow-client.ts @@ -58,6 +58,7 @@ import { WorkflowStartUpdateOutput, } from './interceptors'; import { + CountWorkflowExecution, DescribeWorkflowExecutionResponse, encodeQueryRejectCondition, GetWorkflowExecutionHistoryRequest, @@ -77,7 +78,7 @@ import { WorkflowStartOptions, WorkflowUpdateOptions, } from './workflow-options'; -import { executionInfoFromRaw, rethrowKnownErrorTypes } from './helpers'; +import { decodeCountWorkflowExecutionsResponse, executionInfoFromRaw, rethrowKnownErrorTypes } from './helpers'; import { BaseClient, BaseClientOptions, @@ -1285,9 +1286,9 @@ export class WorkflowClient extends BaseClient { } /** - * List workflows by given `query`. + * Return a list of Workflow Executions matching the given `query`. * - * ⚠️ To use advanced query functionality, as of the 1.18 server release, you must use Elasticsearch based visibility. + * Note that the list of Workflow Executions returned is approximate and eventually consistent. * * More info on the concept of "visibility" and the query syntax on the Temporal documentation site: * https://docs.temporal.io/visibility @@ -1308,6 +1309,29 @@ export class WorkflowClient extends BaseClient { }; } + /** + * Return the number of Workflow Executions matching the given `query`. If no `query` is provided, then return the + * total number of Workflow Executions for this namespace. + * + * Note that the number of Workflow Executions returned is approximate and eventually consistent. + * + * More info on the concept of "visibility" and the query syntax on the Temporal documentation site: + * https://docs.temporal.io/visibility + */ + public async count(query?: string): Promise { + let response: temporal.api.workflowservice.v1.CountWorkflowExecutionsResponse; + try { + response = await this.workflowService.countWorkflowExecutions({ + namespace: this.options.namespace, + query, + }); + } catch (e) { + this.rethrowGrpcError(e, 'Failed to count workflows'); + } + + return decodeCountWorkflowExecutionsResponse(response); + } + protected getOrMakeInterceptors(workflowId: string, runId?: string): WorkflowClientInterceptor[] { if (typeof this.options.interceptors === 'object' && 'calls' in this.options.interceptors) { // eslint-disable-next-line deprecation/deprecation diff --git a/packages/test/src/test-integration-workflows.ts b/packages/test/src/test-integration-workflows.ts index 727e4fc70..d033c018e 100644 --- a/packages/test/src/test-integration-workflows.ts +++ b/packages/test/src/test-integration-workflows.ts @@ -1,7 +1,7 @@ import { randomUUID } from 'crypto'; import { ExecutionContext } from 'ava'; import { firstValueFrom, Subject } from 'rxjs'; -import { WorkflowFailedError } from '@temporalio/client'; +import { CountWorkflowExecution, WorkflowFailedError } from '@temporalio/client'; import * as activity from '@temporalio/activity'; import { msToNumber, tsToMs } from '@temporalio/common/lib/time'; import { TestWorkflowEnvironment } from '@temporalio/testing'; @@ -1264,3 +1264,42 @@ export const interceptors: workflow.WorkflowInterceptorsFactory = () => { } return {}; }; + +export async function completableWorkflow(completes: boolean): Promise { + await workflow.condition(() => completes); +} + +test('Count workflow executions', async (t) => { + const { taskQueue, createWorker, executeWorkflow, startWorkflow } = helpers(t); + const worker = await createWorker(); + const client = t.context.env.client; + + // Run 2 workflows that don't complete + // (use startWorkflow to avoid waiting for workflows to complete, which they never will) + for (let i = 0; i < 2; i++) { + await startWorkflow(completableWorkflow, { args: [false] }); + } + + await worker.runUntil(async () => { + // Run 3 workflows that complete. + await Promise.all([ + executeWorkflow(completableWorkflow, { args: [true] }), + executeWorkflow(completableWorkflow, { args: [true] }), + executeWorkflow(completableWorkflow, { args: [true] }), + ]); + }); + + const actualTotal = await client.workflow.count(`TaskQueue = '${taskQueue}'`); + t.deepEqual(actualTotal, { count: 5, groups: [] }); + + const expectedByExecutionStatus: CountWorkflowExecution = { + count: 5, + groups: [ + { count: 2, groupValues: [['Running']] }, + { count: 3, groupValues: [['Completed']] }, + ], + }; + + const actualByExecutionStatus = await client.workflow.count(`TaskQueue = '${taskQueue}' GROUP BY ExecutionStatus`); + t.deepEqual(actualByExecutionStatus, expectedByExecutionStatus); +});