Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(client): add client.workflow.count high level API #1573

Merged
merged 9 commits into from
Dec 9, 2024
23 changes: 22 additions & 1 deletion packages/client/src/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@ import { Replace } from '@temporalio/common/lib/type-helpers';
import { optionalTsToDate, requiredTsToDate } from '@temporalio/common/lib/time';
import { decodeMapFromPayloads } from '@temporalio/common/lib/internal-non-workflow/codec-helpers';
import { temporal, google } from '@temporalio/proto';
import { RawWorkflowExecutionInfo, WorkflowExecutionInfo, WorkflowExecutionStatusName } from './types';
import {
CountWorkflowExecution,
RawWorkflowExecutionInfo,
WorkflowExecutionInfo,
WorkflowExecutionStatusName,
} from './types';

function workflowStatusCodeToName(code: temporal.api.enums.v1.WorkflowExecutionStatus): WorkflowExecutionStatusName {
return workflowStatusCodeToNameInternal(code) ?? 'UNKNOWN';
Expand Down Expand Up @@ -81,6 +86,22 @@ export async function executionInfoFromRaw<T>(
};
}

export function decodeCountWorkflowExecutionsResponse(
raw: temporal.api.workflowservice.v1.ICountWorkflowExecutionsResponse
): CountWorkflowExecution {
return {
// Note: lossy conversion of Long to number
count: raw.count!.toNumber(),
groups: raw.groups!.map((group) => {
return {
// Note: lossy conversion of Long to number
count: group.count!.toNumber(),
groupValues: group.groupValues!.map((value) => searchAttributePayloadConverter.fromPayload(value)),
};
}),
};
}

type ErrorDetailsName = `temporal.api.errordetails.v1.${keyof typeof temporal.api.errordetails.v1}`;

/**
Expand Down
10 changes: 9 additions & 1 deletion packages/client/src/types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type * as grpc from '@grpc/grpc-js';
import type { SearchAttributes } from '@temporalio/common';
import type { SearchAttributes, SearchAttributeValue } from '@temporalio/common';
import { makeProtoEnumConverters } from '@temporalio/common/lib/internal-workflow';
import * as proto from '@temporalio/proto';
import { Replace } from '@temporalio/common/lib/type-helpers';
Expand Down Expand Up @@ -52,6 +52,14 @@ export interface WorkflowExecutionInfo {
raw: RawWorkflowExecutionInfo;
}

export interface CountWorkflowExecution {
count: number;
groups: {
count: number;
groupValues: SearchAttributeValue[];
}[];
}

export type WorkflowExecutionDescription = Replace<
WorkflowExecutionInfo,
{
Expand Down
30 changes: 27 additions & 3 deletions packages/client/src/workflow-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import {
WorkflowStartUpdateOutput,
} from './interceptors';
import {
CountWorkflowExecution,
DescribeWorkflowExecutionResponse,
encodeQueryRejectCondition,
GetWorkflowExecutionHistoryRequest,
Expand All @@ -77,7 +78,7 @@ import {
WorkflowStartOptions,
WorkflowUpdateOptions,
} from './workflow-options';
import { executionInfoFromRaw, rethrowKnownErrorTypes } from './helpers';
import { decodeCountWorkflowExecutionsResponse, executionInfoFromRaw, rethrowKnownErrorTypes } from './helpers';
import {
BaseClient,
BaseClientOptions,
Expand Down Expand Up @@ -1285,9 +1286,9 @@ export class WorkflowClient extends BaseClient {
}

/**
* List workflows by given `query`.
* Return a list of Workflow Executions matching the given `query`.
*
* ⚠️ To use advanced query functionality, as of the 1.18 server release, you must use Elasticsearch based visibility.
* Note that the list of Workflow Executions returned is approximate and eventually consistent.
*
* More info on the concept of "visibility" and the query syntax on the Temporal documentation site:
* https://docs.temporal.io/visibility
Expand All @@ -1308,6 +1309,29 @@ export class WorkflowClient extends BaseClient {
};
}

/**
* Return the number of Workflow Executions matching the given `query`. If no `query` is provided, then return the
* total number of Workflow Executions for this namespace.
*
* Note that the number of Workflow Executions returned is approximate and eventually consistent.
*
* More info on the concept of "visibility" and the query syntax on the Temporal documentation site:
* https://docs.temporal.io/visibility
*/
public async count(query?: string): Promise<CountWorkflowExecution> {
let response: temporal.api.workflowservice.v1.CountWorkflowExecutionsResponse;
try {
response = await this.workflowService.countWorkflowExecutions({
namespace: this.options.namespace,
query,
});
} catch (e) {
this.rethrowGrpcError(e, 'Failed to count workflows');
}

return decodeCountWorkflowExecutionsResponse(response);
}

protected getOrMakeInterceptors(workflowId: string, runId?: string): WorkflowClientInterceptor[] {
if (typeof this.options.interceptors === 'object' && 'calls' in this.options.interceptors) {
// eslint-disable-next-line deprecation/deprecation
Expand Down
41 changes: 40 additions & 1 deletion packages/test/src/test-integration-workflows.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { randomUUID } from 'crypto';
import { ExecutionContext } from 'ava';
import { firstValueFrom, Subject } from 'rxjs';
import { WorkflowFailedError } from '@temporalio/client';
import { CountWorkflowExecution, WorkflowFailedError } from '@temporalio/client';
import * as activity from '@temporalio/activity';
import { msToNumber, tsToMs } from '@temporalio/common/lib/time';
import { TestWorkflowEnvironment } from '@temporalio/testing';
Expand Down Expand Up @@ -1264,3 +1264,42 @@ export const interceptors: workflow.WorkflowInterceptorsFactory = () => {
}
return {};
};

export async function completableWorkflow(completes: boolean): Promise<void> {
await workflow.condition(() => completes);
}

test('Count workflow executions', async (t) => {
const { taskQueue, createWorker, executeWorkflow, startWorkflow } = helpers(t);
const worker = await createWorker();
const client = t.context.env.client;

// Run 2 workflows that don't complete
// (use startWorkflow to avoid waiting for workflows to complete, which they never will)
for (let i = 0; i < 2; i++) {
await startWorkflow(completableWorkflow, { args: [false] });
}

await worker.runUntil(async () => {
// Run 3 workflows that complete.
await Promise.all([
executeWorkflow(completableWorkflow, { args: [true] }),
executeWorkflow(completableWorkflow, { args: [true] }),
executeWorkflow(completableWorkflow, { args: [true] }),
]);
});

const actualTotal = await client.workflow.count(`TaskQueue = '${taskQueue}'`);
t.deepEqual(actualTotal, { count: 5, groups: [] });

const expectedByExecutionStatus: CountWorkflowExecution = {
count: 5,
groups: [
{ count: 2, groupValues: [['Running']] },
{ count: 3, groupValues: [['Completed']] },
],
};

const actualByExecutionStatus = await client.workflow.count(`TaskQueue = '${taskQueue}' GROUP BY ExecutionStatus`);
t.deepEqual(actualByExecutionStatus, expectedByExecutionStatus);
});
Loading