Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update-with-start #1585

Draft
wants to merge 34 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
3a49246
Enable multiop
dandavison Oct 30, 2024
b216ff2
uws as updateWithStart
dandavison Nov 13, 2024
1ab785b
Autoformat, respond to upstream
dandavison Nov 20, 2024
ca09123
Use a class
dandavison Nov 23, 2024
17720bc
Update tests
dandavison Nov 23, 2024
229450b
Move startOp into options
dandavison Nov 23, 2024
3f46f3d
Set workflow handle and make startOp single-use
dandavison Nov 23, 2024
2a886ae
Add client-side poll loop
dandavison Nov 24, 2024
bcf4ea7
Fix type signatures
dandavison Nov 24, 2024
8b26a3e
Rethrow inner gRPC error
dandavison Nov 26, 2024
bb1da06
HACK: mutate existing error
dandavison Nov 26, 2024
8d136c2
Hack / tweak test
dandavison Nov 26, 2024
b4deb87
Custom error type
dandavison Nov 26, 2024
fd8d1f8
Bug fix: pass outcome from Completed into handle
dandavison Nov 26, 2024
636513e
Use dedicated interceptor
dandavison Nov 26, 2024
2240a39
Add test that start is resolved during polling
dandavison Nov 27, 2024
23ed849
Resolve StartWorkflow promise eagerly
dandavison Nov 27, 2024
9d097e8
Rename
dandavison Nov 27, 2024
46ee286
Refactor
dandavison Nov 27, 2024
c784d15
Revert "Add test that start is resolved during polling"
dandavison Dec 10, 2024
f22adfe
Fixups
dandavison Dec 10, 2024
74153ae
New interceptor API
dandavison Dec 11, 2024
bcf860f
Delete unused requestedStage
dandavison Dec 11, 2024
67ebf98
Share code path setting default updateId
dandavison Dec 11, 2024
fa37f5c
Fixup: rename fields in client interceptor struct
dandavison Dec 11, 2024
7e05eb5
Set distinct headers
dandavison Dec 11, 2024
a88a026
Rename: WithStartWorkflowOperation; used -> executed
dandavison Dec 11, 2024
7757ea2
Add docs
dandavison Dec 11, 2024
9df8ddd
Bug fix: switch polling from UpdateWorkflow to PollUpdate on Accepted
dandavison Dec 11, 2024
5d2907c
Delete poll-loop implementation comment
dandavison Dec 11, 2024
78a99bc
Tweak interceptor docs
dandavison Dec 11, 2024
0bd972c
Make the conflict policy required
dandavison Dec 11, 2024
1f4ad3e
Lint
dandavison Dec 11, 2024
d0b7470
Rearrange
dandavison Dec 12, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions packages/client/src/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { ServiceError as GrpcServiceError } from '@grpc/grpc-js';
import {
CustomGrpcServiceError as CustomGrpcServiceError,
LoadedDataConverter,
mapFromPayloads,
NamespaceNotFoundError,
Expand Down Expand Up @@ -103,6 +104,7 @@ export function decodeCountWorkflowExecutionsResponse(
}

type ErrorDetailsName = `temporal.api.errordetails.v1.${keyof typeof temporal.api.errordetails.v1}`;
type FailureName = `temporal.api.failure.v1.${keyof typeof temporal.api.failure.v1}`;

/**
* If the error type can be determined based on embedded grpc error details,
Expand All @@ -123,6 +125,27 @@ export function rethrowKnownErrorTypes(err: GrpcServiceError): void {
const { namespace } = temporal.api.errordetails.v1.NamespaceNotFoundFailure.decode(entry.value);
throw new NamespaceNotFoundError(namespace);
}
case 'temporal.api.errordetails.v1.MultiOperationExecutionFailure': {
// MultiOperationExecutionFailure contains error statuses for multiple
// operations. A MultiOperationExecutionAborted error status means that
// the corresponding operation was aborted due to an error in one of the
// other operations. We rethrow the first operation error that is not
// MultiOperationExecutionAborted.
const { statuses } = temporal.api.errordetails.v1.MultiOperationExecutionFailure.decode(entry.value);
for (const status of statuses) {
const detail = status.details?.[0];
const statusType = detail?.type_url?.replace(/^type.googleapis.com\//, '') as FailureName | undefined;
if (statusType === 'temporal.api.failure.v1.MultiOperationExecutionAborted') {
continue;
}
throw new CustomGrpcServiceError(
status.message || err.message,
status.code || err.code,
detail?.value?.toString() || err.details,
err.metadata.getMap()
);
}
}
}
}
}
Expand Down
27 changes: 27 additions & 0 deletions packages/client/src/interceptors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,24 @@ export interface WorkflowStartUpdateOutput {
readonly outcome?: temporal.api.update.v1.IOutcome;
}

/** Input for WorkflowClientInterceptor.startUpdateWithStart */
export interface WorkflowStartUpdateWithStartInput {
readonly workflowType: string;
readonly workflowStartOptions: CompiledWorkflowOptions;
readonly workflowStartHeaders: Headers;
readonly updateName: string;
readonly updateArgs: unknown[];
readonly updateOptions: WorkflowUpdateOptions;
readonly updateHeaders: Headers;
}

/** Output for WorkflowClientInterceptor.startUpdateWithStart */
export interface WorkflowStartUpdateWithStartOutput {
readonly updateId: string;
readonly workflowRunId: string;
readonly outcome?: temporal.api.update.v1.IOutcome;
}

/** Input for WorkflowClientInterceptor.signal */
export interface WorkflowSignalInput {
readonly signalName: string;
Expand Down Expand Up @@ -107,6 +125,15 @@ export interface WorkflowClientInterceptor {
input: WorkflowStartUpdateInput,
next: Next<this, 'startUpdate'>
) => Promise<WorkflowStartUpdateOutput>;
/**
* Intercept a service call to startUpdateWithStart
*
* @experimental startUpdateWithStart is an experimental feature.
*/
startUpdateWithStart?: (
input: WorkflowStartUpdateWithStartInput,
next: Next<this, 'startUpdateWithStart'>
) => Promise<WorkflowStartUpdateWithStartOutput>;
/**
* Intercept a service call to signalWorkflowExecution
*
Expand Down
Loading
Loading