diff --git a/apps/tests/aws-runtime/test/test-service.ts b/apps/tests/aws-runtime/test/test-service.ts index 412f6c18d..ce0534a27 100644 --- a/apps/tests/aws-runtime/test/test-service.ts +++ b/apps/tests/aws-runtime/test/test-service.ts @@ -11,6 +11,8 @@ import { workflow, heartbeat, HeartbeatTimeout, + completeActivity, + failActivity, } from "@eventual/core"; import { SendMessageCommand, SQSClient } from "@aws-sdk/client-sqs"; import { AsyncWriterTestEvent } from "./async-writer-handler.js"; @@ -148,11 +150,13 @@ export const childWorkflow = workflow( } ); -const slowActivity = activity( - "slowAct", - { timeoutSeconds: 5 }, - () => new Promise((resolve) => setTimeout(resolve, 10 * 1000)) -); +const delay = (seconds: number) => + new Promise((resolve) => setTimeout(resolve, seconds * 1000)); + +const slowActivity = activity("slowAct", { timeoutSeconds: 5 }, async () => { + await delay(10); + return "done finally"; +}); const slowWf = workflow("slowWorkflow", { timeoutSeconds: 5 }, () => sleepFor(10) @@ -332,3 +336,144 @@ const sendFinishEvent = activity("sendFinish", async (executionId: string) => { proxy: true, }); }); + +/** + * An event which is raised by the activityOverrideActivity. + * Can provide a value to the activity via activity token or defer the responsibility to the workflow via signal. + */ +const activityOverrideEvent = event<{ + executionId: string; + token: string; + location: "handler" | "signal"; + type: "complete" | "fail"; +}>("activityOverrideEvent"); + +/** + * An async activity which will be cancelled using it's activity token. + */ +const activityOverrideActivity = activity( + "eventPublish", + async ({ + type, + location, + executionId, + }: { + executionId: string; + location: "handler" | "signal"; + type: "complete" | "fail"; + }) => { + return asyncResult( + async (token) => + await activityOverrideEvent.publish({ + token, + location, + type, + executionId, + }) + ); + } +); + +/** + * A signal called by the activityOverrideEvent handler to pass the activity token to the workflow. + * Used to test activity completion from the workflow. + */ +const activityOverrideSignal = new Signal<{ + token: string; + type: "complete" | "fail"; +}>("activityOverrideSignal"); + +activityOverrideEvent.on(async ({ token, location, type, executionId }) => { + if (location === "handler") { + if (type === "complete") { + await completeActivity(token, "from the event handler!"); + } else { + await failActivity(token, new Error("WHY!!!")); + } + } else { + await activityOverrideSignal.send(executionId, { token, type }); + } +}); + +/** + * Activity which waits to be closed/cancelled. + * If it is closed, it signals the workflow with "complete", if not it signals with "fail". + */ +const activityOverrideAwareActivity = activity( + "overrideAware", + async ({ executionId }: { executionId: string }) => { + let n = 0; + while (n < 10) { + await delay(1); + const { closed } = await heartbeat(); + if (closed) { + await activityOverrideSignal.send(executionId, { + token: "", + type: "complete", + }); + return; + } + } + + await activityOverrideSignal.send(executionId, { token: "", type: "fail" }); + } +); + +/** + * Activity Override Tests. + * + * 1. cancel an activity from the workflow and record the result + * 2. Use the activity token to cancel, fail, or complete an async token from within an event handler or workflow + * 3. Cancel an activity and use a signal to signify that it knows it was cancelled and record the result. + */ +export const overrideWorkflow = workflow( + "override", + async (_, { execution: { id: executionId } }) => { + const act = slowActivity(); + act.cancel("because"); + + const results1 = await Promise.allSettled([act]); + + const signalHandler = activityOverrideSignal.on(async ({ token, type }) => { + if (type === "complete") { + await completeActivity(token, "from the signal handler!"); + } else { + await failActivity(token, new Error("BECAUSE!!!")); + } + }); + + const results2 = await Promise.allSettled([ + activityOverrideActivity({ + location: "handler", + type: "complete", + executionId, + }), + activityOverrideActivity({ + location: "handler", + type: "fail", + executionId, + }), + activityOverrideActivity({ + location: "signal", + type: "complete", + executionId, + }), + activityOverrideActivity({ + location: "signal", + type: "fail", + executionId, + }), + ]); + + const aware = activityOverrideAwareActivity({ executionId }); + await sleepFor(1); + aware.cancel("because"); + + signalHandler.dispose(); + + // the override activity SHOULD send a signal when it realizes it is cancelled. + const result = await expectSignal(activityOverrideSignal); + + return [results1, results2, result]; + } +); diff --git a/apps/tests/aws-runtime/test/tester.test.ts b/apps/tests/aws-runtime/test/tester.test.ts index 93e018ab9..2a074eb67 100644 --- a/apps/tests/aws-runtime/test/tester.test.ts +++ b/apps/tests/aws-runtime/test/tester.test.ts @@ -1,4 +1,8 @@ -import { HeartbeatTimeout } from "@eventual/core"; +import { + ActivityCancelled, + EventualError, + HeartbeatTimeout, +} from "@eventual/core"; import { eventualRuntimeTestHarness } from "./runtime-test-harness.js"; import { eventDrivenWorkflow, @@ -10,6 +14,7 @@ import { workflow2, workflow3, workflow4, + overrideWorkflow, } from "./test-service.js"; jest.setTimeout(100 * 1000); @@ -31,7 +36,7 @@ eventualRuntimeTestHarness(({ testCompletion }) => { { status: "fulfilled", value: ["HELLO SAM", "HELLO CHRIS", "HELLO SAM"] }, { status: "fulfilled", value: ["hello sam", "hello chris", "hello sam"] }, { status: "fulfilled", value: "hello sam" }, - { status: "rejected", reason: "Error" }, + { status: "rejected", reason: { name: "Error", message: "failed" } }, ]); testCompletion("parent-child", parentWorkflow, "done"); @@ -45,7 +50,10 @@ eventualRuntimeTestHarness(({ testCompletion }) => { testCompletion("asyncActivities", asyncWorkflow, [ "hello from the async writer!", - "AsyncWriterError", + { + name: "AsyncWriterError", + message: "I was told to fail this activity, sorry.", + }, ]); testCompletion("heartbeat", heartbeatWorkflow, 10, [ @@ -62,4 +70,21 @@ eventualRuntimeTestHarness(({ testCompletion }) => { ]); testCompletion("event-driven", eventDrivenWorkflow, "done!"); + + testCompletion("overrideActivities", overrideWorkflow, [ + [{ status: "rejected", reason: new ActivityCancelled("because").toJSON() }], + [ + { status: "fulfilled", value: "from the event handler!" }, + { + status: "rejected", + reason: new EventualError("Error", "WHY!!!").toJSON(), + }, + { status: "fulfilled", value: "from the signal handler!" }, + { + status: "rejected", + reason: new EventualError("Error", "BECAUSE!!!").toJSON(), + }, + ], + { token: "", type: "complete" }, + ]); }); diff --git a/packages/@eventual/aws-cdk/src/activities.ts b/packages/@eventual/aws-cdk/src/activities.ts index 151e47171..f5029b435 100644 --- a/packages/@eventual/aws-cdk/src/activities.ts +++ b/packages/@eventual/aws-cdk/src/activities.ts @@ -87,10 +87,12 @@ export class Activities extends Construct implements IActivities, IGrantable { public configureCompleteActivity(func: Function) { this.props.workflows.configureSendWorkflowEvent(func); + this.configureUpdateActivity(func); } public grantCompleteActivity(grantable: IGrantable) { this.props.workflows.grantSendWorkflowEvent(grantable); + this.grantUpdateActivity(grantable); } public configureUpdateActivity(func: Function) { diff --git a/packages/@eventual/aws-runtime/src/clients/activity-runtime-client.ts b/packages/@eventual/aws-runtime/src/clients/activity-runtime-client.ts index ef8507ad2..c68fa1d87 100644 --- a/packages/@eventual/aws-runtime/src/clients/activity-runtime-client.ts +++ b/packages/@eventual/aws-runtime/src/clients/activity-runtime-client.ts @@ -67,7 +67,7 @@ export class AWSActivityRuntimeClient implements ActivityRuntimeClient { executionId: string, seq: number, heartbeatTime: string - ): Promise<{ cancelled: boolean }> { + ): Promise<{ closed: boolean }> { const item = await this.props.dynamo.send( new UpdateItemCommand({ Key: { @@ -86,27 +86,36 @@ export class AWSActivityRuntimeClient implements ActivityRuntimeClient { ); return { - cancelled: - (item.Attributes as ActivityExecutionRecord).cancelled?.BOOL ?? false, + closed: + (item.Attributes as ActivityExecutionRecord).closed?.BOOL ?? false, }; } - async cancelActivity(executionId: string, seq: number) { - await this.props.dynamo.send( + /** + * An activity execution is closed when it already has a result (completed or failed). + * + * Close the activity to prevent others from emitting events for it and report to the activity worker + * that the activity is no longer able to report a result. + */ + async closeActivity(executionId: string, seq: number) { + const item = await this.props.dynamo.send( new UpdateItemCommand({ Key: { pk: { S: ActivityExecutionRecord.key(executionId, seq) }, }, UpdateExpression: - "SET cancelled=:cancelled, executionId = :executionId, seq = :seq", + "SET closed=:closed, executionId = :executionId, seq = :seq", ExpressionAttributeValues: { - ":cancelled": { BOOL: true }, + ":closed": { BOOL: true }, ":executionId": { S: executionId }, ":seq": { N: `${seq}` }, }, TableName: this.props.activityTableName, + ReturnValues: ReturnValue.UPDATED_OLD, }) ); + + return { alreadyClosed: item.Attributes?.["closed"]?.BOOL ?? false }; } async getActivity( @@ -132,7 +141,7 @@ export interface ActivityExecutionRecord { executionId: AttributeValue.SMember; seq: AttributeValue.NMember; heartbeatTime?: AttributeValue.SMember; - cancelled?: AttributeValue.BOOLMember; + closed?: AttributeValue.BOOLMember; } export namespace ActivityExecutionRecord { @@ -148,7 +157,7 @@ function createActivityFromRecord( return { executionId: activityRecord.executionId.S, seq: Number(activityRecord.seq.N), - cancelled: Boolean(activityRecord.cancelled?.BOOL ?? false), + closed: Boolean(activityRecord.closed?.BOOL ?? false), heartbeatTime: activityRecord?.heartbeatTime?.S, }; } diff --git a/packages/@eventual/aws-runtime/src/handlers/orchestrator.ts b/packages/@eventual/aws-runtime/src/handlers/orchestrator.ts index 13f3ee519..adbe87181 100644 --- a/packages/@eventual/aws-runtime/src/handlers/orchestrator.ts +++ b/packages/@eventual/aws-runtime/src/handlers/orchestrator.ts @@ -6,6 +6,7 @@ import type { SQSEvent, SQSRecord } from "aws-lambda"; import { logger, loggerMiddlewares } from "../logger.js"; import { AWSMetricsClient } from "../clients/metrics-client.js"; import { + createActivityRuntimeClient, createEventClient, createExecutionHistoryClient, createTimerClient, @@ -24,6 +25,7 @@ const orchestrate = createOrchestrator({ workflowRuntimeClient: createWorkflowRuntimeClient(), workflowClient: createWorkflowClient(), eventClient: createEventClient(), + activityRuntimeClient: createActivityRuntimeClient(), metricsClient: AWSMetricsClient, logger, }); diff --git a/packages/@eventual/core/src/activity.ts b/packages/@eventual/core/src/activity.ts index fb8337485..3a2002edd 100644 --- a/packages/@eventual/core/src/activity.ts +++ b/packages/@eventual/core/src/activity.ts @@ -1,9 +1,14 @@ -import { createActivityCall } from "./calls/activity-call.js"; +import { + createActivityCall, + createOverrideActivityCall, +} from "./calls/activity-call.js"; +import { ActivityCancelled, EventualError } from "./error.js"; import { callableActivities, getActivityContext, getWorkflowClient, } from "./global.js"; +import { Result } from "./result.js"; import { CompleteActivityRequest } from "./runtime/clients/workflow-client.js"; import { isActivityWorker, isOrchestratorWorker } from "./runtime/flags.js"; @@ -31,7 +36,8 @@ export interface ActivityFunction< Arguments extends any[], Output extends any = any > { - (...args: Arguments): Promise>>; + (...args: Arguments): Promise>> & + ActivityExecutionReference; /** * Complete an activity request by its {@link CompleteActivityRequest.activityToken}. @@ -72,6 +78,9 @@ export type UnwrapAsync = Output extends AsyncResult ? O : Output; +export type ActivityOutput> = + A extends ActivityFunction ? UnwrapAsync : never; + const AsyncTokenSymbol = Symbol.for("eventual:AsyncToken"); /** @@ -175,12 +184,102 @@ export function activity( Output >; } - func.complete = async function (request) { - return getWorkflowClient().completeActivity(request); + func.complete = function (request) { + return completeActivity(request.activityToken, request.result); }; return func; } +export type ActivityTarget = OwnActivityTarget | ActivityTokenTarget; + +export enum ActivityTargetType { + OwnActivity, + ActivityToken, +} + +export interface OwnActivityTarget { + type: ActivityTargetType.OwnActivity; + seq: number; +} + +export interface ActivityTokenTarget { + type: ActivityTargetType.ActivityToken; + activityToken: string; +} + +export interface ActivityExecutionReference { + /** + * Cancel this activity. + * + * The activity will reject with a {@link ActivityCancelled} error. + * + * If the activity is calling {@link heartbeat}, closed: true will be + * return to signal the workflow considers the activity finished. + */ + cancel: (reason: string) => Promise; +} + +/** + * Causes the activity to resolve the provided value to the workflow. + * + * If the activity is calling {@link heartbeat}, closed: true will be + * return to signal the workflow considers the activity finished. + */ +export function completeActivity = any>( + activityToken: string, + result: ActivityOutput +): Promise { + if (isOrchestratorWorker()) { + return createOverrideActivityCall( + { + type: ActivityTargetType.ActivityToken, + activityToken, + }, + Result.resolved(result) + ) as any; + } else { + return getWorkflowClient().completeActivity({ activityToken, result }); + } +} + +/** + * Causes the activity to reject with the provided value within the workflow. + * + * If the activity is calling {@link heartbeat}, closed: true will be + * return to signal the workflow considers the activity finished. + */ +export function failActivity( + activityToken: string, + error: Error +): Promise; +export function failActivity( + activityToken: string, + error: string, + message: string +): Promise; +export function failActivity( + activityToken: string, + ...args: [error: Error] | [error: string, message: string] +): Promise { + const error = + args.length === 1 ? args[0] : new EventualError(args[0], args[1]); + if (isOrchestratorWorker()) { + return createOverrideActivityCall( + { + type: ActivityTargetType.ActivityToken, + activityToken, + }, + Result.failed(error) + ) as any; + } else { + return getWorkflowClient().failActivity({ + activityToken, + error: error.name, + message: error.message, + }); + } +} + /** * Retrieve an activity function that has been registered in a workflow. */ diff --git a/packages/@eventual/core/src/calls/activity-call.ts b/packages/@eventual/core/src/calls/activity-call.ts index 841ed3434..95f367c6e 100644 --- a/packages/@eventual/core/src/calls/activity-call.ts +++ b/packages/@eventual/core/src/calls/activity-call.ts @@ -1,19 +1,25 @@ +import { + ActivityExecutionReference, + ActivityTarget, + ActivityTargetType, +} from "../activity.js"; +import { ActivityCancelled } from "../error.js"; import { EventualKind, - EventualBase, isEventualOfKind, createEventual, + CommandCallBase, } from "../eventual.js"; import { registerEventual } from "../global.js"; -import { Resolved, Failed } from "../result.js"; +import { Resolved, Failed, Result } from "../result.js"; export function isActivityCall(a: any): a is ActivityCall { return isEventualOfKind(EventualKind.ActivityCall, a); } export interface ActivityCall - extends EventualBase | Failed> { - seq?: number; + extends CommandCallBase | Failed>, + ActivityExecutionReference { name: string; args: any[]; heartbeatSeconds?: number; @@ -26,12 +32,44 @@ export function createActivityCall( timeoutSeconds?: number, heartbeatSeconds?: number ): ActivityCall { - return registerEventual( + const call = registerEventual( createEventual(EventualKind.ActivityCall, { name, args, timeoutSeconds, heartbeatSeconds, + } as ActivityCall) + ); + + call.cancel = function (reason) { + return createOverrideActivityCall( + { type: ActivityTargetType.OwnActivity, seq: this.seq! }, + Result.failed(new ActivityCancelled(reason)) + ) as unknown as Promise; + }; + + return call; +} + +export function isOverrideActivityCall(a: any): a is OverrideActivityCall { + return isEventualOfKind(EventualKind.OverrideActivityCall, a); +} + +export interface OverrideActivityCall + extends CommandCallBase { + target: ActivityTarget; + outcome: Resolved | Failed; +} + +export function createOverrideActivityCall( + target: ActivityTarget, + outcome: Resolved | Failed +): OverrideActivityCall { + return registerEventual( + createEventual(EventualKind.OverrideActivityCall, { + target, + outcome, + result: Result.resolved(undefined), }) ); } diff --git a/packages/@eventual/core/src/calls/condition-call.ts b/packages/@eventual/core/src/calls/condition-call.ts index 5a8c9fe07..d12b309fb 100644 --- a/packages/@eventual/core/src/calls/condition-call.ts +++ b/packages/@eventual/core/src/calls/condition-call.ts @@ -1,7 +1,7 @@ import { ConditionPredicate } from "../condition.js"; import { + CommandCallBase, createEventual, - EventualBase, EventualKind, isEventualOfKind, } from "../eventual.js"; @@ -13,8 +13,7 @@ export function isConditionCall(a: any): a is ConditionCall { } export interface ConditionCall - extends EventualBase | Failed> { - seq?: number; + extends CommandCallBase | Failed> { predicate: ConditionPredicate; timeoutSeconds?: number; } diff --git a/packages/@eventual/core/src/calls/expect-signal-call.ts b/packages/@eventual/core/src/calls/expect-signal-call.ts index f15a0a661..9c2d2b1f6 100644 --- a/packages/@eventual/core/src/calls/expect-signal-call.ts +++ b/packages/@eventual/core/src/calls/expect-signal-call.ts @@ -1,8 +1,8 @@ import { EventualKind, - EventualBase, isEventualOfKind, createEventual, + CommandCallBase, } from "../eventual.js"; import { registerEventual } from "../global.js"; import { Failed, Resolved } from "../result.js"; @@ -12,8 +12,7 @@ export function isExpectSignalCall(a: any): a is ExpectSignalCall { } export interface ExpectSignalCall - extends EventualBase | Failed> { - seq?: number; + extends CommandCallBase | Failed> { signalId: string; timeoutSeconds?: number; } diff --git a/packages/@eventual/core/src/calls/send-events-call.ts b/packages/@eventual/core/src/calls/send-events-call.ts index a36aae51a..3e0a257fd 100644 --- a/packages/@eventual/core/src/calls/send-events-call.ts +++ b/packages/@eventual/core/src/calls/send-events-call.ts @@ -1,8 +1,8 @@ import { EventualKind, - EventualBase, isEventualOfKind, createEventual, + CommandCallBase, } from "../eventual.js"; import { registerEventual } from "../global.js"; import { EventEnvelope } from "../event.js"; @@ -13,8 +13,7 @@ export function isPublishEventsCall(a: any): a is PublishEventsCall { } export interface PublishEventsCall - extends EventualBase> { - seq?: number; + extends CommandCallBase> { events: EventEnvelope[]; id?: string; } diff --git a/packages/@eventual/core/src/calls/send-signal-call.ts b/packages/@eventual/core/src/calls/send-signal-call.ts index d2eec3553..b50175cc5 100644 --- a/packages/@eventual/core/src/calls/send-signal-call.ts +++ b/packages/@eventual/core/src/calls/send-signal-call.ts @@ -1,8 +1,8 @@ import { EventualKind, - EventualBase, isEventualOfKind, createEventual, + CommandCallBase, } from "../eventual.js"; import { registerEventual } from "../global.js"; import { Resolved, Result } from "../result.js"; @@ -13,8 +13,7 @@ export function isSendSignalCall(a: any): a is SendSignalCall { } export interface SendSignalCall - extends EventualBase> { - seq?: number; + extends CommandCallBase> { signalId: string; payload?: any; target: SignalTarget; diff --git a/packages/@eventual/core/src/calls/signal-handler-call.ts b/packages/@eventual/core/src/calls/signal-handler-call.ts index ffd35c132..9da6f307a 100644 --- a/packages/@eventual/core/src/calls/signal-handler-call.ts +++ b/packages/@eventual/core/src/calls/signal-handler-call.ts @@ -1,6 +1,6 @@ import { + CommandCallBase, createEventual, - EventualBase, EventualKind, isEventualOfKind, } from "../eventual.js"; @@ -16,9 +16,8 @@ export function isRegisterSignalHandlerCall( } export interface RegisterSignalHandlerCall - extends EventualBase, + extends CommandCallBase, SignalsHandler { - seq?: number; signalId: string; handler: (input: T) => Program | void; } diff --git a/packages/@eventual/core/src/calls/sleep-call.ts b/packages/@eventual/core/src/calls/sleep-call.ts index 19d649565..84ef30b90 100644 --- a/packages/@eventual/core/src/calls/sleep-call.ts +++ b/packages/@eventual/core/src/calls/sleep-call.ts @@ -1,8 +1,8 @@ import { EventualKind, - EventualBase, isEventualOfKind, createEventual, + CommandCallBase, } from "../eventual.js"; import { registerEventual } from "../global.js"; import { Resolved } from "../result.js"; @@ -16,14 +16,12 @@ export function isSleepUntilCall(a: any): a is SleepUntilCall { } export interface SleepForCall - extends EventualBase> { - seq?: number; + extends CommandCallBase> { durationSeconds: number; } export interface SleepUntilCall - extends EventualBase> { - seq?: number; + extends CommandCallBase> { isoDate: string; } diff --git a/packages/@eventual/core/src/calls/workflow-call.ts b/packages/@eventual/core/src/calls/workflow-call.ts index c58c08afb..eac9fad09 100644 --- a/packages/@eventual/core/src/calls/workflow-call.ts +++ b/packages/@eventual/core/src/calls/workflow-call.ts @@ -1,7 +1,7 @@ import { + CommandCallBase, createEventual, Eventual, - EventualBase, EventualKind, isEventualOfKind, } from "../eventual.js"; @@ -24,11 +24,10 @@ export function isWorkflowCall(a: Eventual): a is WorkflowCall { * An {@link Eventual} representing an awaited call to a {@link Workflow}. */ export interface WorkflowCall - extends EventualBase>, + extends CommandCallBase>, ChildExecution { name: string; input?: any; - seq?: number; opts?: WorkflowOptions; } diff --git a/packages/@eventual/core/src/command.ts b/packages/@eventual/core/src/command.ts index 99e2c9957..b9c678564 100644 --- a/packages/@eventual/core/src/command.ts +++ b/packages/@eventual/core/src/command.ts @@ -1,9 +1,11 @@ import { EventEnvelope } from "./event.js"; +import { ActivityTarget, Failed, Resolved } from "./index.js"; import { SignalTarget } from "./signals.js"; import { WorkflowOptions } from "./workflow.js"; export type Command = | ExpectSignalCommand + | OverrideActivityCommand | ScheduleActivityCommand | ScheduleWorkflowCommand | PublishEventsCommand @@ -19,6 +21,7 @@ interface CommandBase { export enum CommandType { ExpectSignal = "ExpectSignal", + OverrideActivity = "OverrideActivity", PublishEvents = "PublishEvents", SendSignal = "SendSignal", SleepFor = "SleepFor", @@ -48,6 +51,18 @@ export function isScheduleActivityCommand( return a.kind === CommandType.StartActivity; } +export interface OverrideActivityCommand + extends CommandBase { + target: ActivityTarget; + outcome: Resolved | Failed; +} + +export function isOverrideActivityCommand( + a: Command +): a is OverrideActivityCommand { + return a.kind === CommandType.OverrideActivity; +} + // TODO support a timeout at the parent workflow level. The current timeout fails the whole workflow and not just the waiter. export interface ScheduleWorkflowCommand extends CommandBase { diff --git a/packages/@eventual/core/src/error.ts b/packages/@eventual/core/src/error.ts index f87204ae8..c319827e0 100644 --- a/packages/@eventual/core/src/error.ts +++ b/packages/@eventual/core/src/error.ts @@ -9,7 +9,7 @@ export class EventualError extends Error { toJSON() { return { name: this.name, - message: this.message, + ...(this.message ? { message: this.message } : {}), }; } } @@ -66,6 +66,16 @@ export class HeartbeatTimeout extends Timeout { this.name = "HeartbeatTimeout"; } } + +/** + * Thrown when an activity was cancelled by the workflow. + */ +export class ActivityCancelled extends EventualError { + constructor(reason: string) { + super("ActivityCancelled", reason); + } +} + /** * Thrown when a particular context only support synchronous operations (ex: condition predicate). */ diff --git a/packages/@eventual/core/src/eventual.ts b/packages/@eventual/core/src/eventual.ts index 173ebfcd3..33d42a58e 100644 --- a/packages/@eventual/core/src/eventual.ts +++ b/packages/@eventual/core/src/eventual.ts @@ -1,4 +1,9 @@ -import { ActivityCall, isActivityCall } from "./calls/activity-call.js"; +import { + ActivityCall, + OverrideActivityCall, + isActivityCall, + isOverrideActivityCall, +} from "./calls/activity-call.js"; import { AwaitAll, createAwaitAll } from "./await-all.js"; import { chain, Chain } from "./chain.js"; import type { Program } from "./interpret.js"; @@ -44,6 +49,11 @@ export interface EventualBase { result?: R; } +export interface CommandCallBase + extends EventualBase { + seq?: number; +} + export enum EventualKind { ActivityCall = 1, AwaitAll = 0, @@ -52,6 +62,7 @@ export enum EventualKind { Chain = 2, ConditionCall = 9, ExpectSignalCall = 6, + OverrideActivityCall = 14, PublishEventsCall = 13, Race = 11, RegisterSignalHandlerCall = 7, @@ -95,6 +106,7 @@ export type CommandCall = | ActivityCall | ConditionCall | ExpectSignalCall + | OverrideActivityCall | RegisterSignalHandlerCall | PublishEventsCall | SendSignalCall @@ -107,6 +119,7 @@ export function isCommandCall(call: Eventual): call is CommandCall { isActivityCall(call) || isConditionCall(call) || isExpectSignalCall(call) || + isOverrideActivityCall(call) || isPublishEventsCall(call) || isRegisterSignalHandlerCall(call) || isSendSignalCall(call) || diff --git a/packages/@eventual/core/src/heartbeat.ts b/packages/@eventual/core/src/heartbeat.ts index c1f06d50e..e98c923ae 100644 --- a/packages/@eventual/core/src/heartbeat.ts +++ b/packages/@eventual/core/src/heartbeat.ts @@ -1,5 +1,5 @@ import { getActivityContext, getWorkflowClient } from "./global.js"; -import { HeartbeatResponse } from "./runtime/clients/workflow-client.js"; +import { HeartbeatResponse } from "./runtime/clients/activity-runtime-client.js"; import { isActivityWorker, isOrchestratorWorker } from "./runtime/flags.js"; /** diff --git a/packages/@eventual/core/src/interpret.ts b/packages/@eventual/core/src/interpret.ts index 21acfffed..73b8c4eb7 100644 --- a/packages/@eventual/core/src/interpret.ts +++ b/packages/@eventual/core/src/interpret.ts @@ -6,9 +6,13 @@ import { EventualCallCollector, } from "./eventual.js"; import { isAwaitAll } from "./await-all.js"; -import { isActivityCall } from "./calls/activity-call.js"; +import { + isActivityCall, + isOverrideActivityCall, +} from "./calls/activity-call.js"; import { DeterminismError, + EventualError, HeartbeatTimeout, SynchronousOperationError, Timeout, @@ -36,6 +40,7 @@ import { isActivityTimedOut, isActivityHeartbeatTimedOut, isEventsPublished, + isActivityOverridden, } from "./workflow-events.js"; import { Result, @@ -66,6 +71,7 @@ import { isAwaitAllSettled } from "./await-all-settled.js"; import { isAwaitAny } from "./await-any.js"; import { isRace } from "./race.js"; import { isPublishEventsCall } from "./calls/send-events-call.js"; +import { ActivityTargetType } from "./index.js"; export interface WorkflowResult { /** @@ -240,6 +246,13 @@ export function interpret( seq: call.seq!, events: call.events, }; + } else if (isOverrideActivityCall(call)) { + return { + kind: CommandType.OverrideActivity, + seq: call.seq!, + outcome: call.outcome, + target: call.target, + }; } return assertNever(call); @@ -269,6 +282,18 @@ export function interpret( if (isCommandCall(activity)) { if (isExpectSignalCall(activity)) { subscribeToSignal(activity.signalId, activity); + } else if (isOverrideActivityCall(activity)) { + if (activity.target.type === ActivityTargetType.OwnActivity) { + const act = callTable[activity.target.seq]; + if (act === undefined) { + throw new DeterminismError( + `Call for seq ${activity.target.seq} was not emitted.` + ); + } + if (!act.result) { + act.result = activity.outcome; + } + } } else if (isConditionCall(activity)) { // if the condition is resolvable, don't add it to the calls. const result = tryResolveResult(activity); @@ -467,7 +492,16 @@ export function interpret( (r): PromiseFulfilledResult | PromiseRejectedResult => isResolved(r) ? { status: "fulfilled", value: r.value } - : { status: "rejected", reason: r.error } + : { + status: "rejected", + reason: + r.error instanceof Error + ? new EventualError( + r.error.name, + r.error.message ?? undefined + ) + : r.error, + } ) ); } @@ -530,7 +564,7 @@ export function interpret( ? Result.failed(new Timeout("Activity Timed Out")) : isActivityHeartbeatTimedOut(event) ? Result.failed(new HeartbeatTimeout("Activity Heartbeat TimedOut")) - : Result.failed(event.error); + : Result.failed(new EventualError(event.error, event.message)); } } @@ -551,6 +585,8 @@ function isCorresponding(event: ScheduledEvent, call: CommandCall) { return isConditionCall(call); } else if (isEventsPublished(event)) { return isPublishEventsCall(call); + } else if (isActivityOverridden(event)) { + return isOverrideActivityCall(call); } return assertNever(event); } diff --git a/packages/@eventual/core/src/runtime/clients/activity-runtime-client.ts b/packages/@eventual/core/src/runtime/clients/activity-runtime-client.ts index ad9b05f37..1e8654d1e 100644 --- a/packages/@eventual/core/src/runtime/clients/activity-runtime-client.ts +++ b/packages/@eventual/core/src/runtime/clients/activity-runtime-client.ts @@ -21,13 +21,16 @@ export interface ActivityRuntimeClient { executionId: string, seq: number, heartbeatTime: string - ): Promise<{ cancelled: boolean }>; + ): Promise; /** - * Marks an activity as cancelled. An activity can use the {@link heartbeat} call + * Marks an activity as closed. An activity can use the {@link heartbeat} call * to retrieve this value. */ - cancelActivity(executionId: string, seq: number): Promise; + closeActivity( + executionId: string, + seq: number + ): Promise<{ alreadyClosed: boolean }>; /** * Retrieves the activity to check the cancellation status, heartbeat, or other properties. @@ -43,5 +46,15 @@ export interface ActivityExecution { seq: number; claims?: string[]; heartbeatTime?: string; - cancelled?: boolean; + closed?: boolean; +} + +export interface HeartbeatResponse { + /** + * True when the activity has already completed elsewhere. + * + * This is the only way for a long running activity to know that the activity + * is no longer looking for a result. + */ + closed: boolean; } diff --git a/packages/@eventual/core/src/runtime/clients/workflow-client.ts b/packages/@eventual/core/src/runtime/clients/workflow-client.ts index 0fd01647b..9a6157ad0 100644 --- a/packages/@eventual/core/src/runtime/clients/workflow-client.ts +++ b/packages/@eventual/core/src/runtime/clients/workflow-client.ts @@ -10,7 +10,7 @@ import { Execution, ExecutionStatus } from "../../execution.js"; import { Signal } from "../../signals.js"; import { Workflow, WorkflowInput, WorkflowOptions } from "../../workflow.js"; import { decodeActivityToken } from "../activity-token.js"; -import { ActivityRuntimeClient } from "./activity-runtime-client.js"; +import { ActivityRuntimeClient, HeartbeatResponse } from "./activity-runtime-client.js"; export abstract class WorkflowClient { constructor(private activityRuntimeClient: ActivityRuntimeClient) {} @@ -107,7 +107,7 @@ export abstract class WorkflowClient { const execution = await this.getExecution(data.payload.executionId); if (execution?.status !== ExecutionStatus.IN_PROGRESS) { - return { cancelled: true }; + return { closed: true }; } return await this.activityRuntimeClient.heartbeatActivity( @@ -121,13 +121,23 @@ export abstract class WorkflowClient { E extends ActivityCompleted | ActivityFailed >(activityToken: string, event: Omit) { const data = decodeActivityToken(activityToken); - await this.submitWorkflowTask( - data.payload.executionId, - createEvent({ - ...event, - seq: data.payload.seq, - }) - ); + // mark the activity as cancelled because we are supplying a value + const { alreadyClosed: alreadyCancelled } = + await this.activityRuntimeClient.closeActivity( + data.payload.executionId, + data.payload.seq + ); + if (!alreadyCancelled) { + await this.submitWorkflowTask( + data.payload.executionId, + createEvent({ + ...event, + seq: data.payload.seq, + }) + ); + } else { + throw new Error("Activity already completed."); + } } } @@ -192,11 +202,3 @@ export interface HeartbeatRequest { activityToken: string; } -export interface HeartbeatResponse { - /** - * True when the activity has been cancelled. - * - * This is the only way for a long running activity to know it was canelled. - */ - cancelled: boolean; -} diff --git a/packages/@eventual/core/src/runtime/command-executor.ts b/packages/@eventual/core/src/runtime/command-executor.ts index c77c43185..831472e19 100644 --- a/packages/@eventual/core/src/runtime/command-executor.ts +++ b/packages/@eventual/core/src/runtime/command-executor.ts @@ -1,7 +1,9 @@ import { Command, ExpectSignalCommand, + OverrideActivityCommand, isExpectSignalCommand, + isOverrideActivityCommand, isPublishEventsCommand, isScheduleActivityCommand, isScheduleWorkflowCommand, @@ -31,13 +33,23 @@ import { ConditionStarted, ConditionTimedOut, SignalSent, + ActivityOverridden, + ActivityCompleted, + ActivityFailed, } from "../workflow-events.js"; -import { EventsPublished, isChildExecutionTarget } from "../index.js"; +import { + ActivityTargetType, + EventsPublished, + isChildExecutionTarget, + isResolved, +} from "../index.js"; import { assertNever } from "../util.js"; import { Workflow } from "../workflow.js"; import { formatChildExecutionName, formatExecutionId } from "./execution-id.js"; import { ActivityWorkerRequest } from "./handlers/activity-worker.js"; import { + ActivityRuntimeClient, + decodeActivityToken, EventClient, Schedule, TimerClient, @@ -50,6 +62,7 @@ interface CommandExecutorProps { timerClient: TimerClient; workflowClient: WorkflowClient; eventClient: EventClient; + activityRuntimeClient: ActivityRuntimeClient; } /** @@ -81,6 +94,8 @@ export class CommandExecutor { return startCondition(command); } else if (isPublishEventsCommand(command)) { return publishEvents(command); + } else if (isOverrideActivityCommand(command)) { + return overrideActivity(command); } else { return assertNever(command, `unknown command type`); } @@ -235,5 +250,54 @@ export class CommandExecutor { seq: command.seq!, }); } + + async function overrideActivity(command: OverrideActivityCommand) { + if (command.target.type === ActivityTargetType.OwnActivity) { + await self.props.activityRuntimeClient.closeActivity( + executionId, + command.target.seq + ); + return createEvent({ + executionId, + activitySeq: command.target.seq, + seq: command.seq, + type: WorkflowEventType.ActivityOverridden, + }); + } else { + const data = decodeActivityToken(command.target.activityToken); + if (isResolved(command.outcome)) { + await self.props.workflowClient.submitWorkflowTask( + data.payload.executionId, + createEvent({ + type: WorkflowEventType.ActivityCompleted, + seq: data.payload.seq, + result: command.outcome.value, + }) + ); + } else { + await self.props.workflowClient.submitWorkflowTask( + data.payload.executionId, + createEvent({ + type: WorkflowEventType.ActivityFailed, + seq: data.payload.seq, + error: + command.outcome.error instanceof Error + ? command.outcome.error.name + : "Error", + message: + command.outcome.error instanceof Error + ? command.outcome.error.message + : JSON.stringify(command.outcome.error), + }) + ); + } + return createEvent({ + executionId: data.payload.executionId, + activitySeq: data.payload.seq, + seq: command.seq, + type: WorkflowEventType.ActivityOverridden, + }); + } + } } } diff --git a/packages/@eventual/core/src/runtime/handlers/activity-worker.ts b/packages/@eventual/core/src/runtime/handlers/activity-worker.ts index c43c60022..972be5c3c 100644 --- a/packages/@eventual/core/src/runtime/handlers/activity-worker.ts +++ b/packages/@eventual/core/src/runtime/handlers/activity-worker.ts @@ -227,9 +227,23 @@ export function createActivityWorker({ event: ActivityCompleted | ActivityFailed, duration: number ) { - await timed(metrics, ActivityMetrics.SubmitWorkflowTaskDuration, () => - workflowClient.submitWorkflowTask(request.executionId, event) + const { alreadyClosed } = await timed( + metrics, + ActivityMetrics.SubmitWorkflowTaskDuration, + () => + activityRuntimeClient.closeActivity( + request.executionId, + request.command.seq + ) ); + // if an activity is closed, do not send the result on completion. + if (!alreadyClosed) { + await timed(metrics, ActivityMetrics.SubmitWorkflowTaskDuration, () => + workflowClient.submitWorkflowTask(request.executionId, event) + ); + } else { + logger.info("Activity was already closed, do not emit result."); + } logActivityCompleteMetrics(isWorkflowFailed(event), duration); } diff --git a/packages/@eventual/core/src/runtime/handlers/orchestrator.ts b/packages/@eventual/core/src/runtime/handlers/orchestrator.ts index 9acde41e3..370c68d7a 100644 --- a/packages/@eventual/core/src/runtime/handlers/orchestrator.ts +++ b/packages/@eventual/core/src/runtime/handlers/orchestrator.ts @@ -27,6 +27,7 @@ import { import { isFailed, isResolved, isResult, Result } from "../../result.js"; import { lookupWorkflow, progressWorkflow, Workflow } from "../../workflow.js"; import { + ActivityRuntimeClient, EventClient, ExecutionHistoryClient, MetricsClient, @@ -48,6 +49,7 @@ import { promiseAllSettledPartitioned } from "../utils.js"; * The Orchestrator's client dependencies. */ export interface OrchestratorDependencies { + activityRuntimeClient: ActivityRuntimeClient; executionHistoryClient: ExecutionHistoryClient; timerClient: TimerClient; workflowRuntimeClient: WorkflowRuntimeClient; @@ -71,6 +73,7 @@ export interface OrchestratorResult { * inject its own client implementations designed for that platform. */ export function createOrchestrator({ + activityRuntimeClient, executionHistoryClient, timerClient, workflowRuntimeClient, @@ -86,6 +89,7 @@ export function createOrchestrator({ workflowClient, workflowRuntimeClient, eventClient, + activityRuntimeClient, }); return async (eventsByExecutionId) => { diff --git a/packages/@eventual/core/src/runtime/metrics/constants.ts b/packages/@eventual/core/src/runtime/metrics/constants.ts index 0a048a5c4..92904e11f 100644 --- a/packages/@eventual/core/src/runtime/metrics/constants.ts +++ b/packages/@eventual/core/src/runtime/metrics/constants.ts @@ -162,6 +162,10 @@ export namespace ActivityMetrics { * amount of time it took to submit a workflow task to SQS to resume the workflow. */ export const SubmitWorkflowTaskDuration = "SubmitWorkflowTaskDuration"; + /** + * amount of time it took to make the activity as cancelled. + */ + export const CancelActivityDuration = "CancelActivityDuration"; } export namespace SchedulerForwarderMetrics { diff --git a/packages/@eventual/core/src/signals.ts b/packages/@eventual/core/src/signals.ts index 482a40a5a..7a1438e7f 100644 --- a/packages/@eventual/core/src/signals.ts +++ b/packages/@eventual/core/src/signals.ts @@ -252,6 +252,7 @@ export function sendSignal( return getWorkflowClient().sendSignal({ executionId, signal, + payload, id: id ?? ulid(), }); } diff --git a/packages/@eventual/core/src/workflow-events.ts b/packages/@eventual/core/src/workflow-events.ts index 208ce6bed..076f201ee 100644 --- a/packages/@eventual/core/src/workflow-events.ts +++ b/packages/@eventual/core/src/workflow-events.ts @@ -19,6 +19,7 @@ export interface HistoryEventBase extends Omit { export enum WorkflowEventType { ActivityCompleted = "ActivityCompleted", ActivityFailed = "ActivityFailed", + ActivityOverridden = "ActivityOverridden", ActivityHeartbeatTimedOut = "ActivityHeartbeatTimedOut", ActivityScheduled = "ActivityScheduled", ActivityTimedOut = "ActivityTimedOut", @@ -55,6 +56,7 @@ export type WorkflowEvent = export type ScheduledEvent = | ActivityScheduled + | ActivityOverridden | ChildWorkflowScheduled | ConditionStarted | EventsPublished @@ -135,6 +137,16 @@ export interface ActivityCompleted extends HistoryEventBase { result: any; } +/** + * Event generated when the workflow calls complete, fail, + * or cancel on it's activity or the activity of another execution. + */ +export interface ActivityOverridden extends HistoryEventBase { + type: WorkflowEventType.ActivityOverridden; + executionId: string; + activitySeq: number; +} + export interface ActivityFailed extends HistoryEventBase { type: WorkflowEventType.ActivityFailed; error: string; @@ -201,6 +213,12 @@ export function isActivityCompleted( return event.type === WorkflowEventType.ActivityCompleted; } +export function isActivityOverridden( + event: WorkflowEvent +): event is ActivityOverridden { + return event.type === WorkflowEventType.ActivityOverridden; +} + export function isActivityFailed( event: WorkflowEvent ): event is ActivityFailed { @@ -377,6 +395,7 @@ export function isWorkflowTimedOut( export const isScheduledEvent = or( isActivityScheduled, + isActivityOverridden, isChildWorkflowScheduled, isConditionStarted, isEventsPublished, diff --git a/packages/@eventual/core/test/command-util.ts b/packages/@eventual/core/test/command-util.ts index a66672c13..37b23743c 100644 --- a/packages/@eventual/core/test/command-util.ts +++ b/packages/@eventual/core/test/command-util.ts @@ -2,6 +2,7 @@ import { ulid } from "ulidx"; import { CommandType, ExpectSignalCommand, + OverrideActivityCommand, PublishEventsCommand, ScheduleActivityCommand, ScheduleWorkflowCommand, @@ -31,8 +32,11 @@ import { WorkflowEventType, WorkflowTimedOut, ActivityHeartbeatTimedOut, + ActivityOverridden, } from "../src/workflow-events.js"; import { SignalTarget } from "../src/signals.js"; +import { Failed, Resolved } from "../src/result.js"; +import { ActivityTarget } from "../src/index.js"; export function createSleepUntilCommand( untilTime: string, @@ -69,6 +73,19 @@ export function createScheduledActivityCommand( }; } +export function createOverrideActivityCommand( + outcome: Resolved | Failed, + target: ActivityTarget, + seq: number +): OverrideActivityCommand { + return { + kind: CommandType.OverrideActivity, + seq, + outcome, + target, + }; +} + export function createScheduledWorkflowCommand( name: string, input: any, @@ -191,6 +208,20 @@ export function activityScheduled( }; } +export function activityOverridden( + executionId: string, + activitySeq: number, + seq: number +): ActivityOverridden { + return { + type: WorkflowEventType.ActivityOverridden, + executionId, + activitySeq, + seq, + timestamp: new Date(0).toISOString(), + }; +} + export function activityHeartbeatTimedOut( seq: number, /** Relative seconds from 0 */ diff --git a/packages/@eventual/core/test/commend-executor.test.ts b/packages/@eventual/core/test/commend-executor.test.ts index 6aeeb211e..78012ed94 100644 --- a/packages/@eventual/core/test/commend-executor.test.ts +++ b/packages/@eventual/core/test/commend-executor.test.ts @@ -14,6 +14,7 @@ import { WorkflowEventType, } from "../src/workflow-events.js"; import { + ActivityRuntimeClient, EventClient, EventEnvelope, formatChildExecutionName, @@ -45,12 +46,16 @@ const mockWorkflowRuntimeClient = { const mockEventClient = { publish: jest.fn() as EventClient["publish"], } satisfies Partial as EventClient; +const mockActivityRuntimeClient = { + closeActivity: jest.fn() as ActivityRuntimeClient["closeActivity"], +} satisfies Partial as ActivityRuntimeClient; const testExecutor = new CommandExecutor({ timerClient: mockTimerClient, workflowClient: mockWorkflowClient, workflowRuntimeClient: mockWorkflowRuntimeClient, eventClient: mockEventClient, + activityRuntimeClient: mockActivityRuntimeClient, }); const workflow = { diff --git a/packages/@eventual/core/test/interpret.test.ts b/packages/@eventual/core/test/interpret.test.ts index 1bee6000d..ae40ca295 100644 --- a/packages/@eventual/core/test/interpret.test.ts +++ b/packages/@eventual/core/test/interpret.test.ts @@ -1,8 +1,18 @@ -import { createActivityCall } from "../src/calls/activity-call.js"; +import { + createActivityCall, + createOverrideActivityCall, +} from "../src/calls/activity-call.js"; import { chain } from "../src/chain.js"; -import { DeterminismError, HeartbeatTimeout, Timeout } from "../src/error.js"; import { + DeterminismError, + EventualError, + HeartbeatTimeout, + Timeout, +} from "../src/error.js"; +import { + ActivityTargetType, Context, + createActivityToken, createAwaitAll, Eventual, interpret, @@ -19,10 +29,14 @@ import { WorkflowHandler, WorkflowResult, } from "../src/index.js"; -import { createSleepUntilCall } from "../src/calls/sleep-call.js"; +import { + createSleepForCall, + createSleepUntilCall, +} from "../src/calls/sleep-call.js"; import { activityCompleted, activityFailed, + activityOverridden as activityOverridden, activityHeartbeatTimedOut, activityScheduled, activityTimedOut, @@ -30,6 +44,7 @@ import { conditionStarted, conditionTimedOut, createExpectSignalCommand, + createOverrideActivityCommand as createOverrideActivityCommand, createPublishEventCommand, createScheduledActivityCommand, createScheduledWorkflowCommand, @@ -156,7 +171,13 @@ test("should catch error of failed Activity", () => { activityFailed("error", 0), ]) ).toMatchObject({ - commands: [createScheduledActivityCommand("handle-error", ["error"], 1)], + commands: [ + createScheduledActivityCommand( + "handle-error", + [new EventualError("error").toJSON()], + 1 + ), + ], }); }); @@ -239,7 +260,7 @@ test("should handle partial blocks with partial completes", () => { }); }); -describe("activity", () => +describe("activity", () => { describe("heartbeat", () => { const wf = workflow(function* () { return createActivityCall("getPumpedUp", [], undefined, 100); @@ -300,7 +321,125 @@ describe("activity", () => commands: [], }); }); - })); + }); + + describe("override activity", () => { + describe("complete own activity", () => { + const wf = workflow(function* () { + const act = createActivityCall("getPumpedUp", [], undefined, 100); + yield createSleepForCall(100); + yield createOverrideActivityCall( + { seq: 0, type: ActivityTargetType.OwnActivity }, + Result.resolved("hi") + ); + return act; + }); + + test("override first", () => { + expect( + interpret(wf.definition(undefined, context), [ + activityScheduled("getPumpedUp", 0), + scheduledSleep("", 1), + completedSleep(1), + activityOverridden("", 0, 2), + ]) + ).toMatchObject({ + result: Result.resolved("hi"), + commands: [], + }); + }); + + test("override own after real complete", () => { + expect( + interpret(wf.definition(undefined, context), [ + activityScheduled("getPumpedUp", 0), + scheduledSleep("", 1), + activityCompleted("bye", 0), + completedSleep(1), + activityOverridden("", 0, 2), + ]) + ).toMatchObject({ + result: Result.resolved("bye"), + commands: [], + }); + }); + }); + + test("override with fail own activity", () => { + const wf = workflow(function* () { + const act = createActivityCall("getPumpedUp", [], undefined, 100); + yield createOverrideActivityCall( + { seq: 0, type: ActivityTargetType.OwnActivity }, + Result.failed(new Timeout()) + ); + return act; + }); + expect( + interpret(wf.definition(undefined, context), [ + activityScheduled("getPumpedUp", 0), + activityOverridden("", 0, 1), + ]) + ).toMatchObject({ + result: Result.failed(new Timeout()), + commands: [], + }); + }); + + describe("override external activity", () => { + test("override", () => { + const wf = workflow(function* () { + const act = createActivityCall("getPumpedUp", [], undefined, 100); + yield createOverrideActivityCall( + { + type: ActivityTargetType.ActivityToken, + activityToken: createActivityToken("exec1", 100), + }, + Result.failed(new Timeout()) + ); + return act; + }); + expect( + interpret(wf.definition(undefined, context), [ + activityScheduled("getPumpedUp", 0), + activityOverridden("exec1", 100, 1), + ]) + ).toMatchObject({ + commands: [], + }); + }); + + test("command", () => { + const wf = workflow(function* () { + const act = createActivityCall("getPumpedUp", [], undefined, 100); + yield createOverrideActivityCall( + { + type: ActivityTargetType.ActivityToken, + activityToken: createActivityToken("exec1", 100), + }, + Result.failed(new Timeout()) + ); + return act; + }); + expect( + interpret(wf.definition(undefined, context), [ + activityScheduled("getPumpedUp", 0), + ]) + ).toMatchObject({ + commands: [ + createOverrideActivityCommand( + Result.failed(new Timeout()), + { + type: ActivityTargetType.ActivityToken, + activityToken: createActivityToken("exec1", 100), + }, + 1 + ), + ], + }); + }); + }); + }); +}); test("should throw when scheduled does not correspond to call", () => { expect(() => @@ -938,7 +1077,7 @@ describe("Race", () => { activityCompleted("B", 1), ]) ).toMatchObject({ - result: Result.failed("A"), + result: Result.failed(new EventualError("A").toJSON()), }); expect( @@ -948,7 +1087,7 @@ describe("Race", () => { activityFailed("B", 1), ]) ).toMatchObject({ - result: Result.failed("B"), + result: Result.failed(new EventualError("B").toJSON()), }); }); }); @@ -1018,8 +1157,8 @@ describe("AwaitAllSettled", () => { ]) ).toMatchObject[]>>({ result: Result.resolved([ - { status: "rejected", reason: "A" }, - { status: "rejected", reason: "B" }, + { status: "rejected", reason: new EventualError("A").toJSON() }, + { status: "rejected", reason: new EventualError("B").toJSON() }, ]), commands: [], }); @@ -1033,7 +1172,7 @@ describe("AwaitAllSettled", () => { ]) ).toMatchObject[]>>({ result: Result.resolved([ - { status: "rejected", reason: "A" }, + { status: "rejected", reason: new EventualError("A").toJSON() }, { status: "fulfilled", value: "B" }, ]), commands: [], @@ -1306,7 +1445,7 @@ test("workflow calling other workflow", () => { workflowFailed("error", 0), ]) ).toMatchObject({ - result: Result.failed("error"), + result: Result.failed(new EventualError("error").toJSON()), commands: [], }); });