From 724bdefb330622d9560a82305a215c4c48f5ed94 Mon Sep 17 00:00:00 2001 From: wangqi <1942460489@qq.com> Date: Wed, 2 Oct 2024 00:27:18 +0800 Subject: [PATCH] feature: update workflow engine --- ...ternalWorkflowInstanceEventDispatcher.java | 100 ++++++++++++++ ...alWorkflowTaskInstanceEventDispatcher.java | 100 ++++++++++++++ .../event}/WorkflowInstanceEventDTO.java | 10 +- .../event}/WorkflowTaskInstanceEventDTO.java | 10 +- ...ractWorkflowTaskInstanceEventListener.java | 10 +- ...rkflowTaskInstanceDeployEventListener.java | 56 ++++++++ .../WorkflowTaskInstanceEventListener.java | 23 ++-- ...kflowTaskInstanceFailureEventListener.java | 21 +-- ...kflowTaskInstanceSuccessEventListener.java | 21 +-- ...AbstractWorkflowInstanceEventListener.java | 10 +- .../WorkflowInstanceDeployEventListener.java | 74 ++++++++++ .../WorkflowInstanceEventListener.java | 23 ++-- .../WorkflowInstanceFailureEventListener.java | 16 ++- .../WorkflowInstanceResumeEventListener.java | 16 ++- ...WorkflowInstanceShutdownEventListener.java | 16 ++- .../WorkflowInstanceSuccessEventListener.java | 16 ++- .../WorkflowInstanceSuspendEventListener.java | 18 ++- ...rkflowInstanceTaskChangeEventListener.java | 98 +++++++++++++ ...nternalWorkflowInstanceEventPublisher.java | 53 +++++++ ...nalWorkflowTaskInstanceEventPublisher.java | 53 +++++++ .../executor/WorkflowInstanceExecutor.java | 40 ++++++ .../WorkflowInstanceExecutorManager.java | 82 +++++++++++ .../WorkflowInstanceExecuteExecutor.java | 66 +++++++++ ...rkflowTaskInstanceDeployEventListener.java | 91 ------------ .../WorkflowInstanceDeployEventListener.java | 108 --------------- ...rkflowInstanceTaskChangeEventListener.java | 129 ------------------ .../SimpleWorkflowInstanceManager.java | 15 +- .../SimpleWorkflowTaskInstanceManager.java | 10 +- .../impl/WorkflowInstanceServiceImpl.java | 124 +++++++++++++++++ .../WorkflowInstanceStateMachine.java | 75 +++------- .../WorkflowTaskInstanceStateMachine.java | 66 +++------ 31 files changed, 1019 insertions(+), 531 deletions(-) create mode 100644 carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/InternalWorkflowInstanceEventDispatcher.java create mode 100644 carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/InternalWorkflowTaskInstanceEventDispatcher.java rename carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/{listener/workflowinstance => engine/dispatch/event}/WorkflowInstanceEventDTO.java (84%) rename carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/{listener/taskinstance => engine/dispatch/event}/WorkflowTaskInstanceEventDTO.java (84%) rename carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/{listener/taskinstance => engine/dispatch/handler/task}/AbstractWorkflowTaskInstanceEventListener.java (86%) create mode 100644 carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/task/WorkflowTaskInstanceDeployEventListener.java rename carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/{listener/taskinstance => engine/dispatch/handler/task}/WorkflowTaskInstanceEventListener.java (52%) rename carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/{listener/taskinstance => engine/dispatch/handler/task}/WorkflowTaskInstanceFailureEventListener.java (74%) rename carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/{listener/taskinstance => engine/dispatch/handler/task}/WorkflowTaskInstanceSuccessEventListener.java (72%) rename carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/{listener/workflowinstance => engine/dispatch/handler/workflow}/AbstractWorkflowInstanceEventListener.java (84%) create mode 100644 carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/workflow/WorkflowInstanceDeployEventListener.java rename carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/{listener/workflowinstance => engine/dispatch/handler/workflow}/WorkflowInstanceEventListener.java (53%) rename carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/{listener/workflowinstance => engine/dispatch/handler/workflow}/WorkflowInstanceFailureEventListener.java (84%) rename carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/{listener/workflowinstance => engine/dispatch/handler/workflow}/WorkflowInstanceResumeEventListener.java (68%) rename carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/{listener/workflowinstance => engine/dispatch/handler/workflow}/WorkflowInstanceShutdownEventListener.java (68%) rename carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/{listener/workflowinstance => engine/dispatch/handler/workflow}/WorkflowInstanceSuccessEventListener.java (83%) rename carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/{listener/workflowinstance => engine/dispatch/handler/workflow}/WorkflowInstanceSuspendEventListener.java (68%) create mode 100644 carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/workflow/WorkflowInstanceTaskChangeEventListener.java create mode 100644 carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/publisher/InternalWorkflowInstanceEventPublisher.java create mode 100644 carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/publisher/InternalWorkflowTaskInstanceEventPublisher.java create mode 100644 carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/executor/WorkflowInstanceExecutor.java create mode 100644 carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/executor/WorkflowInstanceExecutorManager.java create mode 100644 carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/executor/workflow/WorkflowInstanceExecuteExecutor.java delete mode 100644 carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/WorkflowTaskInstanceDeployEventListener.java delete mode 100644 carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceDeployEventListener.java delete mode 100644 carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceTaskChangeEventListener.java create mode 100644 carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/service/impl/WorkflowInstanceServiceImpl.java diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/InternalWorkflowInstanceEventDispatcher.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/InternalWorkflowInstanceEventDispatcher.java new file mode 100644 index 00000000..b7a1b82d --- /dev/null +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/InternalWorkflowInstanceEventDispatcher.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.carp.module.workflow.internal.engine.dispatch; + +import cn.sliew.carp.framework.common.dict.workflow.WorkflowInstanceEvent; +import cn.sliew.carp.module.queue.api.Message; +import cn.sliew.carp.module.queue.api.MessageHandler; +import cn.sliew.carp.module.queue.api.MessageListener; +import cn.sliew.carp.module.queue.api.util.Serder; +import cn.sliew.carp.module.workflow.api.engine.dispatch.WorkflowInstanceEventDispatcher; +import cn.sliew.carp.module.workflow.api.engine.dispatch.handler.WorkflowInstanceEventHandler; +import cn.sliew.carp.module.workflow.api.engine.dispatch.event.WorkflowInstanceStatusEvent; +import cn.sliew.carp.module.workflow.internal.engine.dispatch.event.WorkflowInstanceEventDTO; +import cn.sliew.carp.module.workflow.internal.statemachine.WorkflowTaskInstanceStateMachine; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.util.CollectionUtils; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +@Slf4j +@MessageListener(topic = InternalWorkflowInstanceEventDispatcher.TOPIC, consumerGroup = WorkflowTaskInstanceStateMachine.CONSUMER_GROUP) +public class InternalWorkflowInstanceEventDispatcher implements WorkflowInstanceEventDispatcher, MessageHandler, InitializingBean, DisposableBean { + + public static final String TOPIC = "TOPIC_CARP_INTERNAL_WORKFLOW_INSTANCE_EVENT"; + + @Autowired + private List handlers; + + private Map registry = new HashMap<>(); + private ThreadPoolTaskExecutor taskExecutor; + + @Override + public void afterPropertiesSet() throws Exception { + if (CollectionUtils.isEmpty(handlers) == false) { + handlers.stream().forEach(handler -> registry.put(handler.getType(), handler)); + } + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setMaxPoolSize(5); + executor.setCorePoolSize(1); + executor.setThreadNamePrefix("workflow-instance-thread-pool-"); + executor.initialize(); + taskExecutor = executor; + } + + @Override + public void destroy() throws Exception { + if (taskExecutor != null) { + taskExecutor.shutdown(); + } + } + + @Override + public void handler(Message message) throws Exception { + if (message.getBody() != null) { + Object deserialized = Serder.deserializeByJava(message.getBody()); + if (deserialized instanceof WorkflowInstanceEventDTO) { + WorkflowInstanceEventDTO eventDTO = (WorkflowInstanceEventDTO) deserialized; + dispatch(eventDTO); + } + } + } + + @Override + public void dispatch(WorkflowInstanceStatusEvent event) { + if (registry.containsKey(event.getEvent()) == false) { + throw new RuntimeException("unknown workflow instance event: " + + event.getEvent().getLabel() + "[" + event.getEvent().getValue() + "]"); + } + WorkflowInstanceEventHandler handler = registry.get(event.getEvent()); + CompletableFuture.runAsync(() -> handler.handle(event), taskExecutor) + .whenComplete((unused, throwable) -> { + if (throwable != null) { + log.error("workflow instance event dispatch failed", throwable); + } + }); + } +} diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/InternalWorkflowTaskInstanceEventDispatcher.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/InternalWorkflowTaskInstanceEventDispatcher.java new file mode 100644 index 00000000..7973fb16 --- /dev/null +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/InternalWorkflowTaskInstanceEventDispatcher.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.carp.module.workflow.internal.engine.dispatch; + +import cn.sliew.carp.framework.common.dict.workflow.WorkflowTaskInstanceEvent; +import cn.sliew.carp.module.queue.api.Message; +import cn.sliew.carp.module.queue.api.MessageHandler; +import cn.sliew.carp.module.queue.api.MessageListener; +import cn.sliew.carp.module.queue.api.util.Serder; +import cn.sliew.carp.module.workflow.api.engine.dispatch.WorkflowTaskInstanceEventDispatcher; +import cn.sliew.carp.module.workflow.api.engine.dispatch.event.WorkflowTaskInstanceStatusEvent; +import cn.sliew.carp.module.workflow.api.engine.dispatch.handler.WorkflowTaskInstanceEventHandler; +import cn.sliew.carp.module.workflow.internal.engine.dispatch.event.WorkflowTaskInstanceEventDTO; +import cn.sliew.carp.module.workflow.internal.statemachine.WorkflowInstanceStateMachine; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.util.CollectionUtils; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +@Slf4j +@MessageListener(topic = InternalWorkflowTaskInstanceEventDispatcher.TOPIC, consumerGroup = WorkflowInstanceStateMachine.CONSUMER_GROUP) +public class InternalWorkflowTaskInstanceEventDispatcher implements WorkflowTaskInstanceEventDispatcher, MessageHandler, InitializingBean, DisposableBean { + + public static final String TOPIC = "TOPIC_CARP_INTERNAL_WORKFLOW_TASK_INSTANCE_EVENT"; + + @Autowired + private List handlers; + + private Map registry = new HashMap<>(); + private ThreadPoolTaskExecutor taskExecutor; + + @Override + public void afterPropertiesSet() throws Exception { + if (CollectionUtils.isEmpty(handlers) == false) { + handlers.stream().forEach(handler -> registry.put(handler.getType(), handler)); + } + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setMaxPoolSize(5); + executor.setCorePoolSize(1); + executor.setThreadNamePrefix("workflow-task-instance-thread-pool-"); + executor.initialize(); + taskExecutor = executor; + } + + @Override + public void destroy() throws Exception { + if (taskExecutor != null) { + taskExecutor.shutdown(); + } + } + + @Override + public void handler(Message message) throws Exception { + if (message.getBody() != null) { + Object deserialized = Serder.deserializeByJava(message.getBody()); + if (deserialized instanceof WorkflowTaskInstanceEventDTO) { + WorkflowTaskInstanceEventDTO eventDTO = (WorkflowTaskInstanceEventDTO) deserialized; + dispatch(eventDTO); + } + } + } + + @Override + public void dispatch(WorkflowTaskInstanceStatusEvent event) { + if (registry.containsKey(event.getEvent()) == false) { + throw new RuntimeException("unknown workflow task instance event: " + + event.getEvent().getLabel() + "[" + event.getEvent().getValue() + "]"); + } + WorkflowTaskInstanceEventHandler handler = registry.get(event.getEvent()); + CompletableFuture.runAsync(() -> handler.handle(event), taskExecutor) + .whenComplete((unused, throwable) -> { + if (throwable != null) { + log.error("workflow task instance event dispatch failed", throwable); + } + }); + } +} diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceEventDTO.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/event/WorkflowInstanceEventDTO.java similarity index 84% rename from carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceEventDTO.java rename to carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/event/WorkflowInstanceEventDTO.java index fb785deb..a0e607a2 100644 --- a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceEventDTO.java +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/event/WorkflowInstanceEventDTO.java @@ -16,16 +16,17 @@ * limitations under the License. */ -package cn.sliew.carp.module.workflow.internal.listener.workflowinstance; +package cn.sliew.carp.module.workflow.internal.engine.dispatch.event; import cn.sliew.carp.framework.common.dict.workflow.WorkflowInstanceEvent; import cn.sliew.carp.framework.common.dict.workflow.WorkflowInstanceState; +import cn.sliew.carp.module.workflow.api.engine.dispatch.event.WorkflowInstanceStatusEvent; import lombok.Getter; import java.io.Serializable; @Getter -public class WorkflowInstanceEventDTO implements Serializable { +public class WorkflowInstanceEventDTO implements WorkflowInstanceStatusEvent, Serializable { private static final long serialVersionUID = 1L; @@ -46,4 +47,9 @@ public WorkflowInstanceEventDTO(WorkflowInstanceState state, WorkflowInstanceSta this.workflowInstanceId = workflowInstanceId; this.throwable = throwable; } + + @Override + public WorkflowInstanceEvent getEvent() { + return event; + } } diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/WorkflowTaskInstanceEventDTO.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/event/WorkflowTaskInstanceEventDTO.java similarity index 84% rename from carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/WorkflowTaskInstanceEventDTO.java rename to carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/event/WorkflowTaskInstanceEventDTO.java index 0d276505..4762ba5a 100644 --- a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/WorkflowTaskInstanceEventDTO.java +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/event/WorkflowTaskInstanceEventDTO.java @@ -16,16 +16,17 @@ * limitations under the License. */ -package cn.sliew.carp.module.workflow.internal.listener.taskinstance; +package cn.sliew.carp.module.workflow.internal.engine.dispatch.event; import cn.sliew.carp.framework.common.dict.workflow.WorkflowTaskInstanceEvent; import cn.sliew.carp.framework.common.dict.workflow.WorkflowTaskInstanceStage; +import cn.sliew.carp.module.workflow.api.engine.dispatch.event.WorkflowTaskInstanceStatusEvent; import lombok.Getter; import java.io.Serializable; @Getter -public class WorkflowTaskInstanceEventDTO implements Serializable { +public class WorkflowTaskInstanceEventDTO implements WorkflowTaskInstanceStatusEvent, Serializable { private static final long serialVersionUID = 1L; @@ -46,4 +47,9 @@ public WorkflowTaskInstanceEventDTO(WorkflowTaskInstanceStage state, WorkflowTas this.workflowTaskInstanceId = workflowTaskInstanceId; this.throwable = throwable; } + + @Override + public WorkflowTaskInstanceEvent getEvent() { + return event; + } } diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/AbstractWorkflowTaskInstanceEventListener.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/task/AbstractWorkflowTaskInstanceEventListener.java similarity index 86% rename from carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/AbstractWorkflowTaskInstanceEventListener.java rename to carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/task/AbstractWorkflowTaskInstanceEventListener.java index cc834938..4add5688 100644 --- a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/AbstractWorkflowTaskInstanceEventListener.java +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/task/AbstractWorkflowTaskInstanceEventListener.java @@ -16,10 +16,12 @@ * limitations under the License. */ -package cn.sliew.carp.module.workflow.internal.listener.taskinstance; +package cn.sliew.carp.module.workflow.internal.engine.dispatch.handler.task; import cn.sliew.carp.framework.dag.service.DagInstanceComplexService; import cn.sliew.carp.framework.dag.service.DagStepService; +import cn.sliew.carp.module.workflow.api.service.WorkflowInstanceService; +import cn.sliew.carp.module.workflow.internal.engine.dispatch.event.WorkflowTaskInstanceEventDTO; import cn.sliew.carp.module.workflow.internal.statemachine.WorkflowInstanceStateMachine; import cn.sliew.carp.module.workflow.internal.statemachine.WorkflowTaskInstanceStateMachine; import lombok.extern.slf4j.Slf4j; @@ -45,6 +47,8 @@ public abstract class AbstractWorkflowTaskInstanceEventListener implements Workf @Autowired protected DagStepService dagStepService; @Autowired + protected WorkflowInstanceService workflowInstanceService; + @Autowired protected WorkflowInstanceStateMachine workflowInstanceStateMachine; @Autowired protected WorkflowTaskInstanceStateMachine stateMachine; @@ -63,7 +67,7 @@ public void afterPropertiesSet() throws Exception { } @Override - public void onEvent(WorkflowTaskInstanceEventDTO event) { + public void handleInternal(WorkflowTaskInstanceEventDTO event) { try { handleEventAsync(event); } catch (Throwable throwable) { @@ -72,7 +76,7 @@ public void onEvent(WorkflowTaskInstanceEventDTO event) { } protected void onFailure(Long workflowTaskInstanceId, Throwable throwable) { - stateMachine.onFailure(dagStepService.get(workflowTaskInstanceId), throwable); +// stateMachine.onFailure(dagStepService.get(workflowTaskInstanceId), throwable); } protected abstract CompletableFuture handleEventAsync(WorkflowTaskInstanceEventDTO event); diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/task/WorkflowTaskInstanceDeployEventListener.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/task/WorkflowTaskInstanceDeployEventListener.java new file mode 100644 index 00000000..e9f0c770 --- /dev/null +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/task/WorkflowTaskInstanceDeployEventListener.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.carp.module.workflow.internal.engine.dispatch.handler.task; + +import cn.sliew.carp.framework.common.dict.workflow.WorkflowTaskInstanceEvent; +import cn.sliew.carp.framework.dag.service.dto.DagStepDTO; +import cn.sliew.carp.module.workflow.internal.engine.dispatch.event.WorkflowTaskInstanceEventDTO; +import org.springframework.stereotype.Component; + +import java.util.Date; +import java.util.concurrent.CompletableFuture; + +@Component +public class WorkflowTaskInstanceDeployEventListener extends AbstractWorkflowTaskInstanceEventListener { + + @Override + public WorkflowTaskInstanceEvent getType() { + return WorkflowTaskInstanceEvent.COMMAND_DEPLOY; + } + + @Override + protected CompletableFuture handleEventAsync(WorkflowTaskInstanceEventDTO event) { + CompletableFuture future = CompletableFuture.runAsync(() -> run(event)).toCompletableFuture(); + future.whenCompleteAsync((unused, throwable) -> { + if (throwable != null) { + onFailure(event.getWorkflowTaskInstanceId(), throwable); + } + }); + return future; + } + + private void run(WorkflowTaskInstanceEventDTO event) { + DagStepDTO dagStepUpdateParam = new DagStepDTO(); + dagStepUpdateParam.setId(event.getWorkflowTaskInstanceId()); + dagStepUpdateParam.setStatus(event.getNextState().getValue()); + dagStepUpdateParam.setStartTime(new Date()); + dagStepService.update(dagStepUpdateParam); + stateMachine.onSuccess(workflowInstanceService.getTask(event.getWorkflowTaskInstanceId())); + } +} diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/WorkflowTaskInstanceEventListener.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/task/WorkflowTaskInstanceEventListener.java similarity index 52% rename from carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/WorkflowTaskInstanceEventListener.java rename to carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/task/WorkflowTaskInstanceEventListener.java index 9fda5e3d..98393736 100644 --- a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/WorkflowTaskInstanceEventListener.java +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/task/WorkflowTaskInstanceEventListener.java @@ -16,24 +16,21 @@ * limitations under the License. */ -package cn.sliew.carp.module.workflow.internal.listener.taskinstance; +package cn.sliew.carp.module.workflow.internal.engine.dispatch.handler.task; -import cn.sliew.carp.module.queue.api.Message; -import cn.sliew.carp.module.queue.api.MessageHandler; -import cn.sliew.carp.module.queue.api.util.Serder; +import cn.sliew.carp.module.workflow.api.engine.dispatch.event.WorkflowTaskInstanceStatusEvent; +import cn.sliew.carp.module.workflow.api.engine.dispatch.handler.WorkflowTaskInstanceEventHandler; +import cn.sliew.carp.module.workflow.internal.engine.dispatch.event.WorkflowTaskInstanceEventDTO; -public interface WorkflowTaskInstanceEventListener extends MessageHandler { +public interface WorkflowTaskInstanceEventListener extends WorkflowTaskInstanceEventHandler { @Override - default void handler(Message message) throws Exception { - if (message.getBody() != null) { - Object deserialized = Serder.deserializeByJava(message.getBody()); - if (deserialized instanceof WorkflowTaskInstanceEventDTO) { - WorkflowTaskInstanceEventDTO eventDTO = (WorkflowTaskInstanceEventDTO)deserialized; - onEvent(eventDTO); - } + default void handle(WorkflowTaskInstanceStatusEvent event) { + if (event instanceof WorkflowTaskInstanceEventDTO == false) { + throw new RuntimeException(); } + handleInternal((WorkflowTaskInstanceEventDTO) event); } - void onEvent(WorkflowTaskInstanceEventDTO eventDTO); + void handleInternal(WorkflowTaskInstanceEventDTO eventDTO); } diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/WorkflowTaskInstanceFailureEventListener.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/task/WorkflowTaskInstanceFailureEventListener.java similarity index 74% rename from carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/WorkflowTaskInstanceFailureEventListener.java rename to carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/task/WorkflowTaskInstanceFailureEventListener.java index 5237466c..5f31820d 100644 --- a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/WorkflowTaskInstanceFailureEventListener.java +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/task/WorkflowTaskInstanceFailureEventListener.java @@ -16,23 +16,27 @@ * limitations under the License. */ -package cn.sliew.carp.module.workflow.internal.listener.taskinstance; +package cn.sliew.carp.module.workflow.internal.engine.dispatch.handler.task; +import cn.sliew.carp.framework.common.dict.workflow.WorkflowTaskInstanceEvent; import cn.sliew.carp.framework.common.dict.workflow.WorkflowTaskInstanceStage; -import cn.sliew.carp.framework.dag.service.dto.DagInstanceDTO; import cn.sliew.carp.framework.dag.service.dto.DagStepDTO; -import cn.sliew.carp.module.queue.api.MessageListener; -import cn.sliew.carp.module.workflow.internal.statemachine.WorkflowTaskInstanceStateMachine; +import cn.sliew.carp.module.workflow.api.engine.domain.instance.WorkflowTaskInstance; +import cn.sliew.carp.module.workflow.internal.engine.dispatch.event.WorkflowTaskInstanceEventDTO; +import org.springframework.stereotype.Component; import java.io.Serializable; import java.util.Date; import java.util.Optional; import java.util.concurrent.CompletableFuture; -@MessageListener(topic = WorkflowTaskInstanceFailureEventListener.TOPIC, consumerGroup = WorkflowTaskInstanceStateMachine.CONSUMER_GROUP) +@Component public class WorkflowTaskInstanceFailureEventListener extends AbstractWorkflowTaskInstanceEventListener { - public static final String TOPIC = "TOPIC_WORKFLOW_TASK_INSTANCE_PROCESS_FAILURE"; + @Override + public WorkflowTaskInstanceEvent getType() { + return WorkflowTaskInstanceEvent.PROCESS_FAILURE; + } @Override protected CompletableFuture handleEventAsync(WorkflowTaskInstanceEventDTO event) { @@ -57,9 +61,8 @@ public void run() { dagStepUpdateParam.setEndTime(new Date()); dagStepService.update(dagStepUpdateParam); - DagStepDTO stepDTO = dagStepService.get(workflowTaskInstanceId); - DagInstanceDTO instanceDTO = dagInstanceComplexService.selectSimpleOne(stepDTO.getDagInstanceId()); - workflowInstanceStateMachine.onTaskChange(instanceDTO); + WorkflowTaskInstance taskInstance = workflowInstanceService.getTask(workflowTaskInstanceId); + workflowInstanceStateMachine.onTaskChange(workflowInstanceService.get(taskInstance.getWorkflowInstanceId())); } } diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/WorkflowTaskInstanceSuccessEventListener.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/task/WorkflowTaskInstanceSuccessEventListener.java similarity index 72% rename from carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/WorkflowTaskInstanceSuccessEventListener.java rename to carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/task/WorkflowTaskInstanceSuccessEventListener.java index 093028ec..2de8dc60 100644 --- a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/WorkflowTaskInstanceSuccessEventListener.java +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/task/WorkflowTaskInstanceSuccessEventListener.java @@ -16,22 +16,26 @@ * limitations under the License. */ -package cn.sliew.carp.module.workflow.internal.listener.taskinstance; +package cn.sliew.carp.module.workflow.internal.engine.dispatch.handler.task; +import cn.sliew.carp.framework.common.dict.workflow.WorkflowTaskInstanceEvent; import cn.sliew.carp.framework.common.dict.workflow.WorkflowTaskInstanceStage; -import cn.sliew.carp.framework.dag.service.dto.DagInstanceDTO; import cn.sliew.carp.framework.dag.service.dto.DagStepDTO; -import cn.sliew.carp.module.queue.api.MessageListener; -import cn.sliew.carp.module.workflow.internal.statemachine.WorkflowTaskInstanceStateMachine; +import cn.sliew.carp.module.workflow.api.engine.domain.instance.WorkflowTaskInstance; +import cn.sliew.carp.module.workflow.internal.engine.dispatch.event.WorkflowTaskInstanceEventDTO; +import org.springframework.stereotype.Component; import java.io.Serializable; import java.util.Date; import java.util.concurrent.CompletableFuture; -@MessageListener(topic = WorkflowTaskInstanceSuccessEventListener.TOPIC, consumerGroup = WorkflowTaskInstanceStateMachine.CONSUMER_GROUP) +@Component public class WorkflowTaskInstanceSuccessEventListener extends AbstractWorkflowTaskInstanceEventListener { - public static final String TOPIC = "TOPIC_WORKFLOW_TASK_INSTANCE_PROCESS_SUCCESS"; + @Override + public WorkflowTaskInstanceEvent getType() { + return WorkflowTaskInstanceEvent.PROCESS_SUCCESS; + } @Override protected CompletableFuture handleEventAsync(WorkflowTaskInstanceEventDTO event) { @@ -54,9 +58,8 @@ public void run() { dagStepUpdateParam.setEndTime(new Date()); dagStepService.update(dagStepUpdateParam); - DagStepDTO stepDTO = dagStepService.get(workflowTaskInstanceId); - DagInstanceDTO instanceDTO = dagInstanceComplexService.selectSimpleOne(stepDTO.getDagInstanceId()); - workflowInstanceStateMachine.onTaskChange(instanceDTO); + WorkflowTaskInstance taskInstance = workflowInstanceService.getTask(workflowTaskInstanceId); + workflowInstanceStateMachine.onTaskChange(workflowInstanceService.get(taskInstance.getWorkflowInstanceId())); } } diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/AbstractWorkflowInstanceEventListener.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/workflow/AbstractWorkflowInstanceEventListener.java similarity index 84% rename from carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/AbstractWorkflowInstanceEventListener.java rename to carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/workflow/AbstractWorkflowInstanceEventListener.java index 1e092ef0..56e2e687 100644 --- a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/AbstractWorkflowInstanceEventListener.java +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/workflow/AbstractWorkflowInstanceEventListener.java @@ -16,9 +16,11 @@ * limitations under the License. */ -package cn.sliew.carp.module.workflow.internal.listener.workflowinstance; +package cn.sliew.carp.module.workflow.internal.engine.dispatch.handler.workflow; import cn.sliew.carp.framework.dag.service.DagInstanceComplexService; +import cn.sliew.carp.module.workflow.api.service.WorkflowInstanceService; +import cn.sliew.carp.module.workflow.internal.engine.dispatch.event.WorkflowInstanceEventDTO; import cn.sliew.carp.module.workflow.internal.statemachine.WorkflowInstanceStateMachine; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RScheduledExecutorService; @@ -41,6 +43,8 @@ public abstract class AbstractWorkflowInstanceEventListener implements WorkflowI @Autowired protected DagInstanceComplexService dagInstanceComplexService; @Autowired + protected WorkflowInstanceService workflowInstanceService; + @Autowired protected WorkflowInstanceStateMachine stateMachine; @Autowired private RedissonClient redissonClient; @@ -57,7 +61,7 @@ public void afterPropertiesSet() throws Exception { } @Override - public void onEvent(WorkflowInstanceEventDTO event) { + public void handleInternal(WorkflowInstanceEventDTO event) { try { handleEventAsync(event); } catch (Throwable throwable) { @@ -66,7 +70,7 @@ public void onEvent(WorkflowInstanceEventDTO event) { } protected void onFailure(Long workflowInstanceId, Throwable throwable) { - stateMachine.onFailure(dagInstanceComplexService.selectSimpleOne(workflowInstanceId), throwable); + stateMachine.onFailure(workflowInstanceService.get(workflowInstanceId), throwable); } protected abstract CompletableFuture handleEventAsync(WorkflowInstanceEventDTO event); diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/workflow/WorkflowInstanceDeployEventListener.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/workflow/WorkflowInstanceDeployEventListener.java new file mode 100644 index 00000000..e219d376 --- /dev/null +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/workflow/WorkflowInstanceDeployEventListener.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.carp.module.workflow.internal.engine.dispatch.handler.workflow; + +import cn.sliew.carp.framework.common.dict.workflow.WorkflowExecuteType; +import cn.sliew.carp.framework.common.dict.workflow.WorkflowInstanceEvent; +import cn.sliew.carp.framework.dag.algorithm.DAG; +import cn.sliew.carp.framework.dag.service.DagInstanceService; +import cn.sliew.carp.module.workflow.api.engine.domain.instance.WorkflowInstance; +import cn.sliew.carp.module.workflow.api.engine.domain.instance.WorkflowTaskInstance; +import cn.sliew.carp.module.workflow.api.service.convert.WorkflowExecutionGraphConvert; +import cn.sliew.carp.module.workflow.internal.engine.dispatch.event.WorkflowInstanceEventDTO; +import cn.sliew.carp.module.workflow.internal.executor.WorkflowInstanceExecutorManager; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.concurrent.CompletableFuture; + +@Slf4j +@Component +public class WorkflowInstanceDeployEventListener extends AbstractWorkflowInstanceEventListener { + + @Autowired + private DagInstanceService dagInstanceService; + @Autowired + private WorkflowInstanceExecutorManager workflowInstanceExecutorManager; + + @Override + public WorkflowInstanceEvent getType() { + return WorkflowInstanceEvent.COMMAND_DEPLOY; + } + + @Override + protected CompletableFuture handleEventAsync(WorkflowInstanceEventDTO event) { + CompletableFuture future = CompletableFuture.runAsync(() -> run(event)); + future.whenCompleteAsync((unused, throwable) -> { + if (throwable != null) { + log.error(throwable.getMessage(), throwable); + onFailure(event.getWorkflowInstanceId(), throwable); + } + }); + return future; + } + + private void run(WorkflowInstanceEventDTO event) { + dagInstanceService.updateStatus(event.getWorkflowInstanceId(), event.getState().getValue(), event.getNextState().getValue()); + + WorkflowInstance workflowInstance = workflowInstanceService.getGraph(event.getWorkflowInstanceId()); + DAG dag = WorkflowExecutionGraphConvert.INSTANCE.toDto(workflowInstance.getGraph()); + // 无节点,直接成功 + if (dag.nodes().size() == 0) { + stateMachine.onSuccess(workflowInstance); + return; + } + workflowInstanceExecutorManager.execute(WorkflowExecuteType.EXECUTE, workflowInstance, dag); + } +} diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceEventListener.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/workflow/WorkflowInstanceEventListener.java similarity index 53% rename from carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceEventListener.java rename to carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/workflow/WorkflowInstanceEventListener.java index 2d2218f3..540ff6ac 100644 --- a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceEventListener.java +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/workflow/WorkflowInstanceEventListener.java @@ -16,24 +16,21 @@ * limitations under the License. */ -package cn.sliew.carp.module.workflow.internal.listener.workflowinstance; +package cn.sliew.carp.module.workflow.internal.engine.dispatch.handler.workflow; -import cn.sliew.carp.module.queue.api.Message; -import cn.sliew.carp.module.queue.api.MessageHandler; -import cn.sliew.carp.module.queue.api.util.Serder; +import cn.sliew.carp.module.workflow.api.engine.dispatch.event.WorkflowInstanceStatusEvent; +import cn.sliew.carp.module.workflow.api.engine.dispatch.handler.WorkflowInstanceEventHandler; +import cn.sliew.carp.module.workflow.internal.engine.dispatch.event.WorkflowInstanceEventDTO; -public interface WorkflowInstanceEventListener extends MessageHandler { +public interface WorkflowInstanceEventListener extends WorkflowInstanceEventHandler { @Override - default void handler(Message message) throws Exception { - if (message.getBody() != null) { - Object deserialized = Serder.deserializeByJava(message.getBody()); - if (deserialized instanceof WorkflowInstanceEventDTO) { - WorkflowInstanceEventDTO eventDTO = (WorkflowInstanceEventDTO) deserialized; - onEvent(eventDTO); - } + default void handle(WorkflowInstanceStatusEvent event) { + if (event instanceof WorkflowInstanceEventDTO == false) { + throw new RuntimeException(); } + handleInternal((WorkflowInstanceEventDTO) event); } - void onEvent(WorkflowInstanceEventDTO eventDTO); + void handleInternal(WorkflowInstanceEventDTO eventDTO); } diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceFailureEventListener.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/workflow/WorkflowInstanceFailureEventListener.java similarity index 84% rename from carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceFailureEventListener.java rename to carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/workflow/WorkflowInstanceFailureEventListener.java index 06e6cf8e..e148aec3 100644 --- a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceFailureEventListener.java +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/workflow/WorkflowInstanceFailureEventListener.java @@ -16,28 +16,32 @@ * limitations under the License. */ -package cn.sliew.carp.module.workflow.internal.listener.workflowinstance; +package cn.sliew.carp.module.workflow.internal.engine.dispatch.handler.workflow; +import cn.sliew.carp.framework.common.dict.workflow.WorkflowInstanceEvent; import cn.sliew.carp.framework.common.dict.workflow.WorkflowInstanceState; import cn.sliew.carp.framework.dag.service.DagInstanceService; import cn.sliew.carp.framework.dag.service.dto.DagInstanceDTO; -import cn.sliew.carp.module.queue.api.MessageListener; -import cn.sliew.carp.module.workflow.internal.statemachine.WorkflowInstanceStateMachine; +import cn.sliew.carp.module.workflow.internal.engine.dispatch.event.WorkflowInstanceEventDTO; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; import java.io.Serializable; import java.util.Date; import java.util.Optional; import java.util.concurrent.CompletableFuture; -@MessageListener(topic = WorkflowInstanceFailureEventListener.TOPIC, consumerGroup = WorkflowInstanceStateMachine.CONSUMER_GROUP) +@Component public class WorkflowInstanceFailureEventListener extends AbstractWorkflowInstanceEventListener { - public static final String TOPIC = "TOPIC_WORKFLOW_INSTANCE_PROCESS_FAILURE"; - @Autowired private DagInstanceService dagInstanceService; + @Override + public WorkflowInstanceEvent getType() { + return WorkflowInstanceEvent.PROCESS_FAILURE; + } + @Override protected CompletableFuture handleEventAsync(WorkflowInstanceEventDTO event) { return CompletableFuture.runAsync(new FailureRunner(event.getWorkflowInstanceId(), event.getThrowable())); diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceResumeEventListener.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/workflow/WorkflowInstanceResumeEventListener.java similarity index 68% rename from carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceResumeEventListener.java rename to carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/workflow/WorkflowInstanceResumeEventListener.java index a58116ec..b428edb6 100644 --- a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceResumeEventListener.java +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/workflow/WorkflowInstanceResumeEventListener.java @@ -16,21 +16,25 @@ * limitations under the License. */ -package cn.sliew.carp.module.workflow.internal.listener.workflowinstance; +package cn.sliew.carp.module.workflow.internal.engine.dispatch.handler.workflow; -import cn.sliew.carp.module.queue.api.MessageListener; -import cn.sliew.carp.module.workflow.internal.statemachine.WorkflowInstanceStateMachine; +import cn.sliew.carp.framework.common.dict.workflow.WorkflowInstanceEvent; +import cn.sliew.carp.module.workflow.internal.engine.dispatch.event.WorkflowInstanceEventDTO; import cn.sliew.milky.common.util.JacksonUtil; import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; @Slf4j -@MessageListener(topic = WorkflowInstanceResumeEventListener.TOPIC, consumerGroup = WorkflowInstanceStateMachine.CONSUMER_GROUP) +@Component public class WorkflowInstanceResumeEventListener implements WorkflowInstanceEventListener { - public static final String TOPIC = "TOPIC_WORKFLOW_INSTANCE_COMMAND_RESUME"; + @Override + public WorkflowInstanceEvent getType() { + return WorkflowInstanceEvent.COMMAND_RESUME; + } @Override - public void onEvent(WorkflowInstanceEventDTO event) { + public void handleInternal(WorkflowInstanceEventDTO event) { log.info("on event, {}", JacksonUtil.toJsonString(event)); } } diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceShutdownEventListener.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/workflow/WorkflowInstanceShutdownEventListener.java similarity index 68% rename from carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceShutdownEventListener.java rename to carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/workflow/WorkflowInstanceShutdownEventListener.java index adb47118..7db0563c 100644 --- a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceShutdownEventListener.java +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/workflow/WorkflowInstanceShutdownEventListener.java @@ -16,21 +16,25 @@ * limitations under the License. */ -package cn.sliew.carp.module.workflow.internal.listener.workflowinstance; +package cn.sliew.carp.module.workflow.internal.engine.dispatch.handler.workflow; -import cn.sliew.carp.module.queue.api.MessageListener; -import cn.sliew.carp.module.workflow.internal.statemachine.WorkflowInstanceStateMachine; +import cn.sliew.carp.framework.common.dict.workflow.WorkflowInstanceEvent; +import cn.sliew.carp.module.workflow.internal.engine.dispatch.event.WorkflowInstanceEventDTO; import cn.sliew.milky.common.util.JacksonUtil; import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; @Slf4j -@MessageListener(topic = WorkflowInstanceShutdownEventListener.TOPIC, consumerGroup = WorkflowInstanceStateMachine.CONSUMER_GROUP) +@Component public class WorkflowInstanceShutdownEventListener implements WorkflowInstanceEventListener { - public static final String TOPIC = "TOPIC_WORKFLOW_INSTANCE_COMMAND_SHUTDOWN"; + @Override + public WorkflowInstanceEvent getType() { + return WorkflowInstanceEvent.COMMAND_SHUTDOWN; + } @Override - public void onEvent(WorkflowInstanceEventDTO event) { + public void handleInternal(WorkflowInstanceEventDTO event) { log.info("on event, {}", JacksonUtil.toJsonString(event)); } } diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceSuccessEventListener.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/workflow/WorkflowInstanceSuccessEventListener.java similarity index 83% rename from carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceSuccessEventListener.java rename to carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/workflow/WorkflowInstanceSuccessEventListener.java index 1ac9f3d7..479cbfeb 100644 --- a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceSuccessEventListener.java +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/workflow/WorkflowInstanceSuccessEventListener.java @@ -16,27 +16,31 @@ * limitations under the License. */ -package cn.sliew.carp.module.workflow.internal.listener.workflowinstance; +package cn.sliew.carp.module.workflow.internal.engine.dispatch.handler.workflow; +import cn.sliew.carp.framework.common.dict.workflow.WorkflowInstanceEvent; import cn.sliew.carp.framework.common.dict.workflow.WorkflowInstanceState; import cn.sliew.carp.framework.dag.service.DagInstanceService; import cn.sliew.carp.framework.dag.service.dto.DagInstanceDTO; -import cn.sliew.carp.module.queue.api.MessageListener; -import cn.sliew.carp.module.workflow.internal.statemachine.WorkflowInstanceStateMachine; +import cn.sliew.carp.module.workflow.internal.engine.dispatch.event.WorkflowInstanceEventDTO; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; import java.io.Serializable; import java.util.Date; import java.util.concurrent.CompletableFuture; -@MessageListener(topic = WorkflowInstanceSuccessEventListener.TOPIC, consumerGroup = WorkflowInstanceStateMachine.CONSUMER_GROUP) +@Component public class WorkflowInstanceSuccessEventListener extends AbstractWorkflowInstanceEventListener { - public static final String TOPIC = "TOPIC_WORKFLOW_INSTANCE_PROCESS_SUCCESS"; - @Autowired private DagInstanceService dagInstanceService; + @Override + public WorkflowInstanceEvent getType() { + return WorkflowInstanceEvent.PROCESS_SUCCESS; + } + @Override protected CompletableFuture handleEventAsync(WorkflowInstanceEventDTO event) { return CompletableFuture.runAsync(new SuccessRunner(event.getWorkflowInstanceId())); diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceSuspendEventListener.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/workflow/WorkflowInstanceSuspendEventListener.java similarity index 68% rename from carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceSuspendEventListener.java rename to carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/workflow/WorkflowInstanceSuspendEventListener.java index ea6eac12..01d51072 100644 --- a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceSuspendEventListener.java +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/workflow/WorkflowInstanceSuspendEventListener.java @@ -16,21 +16,25 @@ * limitations under the License. */ -package cn.sliew.carp.module.workflow.internal.listener.workflowinstance; +package cn.sliew.carp.module.workflow.internal.engine.dispatch.handler.workflow; -import cn.sliew.carp.module.queue.api.MessageListener; -import cn.sliew.carp.module.workflow.internal.statemachine.WorkflowInstanceStateMachine; +import cn.sliew.carp.framework.common.dict.workflow.WorkflowInstanceEvent; +import cn.sliew.carp.module.workflow.internal.engine.dispatch.event.WorkflowInstanceEventDTO; import cn.sliew.milky.common.util.JacksonUtil; import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; @Slf4j -@MessageListener(topic = WorkflowInstanceSuspendEventListener.TOPIC, consumerGroup = WorkflowInstanceStateMachine.CONSUMER_GROUP) +@Component public class WorkflowInstanceSuspendEventListener implements WorkflowInstanceEventListener { - public static final String TOPIC = "TOPIC_WORKFLOW_INSTANCE_COMMAND_SUSPEND"; - @Override - public void onEvent(WorkflowInstanceEventDTO event) { + public WorkflowInstanceEvent getType() { + return WorkflowInstanceEvent.COMMAND_SUSPEND; + } + + @Override + public void handleInternal(WorkflowInstanceEventDTO event) { log.info("on event, {}", JacksonUtil.toJsonString(event)); } } diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/workflow/WorkflowInstanceTaskChangeEventListener.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/workflow/WorkflowInstanceTaskChangeEventListener.java new file mode 100644 index 00000000..f2c99c88 --- /dev/null +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/handler/workflow/WorkflowInstanceTaskChangeEventListener.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.carp.module.workflow.internal.engine.dispatch.handler.workflow; + +import cn.sliew.carp.framework.common.dict.workflow.WorkflowExecuteType; +import cn.sliew.carp.framework.common.dict.workflow.WorkflowInstanceEvent; +import cn.sliew.carp.framework.common.dict.workflow.WorkflowInstanceState; +import cn.sliew.carp.framework.dag.algorithm.DAG; +import cn.sliew.carp.module.workflow.api.engine.domain.instance.WorkflowExecutionGraph; +import cn.sliew.carp.module.workflow.api.engine.domain.instance.WorkflowInstance; +import cn.sliew.carp.module.workflow.api.engine.domain.instance.WorkflowTaskInstance; +import cn.sliew.carp.module.workflow.api.service.convert.WorkflowExecutionGraphConvert; +import cn.sliew.carp.module.workflow.internal.engine.dispatch.event.WorkflowInstanceEventDTO; +import cn.sliew.carp.module.workflow.internal.executor.WorkflowInstanceExecutorManager; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.concurrent.CompletableFuture; + +@Slf4j +@Component +public class WorkflowInstanceTaskChangeEventListener extends AbstractWorkflowInstanceEventListener { + + @Autowired + private WorkflowInstanceExecutorManager workflowInstanceExecutorManager; + + @Override + public WorkflowInstanceEvent getType() { + return WorkflowInstanceEvent.PROCESS_TASK_CHANGE; + } + + @Override + protected CompletableFuture handleEventAsync(WorkflowInstanceEventDTO event) { + CompletableFuture future = CompletableFuture.runAsync(() -> run(event.getWorkflowInstanceId())); + future.whenComplete(((unused, throwable) -> { + if (throwable != null) { + onFailure(event.getWorkflowInstanceId(), throwable); + } + })); + return future; + } + + private void run(Long workflowInstanceId) { + WorkflowInstance workflowInstance = workflowInstanceService.getGraph(workflowInstanceId); + if (WorkflowInstanceState.FAILURE == workflowInstance.getStatus()) { + return; + } + + DAG dag = WorkflowExecutionGraphConvert.INSTANCE.toDto(workflowInstance.getGraph()); + // 检测所有任务的状态,如果有一个失败,则失败。如果都执行成功,则成功 + int successTaskCount = 0; + boolean isAnyFailure = false; + String anyFailureMessage = null; + for (WorkflowTaskInstance taskInstance : dag.nodes()) { + if (taskInstance.getStatus().isEnd()) { + if (taskInstance.getStatus().isFailure()) { + isAnyFailure = true; +// anyFailureMessage = dagStepDTO.getMessage(); + break; + } + if (taskInstance.getStatus().isSuccess()) { + successTaskCount++; + } + } + } + + if (successTaskCount == dag.nodes().size()) { + stateMachine.onSuccess(workflowInstanceService.get(workflowInstanceId)); + return; + } + + if (isAnyFailure) { + onFailure(workflowInstanceId, new Exception(anyFailureMessage)); + return; + } + + // 继续执行剩余节点 + workflowInstanceExecutorManager.execute(WorkflowExecuteType.EXECUTE, workflowInstance, dag); + } + +} diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/publisher/InternalWorkflowInstanceEventPublisher.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/publisher/InternalWorkflowInstanceEventPublisher.java new file mode 100644 index 00000000..7f827958 --- /dev/null +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/publisher/InternalWorkflowInstanceEventPublisher.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.carp.module.workflow.internal.engine.dispatch.publisher; + +import cn.sliew.carp.module.queue.api.Message; +import cn.sliew.carp.module.queue.api.Queue; +import cn.sliew.carp.module.queue.api.QueueFactory; +import cn.sliew.carp.module.queue.api.util.Serder; +import cn.sliew.carp.module.workflow.api.engine.dispatch.event.WorkflowInstanceStatusEvent; +import cn.sliew.carp.module.workflow.api.engine.dispatch.publisher.WorkflowInstanceEventPublisher; +import cn.sliew.carp.module.workflow.internal.engine.dispatch.InternalWorkflowInstanceEventDispatcher; +import cn.sliew.carp.module.workflow.internal.engine.dispatch.event.WorkflowInstanceEventDTO; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class InternalWorkflowInstanceEventPublisher implements WorkflowInstanceEventPublisher { + + @Autowired + private QueueFactory queueFactory; + + @Override + public void publish(WorkflowInstanceStatusEvent event) { + if (event instanceof WorkflowInstanceEventDTO == false) { + throw new RuntimeException(); + } + + WorkflowInstanceEventDTO eventDTO = (WorkflowInstanceEventDTO) event; + + Queue queue = queueFactory.get(InternalWorkflowInstanceEventDispatcher.TOPIC); + Message message = Message.builder() + .topic(queue.getName()) + .body(Serder.serializeByJava(eventDTO)) + .build(); + queue.push(message); + } +} diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/publisher/InternalWorkflowTaskInstanceEventPublisher.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/publisher/InternalWorkflowTaskInstanceEventPublisher.java new file mode 100644 index 00000000..ff5d113a --- /dev/null +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/engine/dispatch/publisher/InternalWorkflowTaskInstanceEventPublisher.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.carp.module.workflow.internal.engine.dispatch.publisher; + +import cn.sliew.carp.module.queue.api.Message; +import cn.sliew.carp.module.queue.api.Queue; +import cn.sliew.carp.module.queue.api.QueueFactory; +import cn.sliew.carp.module.queue.api.util.Serder; +import cn.sliew.carp.module.workflow.api.engine.dispatch.event.WorkflowTaskInstanceStatusEvent; +import cn.sliew.carp.module.workflow.api.engine.dispatch.publisher.WorkflowTaskInstanceEventPublisher; +import cn.sliew.carp.module.workflow.internal.engine.dispatch.InternalWorkflowTaskInstanceEventDispatcher; +import cn.sliew.carp.module.workflow.internal.engine.dispatch.event.WorkflowTaskInstanceEventDTO; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class InternalWorkflowTaskInstanceEventPublisher implements WorkflowTaskInstanceEventPublisher { + + @Autowired + private QueueFactory queueFactory; + + @Override + public void publish(WorkflowTaskInstanceStatusEvent event) { + if (event instanceof WorkflowTaskInstanceEventDTO == false) { + throw new RuntimeException(); + } + + WorkflowTaskInstanceEventDTO eventDTO = (WorkflowTaskInstanceEventDTO) event; + + Queue queue = queueFactory.get(InternalWorkflowTaskInstanceEventDispatcher.TOPIC); + Message message = Message.builder() + .topic(queue.getName()) + .body(Serder.serializeByJava(eventDTO)) + .build(); + queue.push(message); + } +} diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/executor/WorkflowInstanceExecutor.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/executor/WorkflowInstanceExecutor.java new file mode 100644 index 00000000..d9de1583 --- /dev/null +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/executor/WorkflowInstanceExecutor.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.carp.module.workflow.internal.executor; + +import cn.sliew.carp.framework.common.dict.workflow.WorkflowExecuteType; +import cn.sliew.carp.framework.dag.algorithm.DAG; +import cn.sliew.carp.framework.dag.algorithm.DefaultDagEdge; +import cn.sliew.carp.module.workflow.api.engine.domain.instance.WorkflowInstance; +import cn.sliew.carp.module.workflow.api.engine.domain.instance.WorkflowTaskInstance; + +import java.util.Set; + +public interface WorkflowInstanceExecutor { + + WorkflowExecuteType getExecuteType(); + + void execute(WorkflowInstance instance, DAG dag); + + boolean checkEdge(WorkflowInstance instance, DAG dag, DefaultDagEdge edge); + + boolean checkTask(WorkflowInstance instance, DAG dag, WorkflowTaskInstance task); + + void executeTasks(Set task); +} diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/executor/WorkflowInstanceExecutorManager.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/executor/WorkflowInstanceExecutorManager.java new file mode 100644 index 00000000..d357a6bb --- /dev/null +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/executor/WorkflowInstanceExecutorManager.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.carp.module.workflow.internal.executor; + +import cn.sliew.carp.framework.common.dict.workflow.WorkflowExecuteType; +import cn.sliew.carp.framework.dag.algorithm.DAG; +import cn.sliew.carp.module.workflow.api.engine.domain.instance.WorkflowInstance; +import cn.sliew.carp.module.workflow.api.engine.domain.instance.WorkflowTaskInstance; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +@Slf4j +@Component +public class WorkflowInstanceExecutorManager implements InitializingBean, DisposableBean { + + @Autowired + private List executors; + + private Map registry = new HashMap<>(); + private ThreadPoolTaskExecutor taskExecutor; + + @Override + public void afterPropertiesSet() throws Exception { + if (CollectionUtils.isEmpty(executors) == false) { + executors.stream().forEach(handler -> registry.put(handler.getExecuteType(), handler)); + } + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setMaxPoolSize(5); + executor.setCorePoolSize(1); + executor.setThreadNamePrefix("workflow-instance-execute-thread-pool-"); + executor.initialize(); + taskExecutor = executor; + } + + @Override + public void destroy() throws Exception { + if (taskExecutor != null) { + taskExecutor.shutdown(); + } + } + + public CompletableFuture execute(WorkflowExecuteType executeType, WorkflowInstance instance, DAG dag) { + if (registry.containsKey(executeType) == false) { + throw new RuntimeException("unknown workflow instance execute type: " + + executeType.getLabel() + "[" + executeType.getValue() + "]"); + } + + WorkflowInstanceExecutor handler = registry.get(executeType); + return CompletableFuture.runAsync(() -> handler.execute(instance, dag), taskExecutor) + .whenComplete((unused, throwable) -> { + if (throwable != null) { + log.error("workflow instance execute failed", throwable); + } + }); + } +} diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/executor/workflow/WorkflowInstanceExecuteExecutor.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/executor/workflow/WorkflowInstanceExecuteExecutor.java new file mode 100644 index 00000000..fef3373c --- /dev/null +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/executor/workflow/WorkflowInstanceExecuteExecutor.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.carp.module.workflow.internal.executor.workflow; + +import cn.sliew.carp.framework.common.dict.workflow.WorkflowExecuteType; +import cn.sliew.carp.framework.common.dict.workflow.WorkflowTaskInstanceStage; +import cn.sliew.carp.framework.dag.algorithm.DAG; +import cn.sliew.carp.framework.dag.algorithm.DagUtil; +import cn.sliew.carp.framework.dag.algorithm.DefaultDagEdge; +import cn.sliew.carp.module.workflow.api.engine.domain.instance.WorkflowInstance; +import cn.sliew.carp.module.workflow.api.engine.domain.instance.WorkflowTaskInstance; +import cn.sliew.carp.module.workflow.api.manager.WorkflowTaskInstanceManager; +import cn.sliew.carp.module.workflow.internal.executor.WorkflowInstanceExecutor; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.util.Set; + +@Component +public class WorkflowInstanceExecuteExecutor implements WorkflowInstanceExecutor { + + @Autowired + private WorkflowTaskInstanceManager workflowTaskInstanceManager; + + @Override + public WorkflowExecuteType getExecuteType() { + return WorkflowExecuteType.EXECUTE; + } + + @Override + public void execute(WorkflowInstance instance, DAG dag) { + DagUtil.execute(dag, (dag1, node) -> checkTask(instance, dag1, node), (dag1, edge) -> checkEdge(instance, dag, edge), this::executeTasks); + } + + @Override + public boolean checkEdge(WorkflowInstance instance, DAG dag, DefaultDagEdge edge) { + return edge.getSource().getStatus().isSuccess(); + } + + @Override + public boolean checkTask(WorkflowInstance instance, DAG dag, WorkflowTaskInstance task) { + return task.getStatus() == WorkflowTaskInstanceStage.PENDING; + } + + @Override + public void executeTasks(Set task) { + task.forEach(taskInstance -> workflowTaskInstanceManager.deploy(taskInstance.getId())); + } +} diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/WorkflowTaskInstanceDeployEventListener.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/WorkflowTaskInstanceDeployEventListener.java deleted file mode 100644 index 0634aa57..00000000 --- a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/WorkflowTaskInstanceDeployEventListener.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.sliew.carp.module.workflow.internal.listener.taskinstance; - -import cn.sliew.carp.framework.dag.service.DagConfigStepService; -import cn.sliew.carp.framework.dag.service.DagInstanceComplexService; -import cn.sliew.carp.framework.dag.service.DagStepService; -import cn.sliew.carp.framework.dag.service.dto.DagConfigStepDTO; -import cn.sliew.carp.framework.dag.service.dto.DagInstanceDTO; -import cn.sliew.carp.framework.dag.service.dto.DagStepDTO; -import cn.sliew.carp.module.queue.api.MessageListener; -import cn.sliew.carp.module.workflow.api.graph.WorkflowTaskDefinitionMeta; -import cn.sliew.carp.module.workflow.internal.statemachine.WorkflowTaskInstanceStateMachine; -import cn.sliew.milky.common.util.JacksonUtil; -import lombok.extern.slf4j.Slf4j; -import org.redisson.api.annotation.RInject; -import org.springframework.beans.factory.annotation.Autowired; - -import java.io.Serializable; -import java.util.Date; -import java.util.concurrent.CompletableFuture; - -@MessageListener(topic = WorkflowTaskInstanceDeployEventListener.TOPIC, consumerGroup = WorkflowTaskInstanceStateMachine.CONSUMER_GROUP) -public class WorkflowTaskInstanceDeployEventListener extends AbstractWorkflowTaskInstanceEventListener { - - public static final String TOPIC = "TOPIC_WORKFLOW_TASK_INSTANCE_COMMAND_DEPLOY"; - - @Override - protected CompletableFuture handleEventAsync(WorkflowTaskInstanceEventDTO event) { - CompletableFuture future = executorService.submit(new DeployRunner(event)).toCompletableFuture(); - future.whenCompleteAsync((unused, throwable) -> { - if (throwable != null) { - onFailure(event.getWorkflowTaskInstanceId(), throwable); - } - }); - return future; - } - - @Slf4j - public static class DeployRunner implements Runnable, Serializable { - - private WorkflowTaskInstanceEventDTO event; - - @RInject - private String taskId; - @Autowired - private DagInstanceComplexService dagInstanceComplexService; - @Autowired - private DagConfigStepService dagConfigStepService; - @Autowired - private DagStepService dagStepService; - @Autowired - protected WorkflowTaskInstanceStateMachine stateMachine; - - public DeployRunner(WorkflowTaskInstanceEventDTO event) { - this.event = event; - } - - @Override - public void run() { - DagStepDTO dagStepUpdateParam = new DagStepDTO(); - dagStepUpdateParam.setId(event.getWorkflowTaskInstanceId()); - dagStepUpdateParam.setStatus(event.getNextState().getValue()); - dagStepUpdateParam.setStartTime(new Date()); - dagStepService.update(dagStepUpdateParam); - - DagStepDTO stepDTO = dagStepService.get(event.getWorkflowTaskInstanceId()); - DagInstanceDTO dagInstanceDTO = dagInstanceComplexService.selectSimpleOne(stepDTO.getDagInstanceId()); - DagConfigStepDTO configStepDTO = dagConfigStepService.get(stepDTO.getDagConfigStep().getId()); - WorkflowTaskDefinitionMeta workflowTaskDefinitionMeta = JacksonUtil.toObject(configStepDTO.getStepMeta(), WorkflowTaskDefinitionMeta.class); - - } - - } -} diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceDeployEventListener.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceDeployEventListener.java deleted file mode 100644 index 5f9ffa8f..00000000 --- a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceDeployEventListener.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.sliew.carp.module.workflow.internal.listener.workflowinstance; - -import cn.sliew.carp.framework.dag.service.DagConfigComplexService; -import cn.sliew.carp.framework.dag.service.DagInstanceComplexService; -import cn.sliew.carp.framework.dag.service.DagInstanceService; -import cn.sliew.carp.framework.dag.service.dto.DagConfigStepDTO; -import cn.sliew.carp.framework.dag.service.dto.DagInstanceDTO; -import cn.sliew.carp.framework.dag.service.dto.DagStepDTO; -import cn.sliew.carp.module.queue.api.MessageListener; -import cn.sliew.carp.module.workflow.api.manager.WorkflowTaskInstanceManager; -import cn.sliew.carp.module.workflow.internal.statemachine.WorkflowInstanceStateMachine; -import com.google.common.graph.Graph; -import lombok.extern.slf4j.Slf4j; -import org.redisson.api.annotation.RInject; -import org.springframework.beans.factory.annotation.Autowired; - -import java.io.Serializable; -import java.util.Set; -import java.util.concurrent.CompletableFuture; - -@Slf4j -@MessageListener(topic = WorkflowInstanceDeployEventListener.TOPIC, consumerGroup = WorkflowInstanceStateMachine.CONSUMER_GROUP) -public class WorkflowInstanceDeployEventListener extends AbstractWorkflowInstanceEventListener { - - public static final String TOPIC = "TOPIC_WORKFLOW_INSTANCE_COMMAND_DEPLOY"; - - @Override - protected CompletableFuture handleEventAsync(WorkflowInstanceEventDTO event) { - CompletableFuture future = executorService.submit(new DeployRunner(event)).toCompletableFuture(); - future.whenCompleteAsync((unused, throwable) -> { - if (throwable != null) { - onFailure(event.getWorkflowInstanceId(), throwable); - } - }); - return future; - } - - /** - * 必须实现 Serializable 接口,无法使用 lambda - */ - public static class DeployRunner implements Runnable, Serializable { - - private WorkflowInstanceEventDTO event; - - @RInject - private String taskId; - @Autowired - private DagConfigComplexService dagConfigComplexService; - @Autowired - private DagInstanceComplexService dagInstanceComplexService; - @Autowired - private DagInstanceService dagInstanceService; - @Autowired - private WorkflowInstanceStateMachine stateMachine; - @Autowired - private WorkflowTaskInstanceManager workflowTaskInstanceManager; - - public DeployRunner(WorkflowInstanceEventDTO event) { - this.event = event; - } - - @Override - public void run() { - DagInstanceDTO dagInstanceUpdateParam = new DagInstanceDTO(); - dagInstanceUpdateParam.setId(event.getWorkflowInstanceId()); - dagInstanceUpdateParam.setStatus(event.getNextState().getValue()); - dagInstanceService.update(dagInstanceUpdateParam); - - DagInstanceDTO dagInstanceDTO = dagInstanceComplexService.selectSimpleOne(event.getWorkflowInstanceId()); - - // 找到 root 节点,批量启动 root 节点 - Graph dag = dagConfigComplexService.getDag(dagInstanceDTO.getDagConfig().getId()); - - // 无节点,直接成功 - if (dag.nodes().size() == 0) { - stateMachine.onSuccess(dagInstanceDTO); - return; - } - Graph dagStepGraph = dagInstanceComplexService.getDag(event.getWorkflowInstanceId(), dag); - - Set nodes = dagStepGraph.nodes(); - for (DagStepDTO dagStep : nodes) { - // root 节点 - if (dagStepGraph.inDegree(dagStep) == 0) { - workflowTaskInstanceManager.deploy(dagStep.getId()); - } - } - } - } -} diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceTaskChangeEventListener.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceTaskChangeEventListener.java deleted file mode 100644 index e81fd0de..00000000 --- a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceTaskChangeEventListener.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.sliew.carp.module.workflow.internal.listener.workflowinstance; - -import cn.sliew.carp.framework.common.dict.workflow.WorkflowInstanceState; -import cn.sliew.carp.framework.common.dict.workflow.WorkflowTaskInstanceStage; -import cn.sliew.carp.framework.dag.service.DagConfigComplexService; -import cn.sliew.carp.framework.dag.service.dto.DagConfigStepDTO; -import cn.sliew.carp.framework.dag.service.dto.DagInstanceDTO; -import cn.sliew.carp.framework.dag.service.dto.DagStepDTO; -import cn.sliew.carp.module.queue.api.MessageListener; -import cn.sliew.carp.module.workflow.internal.statemachine.WorkflowInstanceStateMachine; -import cn.sliew.carp.module.workflow.internal.statemachine.WorkflowTaskInstanceStateMachine; -import com.google.common.graph.Graph; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; - -import java.io.Serializable; -import java.util.Set; -import java.util.concurrent.CompletableFuture; - -@Slf4j -@MessageListener(topic = WorkflowInstanceTaskChangeEventListener.TOPIC, consumerGroup = WorkflowInstanceStateMachine.CONSUMER_GROUP) -public class WorkflowInstanceTaskChangeEventListener extends AbstractWorkflowInstanceEventListener { - - public static final String TOPIC = "TOPIC_WORKFLOW_INSTANCE_PROCESS_TASK_CHANGE"; - - @Autowired - private DagConfigComplexService dagConfigComplexService; - @Autowired - private WorkflowTaskInstanceStateMachine taskInstanceStateMachine; - - @Override - protected CompletableFuture handleEventAsync(WorkflowInstanceEventDTO event) { - CompletableFuture future = CompletableFuture.runAsync(new TaskChangeRunner(event.getWorkflowInstanceId())); - future.whenComplete(((unused, throwable) -> { - if (throwable != null) { - onFailure(event.getWorkflowInstanceId(), throwable); - } - })); - return future; - } - - private class TaskChangeRunner implements Runnable, Serializable { - - private Long workflowInstanceId; - - public TaskChangeRunner(Long workflowInstanceId) { - this.workflowInstanceId = workflowInstanceId; - } - - @Override - public void run() { - DagInstanceDTO dagInstanceDTO = dagInstanceComplexService.selectSimpleOne(workflowInstanceId); - if (WorkflowInstanceState.FAILURE.getValue().equals(dagInstanceDTO.getStatus())) { - return; - } - - Graph dag = dagConfigComplexService.getDag(dagInstanceDTO.getDagConfig().getId()); - Graph stepDag = dagInstanceComplexService.getDag(workflowInstanceId, dag); - - // 检测所有任务的状态,如果有一个失败,则失败 - boolean isAnyFailure = false; - String anyFailureMessage = null; - for (DagStepDTO dagStepDTO : stepDag.nodes()) { - if (WorkflowTaskInstanceStage.FAILURE.getValue().equals(dagStepDTO.getStatus())) { - isAnyFailure = true; -// anyFailureMessage = dagStepDTO.getMessage(); - break; - } - } - if (isAnyFailure) { - onFailure(workflowInstanceId, new Exception(anyFailureMessage)); - return; - } - - // 检测所有任务,如果都执行成功,则成功。在检测过程中,尝试启动所有后置节点 - int successTaskCount = 0; - for (DagStepDTO dagStepDTO : stepDag.nodes()) { - if (WorkflowTaskInstanceStage.SUCCESS.getValue().equals(dagStepDTO.getStatus())) { - successTaskCount++; - // 如果节点成功,尝试启动后继节点 - Set successors = stepDag.successors(dagStepDTO); - successors.stream().forEach(successor -> tryDeploySuccessor(stepDag, successor)); - } - } - - if (successTaskCount == dag.nodes().size()) { - stateMachine.onSuccess(dagInstanceComplexService.selectSimpleOne(workflowInstanceId)); - } - } - - private void tryDeploySuccessor(Graph stepDag, DagStepDTO dagStepDTO) { - // 已经执行过 - if (dagStepDTO.getStatus() != null && WorkflowTaskInstanceStage.PENDING.getValue().equals(dagStepDTO.getStatus()) == false) { - return; - } - // 判断前驱节点是否全部成功 - Set predecessors = stepDag.predecessors(dagStepDTO); - for (DagStepDTO predecessor : predecessors) { - // 前驱节点未执行 - if (WorkflowTaskInstanceStage.PENDING.getValue().equals(predecessor.getStatus())) { - return; - } - // 前驱节点未成功 - if (WorkflowTaskInstanceStage.SUCCESS.getValue().equals(predecessor.getStatus()) == false) { - return; - } - } - taskInstanceStateMachine.deploy(dagStepDTO); - } - } -} diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/manager/SimpleWorkflowInstanceManager.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/manager/SimpleWorkflowInstanceManager.java index 043e92e9..42e37bf3 100644 --- a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/manager/SimpleWorkflowInstanceManager.java +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/manager/SimpleWorkflowInstanceManager.java @@ -18,9 +18,9 @@ package cn.sliew.carp.module.workflow.internal.manager; -import cn.sliew.carp.framework.dag.service.DagInstanceComplexService; -import cn.sliew.carp.framework.dag.service.dto.DagInstanceDTO; +import cn.sliew.carp.module.workflow.api.engine.domain.instance.WorkflowInstance; import cn.sliew.carp.module.workflow.api.manager.WorkflowInstanceManager; +import cn.sliew.carp.module.workflow.api.service.WorkflowInstanceService; import cn.sliew.carp.module.workflow.internal.statemachine.WorkflowInstanceStateMachine; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -29,14 +29,13 @@ public class SimpleWorkflowInstanceManager implements WorkflowInstanceManager { @Autowired - private DagInstanceComplexService dagInstanceComplexService; + private WorkflowInstanceService workflowInstanceService; @Autowired private WorkflowInstanceStateMachine stateMachine; @Override - public void deploy(Long workflowDefinitionId) { - Long dagInstanceId = dagInstanceComplexService.initialize(workflowDefinitionId); - stateMachine.deploy(get(dagInstanceId)); + public void deploy(Long id) { + stateMachine.deploy(get(id)); } @Override @@ -54,7 +53,7 @@ public void resume(Long id) { stateMachine.resume(get(id)); } - private DagInstanceDTO get(Long id) { - return dagInstanceComplexService.selectSimpleOne(id); + private WorkflowInstance get(Long id) { + return workflowInstanceService.get(id); } } diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/manager/SimpleWorkflowTaskInstanceManager.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/manager/SimpleWorkflowTaskInstanceManager.java index aaca1251..cbb3e195 100644 --- a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/manager/SimpleWorkflowTaskInstanceManager.java +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/manager/SimpleWorkflowTaskInstanceManager.java @@ -18,9 +18,9 @@ package cn.sliew.carp.module.workflow.internal.manager; -import cn.sliew.carp.framework.dag.service.DagStepService; -import cn.sliew.carp.framework.dag.service.dto.DagStepDTO; +import cn.sliew.carp.module.workflow.api.engine.domain.instance.WorkflowTaskInstance; import cn.sliew.carp.module.workflow.api.manager.WorkflowTaskInstanceManager; +import cn.sliew.carp.module.workflow.api.service.WorkflowInstanceService; import cn.sliew.carp.module.workflow.internal.statemachine.WorkflowTaskInstanceStateMachine; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -29,7 +29,7 @@ public class SimpleWorkflowTaskInstanceManager implements WorkflowTaskInstanceManager { @Autowired - private DagStepService dagStepService; + private WorkflowInstanceService workflowInstanceService; @Autowired private WorkflowTaskInstanceStateMachine stateMachine; @@ -53,7 +53,7 @@ public void resume(Long id) { stateMachine.resume(get(id)); } - private DagStepDTO get(Long id) { - return dagStepService.get(id); + private WorkflowTaskInstance get(Long id) { + return workflowInstanceService.getTask(id); } } diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/service/impl/WorkflowInstanceServiceImpl.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/service/impl/WorkflowInstanceServiceImpl.java new file mode 100644 index 00000000..a278e05b --- /dev/null +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/service/impl/WorkflowInstanceServiceImpl.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.carp.module.workflow.internal.service.impl; + +import cn.sliew.carp.framework.common.dict.workflow.WorkflowInstanceState; +import cn.sliew.carp.framework.common.dict.workflow.WorkflowStepType; +import cn.sliew.carp.framework.common.dict.workflow.WorkflowTaskInstanceStage; +import cn.sliew.carp.framework.dag.service.DagConfigLinkService; +import cn.sliew.carp.framework.dag.service.DagInstanceComplexService; +import cn.sliew.carp.framework.dag.service.DagInstanceService; +import cn.sliew.carp.framework.dag.service.DagStepService; +import cn.sliew.carp.framework.dag.service.dto.DagConfigLinkDTO; +import cn.sliew.carp.framework.dag.service.dto.DagInstanceComplexDTO; +import cn.sliew.carp.framework.dag.service.dto.DagInstanceDTO; +import cn.sliew.carp.framework.dag.service.dto.DagStepDTO; +import cn.sliew.carp.module.workflow.api.engine.domain.instance.WorkflowExecutionGraph; +import cn.sliew.carp.module.workflow.api.engine.domain.instance.WorkflowInstance; +import cn.sliew.carp.module.workflow.api.engine.domain.instance.WorkflowTaskInstance; +import cn.sliew.carp.module.workflow.api.manager.WorkflowInstanceManager; +import cn.sliew.carp.module.workflow.api.service.WorkflowInstanceService; +import cn.sliew.carp.module.workflow.api.service.convert.WorkflowDefinitionGraphEdgeConvert; +import cn.sliew.carp.module.workflow.api.service.convert.WorkflowInstanceConvert; +import cn.sliew.carp.module.workflow.api.service.convert.WorkflowTaskInstanceConvert; +import cn.sliew.carp.module.workflow.api.service.param.WorkflowRunParam; +import cn.sliew.carp.module.workflow.api.service.param.WorkflowStopParam; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.List; +import java.util.stream.Collectors; + +@Slf4j +@Service +public class WorkflowInstanceServiceImpl implements WorkflowInstanceService { + + @Autowired + private DagInstanceComplexService dagInstanceComplexService; + @Autowired + private DagInstanceService dagInstanceService; + @Autowired + private DagConfigLinkService dagConfigLinkService; + @Autowired + private DagStepService dagStepService; + @Autowired + private WorkflowInstanceManager workflowInstanceManager; + + @Override + public WorkflowInstance get(Long workflowInstanceId) { + DagInstanceDTO dagInstanceDTO = dagInstanceComplexService.selectSimpleOne(workflowInstanceId); + return WorkflowInstanceConvert.INSTANCE.toDto(dagInstanceDTO); + } + + @Override + public WorkflowInstance getGraph(Long workflowInstanceId) { + DagInstanceComplexDTO complexDTO = dagInstanceComplexService.selectOne(workflowInstanceId); + WorkflowInstance dto = WorkflowInstanceConvert.INSTANCE.toDto(complexDTO); + WorkflowExecutionGraph graph = new WorkflowExecutionGraph(); + dto.setGraph(graph); + + List allNodes = WorkflowTaskInstanceConvert.INSTANCE.toDto(complexDTO.getSteps()); + WorkflowTaskInstance preNode = allNodes.stream().filter(node -> node.getNode().getMeta().getStepType() == WorkflowStepType.PRE).findFirst().orElse(null); + WorkflowTaskInstance postNode = allNodes.stream().filter(node -> node.getNode().getMeta().getStepType() == WorkflowStepType.POST).findFirst().orElse(null); + List normalNodes = allNodes.stream().filter(node -> node.getNode().getMeta().getStepType() == WorkflowStepType.NORMAL).collect(Collectors.toList()); + graph.setPreTask(preNode); + graph.setPostTask(postNode); + graph.setTasks(normalNodes); + + List linkDTOS = dagConfigLinkService.listLinks(complexDTO.getDagConfig().getId()); + graph.setEdges(WorkflowDefinitionGraphEdgeConvert.INSTANCE.toDto(linkDTOS)); + + return dto; + } + + @Override + public WorkflowTaskInstance getTask(Long workflowTaskInstanceId) { + DagStepDTO dagStepDTO = dagStepService.get(workflowTaskInstanceId); + return WorkflowTaskInstanceConvert.INSTANCE.toDto(dagStepDTO); + } + + @Override + public Long simpleInitialize(Long workflowDefinitionId) { + Long workflowInstanceId = dagInstanceComplexService.initialize(workflowDefinitionId); + dagInstanceService.updateStatus(workflowInstanceId, null, WorkflowInstanceState.PENDING.getValue()); + List dagStepDTOS = dagStepService.listSteps(workflowInstanceId); + for (DagStepDTO dagStepDTO : dagStepDTOS) { + dagStepService.updateStatus(dagStepDTO.getId(), null, WorkflowTaskInstanceStage.PENDING.getValue()); + } + return workflowInstanceId; + } + + @Override + public Long run(WorkflowRunParam param) { + Long workflowInstanceId = simpleInitialize(param.getId()); + // 更新参数 + DagInstanceDTO instanceDTO = new DagInstanceDTO(); + instanceDTO.setId(workflowInstanceId); + instanceDTO.setInputs(param.getGlobalVariable()); + dagInstanceService.update(instanceDTO); + workflowInstanceManager.deploy(workflowInstanceId); + return workflowInstanceId; + } + + @Override + public void stop(WorkflowStopParam param) { + + } +} diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/statemachine/WorkflowInstanceStateMachine.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/statemachine/WorkflowInstanceStateMachine.java index 50d771ec..4667bd8f 100644 --- a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/statemachine/WorkflowInstanceStateMachine.java +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/statemachine/WorkflowInstanceStateMachine.java @@ -20,13 +20,9 @@ import cn.sliew.carp.framework.common.dict.workflow.WorkflowInstanceEvent; import cn.sliew.carp.framework.common.dict.workflow.WorkflowInstanceState; -import cn.sliew.carp.framework.dag.service.dto.DagInstanceDTO; -import cn.sliew.carp.module.queue.api.Message; -import cn.sliew.carp.module.queue.api.Queue; -import cn.sliew.carp.module.queue.api.QueueFactory; -import cn.sliew.carp.module.queue.api.util.Serder; -import cn.sliew.carp.module.workflow.internal.listener.workflowinstance.*; -import cn.sliew.milky.common.util.JacksonUtil; +import cn.sliew.carp.module.workflow.api.engine.dispatch.publisher.WorkflowInstanceEventPublisher; +import cn.sliew.carp.module.workflow.api.engine.domain.instance.WorkflowInstance; +import cn.sliew.carp.module.workflow.internal.engine.dispatch.event.WorkflowInstanceEventDTO; import com.alibaba.cola.statemachine.Action; import com.alibaba.cola.statemachine.StateMachine; import com.alibaba.cola.statemachine.builder.StateMachineBuilder; @@ -45,7 +41,7 @@ public class WorkflowInstanceStateMachine implements InitializingBean { public static final String EXECUTOR = "WorkflowInstanceExecute"; @Autowired - private QueueFactory queueFactory; + private WorkflowInstanceEventPublisher publisher; private StateMachine> stateMachine; @@ -105,70 +101,37 @@ public void afterPropertiesSet() throws Exception { private Action> doPerform() { return (fromState, toState, eventEnum, pair) -> { - Queue queue = queueFactory.get(getTopic(eventEnum)); WorkflowInstanceEventDTO eventDTO = new WorkflowInstanceEventDTO(fromState, toState, eventEnum, pair.getLeft(), pair.getRight()); - - Message message = Message.builder() - .topic(queue.getName()) - .body(Serder.serializeByJava(eventDTO)) - .build(); - queue.push(message); + publisher.publish(eventDTO); }; } - private String getTopic(WorkflowInstanceEvent event) { - switch (event) { - case COMMAND_DEPLOY: - return WorkflowInstanceDeployEventListener.TOPIC; - case COMMAND_SHUTDOWN: - return WorkflowInstanceShutdownEventListener.TOPIC; - case COMMAND_SUSPEND: - return WorkflowInstanceSuspendEventListener.TOPIC; - case COMMAND_RESUME: - return WorkflowInstanceResumeEventListener.TOPIC; - case PROCESS_TASK_CHANGE: - return WorkflowInstanceTaskChangeEventListener.TOPIC; - case PROCESS_SUCCESS: - return WorkflowInstanceSuccessEventListener.TOPIC; - case PROCESS_FAILURE: - return WorkflowInstanceFailureEventListener.TOPIC; - default: - throw new IllegalStateException("unknown workflow instance event: " + JacksonUtil.toJsonString(event)); - } - } - public void deploy(DagInstanceDTO dagInstanceDTO) { - WorkflowInstanceState workflowInstanceState = WorkflowInstanceState.PENDING; - stateMachine.fireEvent(workflowInstanceState, WorkflowInstanceEvent.COMMAND_DEPLOY, Pair.of(dagInstanceDTO.getId(), null)); + public void deploy(WorkflowInstance instance) { + stateMachine.fireEvent(instance.getStatus(), WorkflowInstanceEvent.COMMAND_DEPLOY, Pair.of(instance.getId(), null)); } - public void shutdown(DagInstanceDTO dagInstanceDTO) { - WorkflowInstanceState workflowInstanceState = WorkflowInstanceState.of(dagInstanceDTO.getStatus()); - stateMachine.fireEvent(workflowInstanceState, WorkflowInstanceEvent.COMMAND_SHUTDOWN, Pair.of(dagInstanceDTO.getId(), null)); + public void shutdown(WorkflowInstance instance) { + stateMachine.fireEvent(instance.getStatus(), WorkflowInstanceEvent.COMMAND_SHUTDOWN, Pair.of(instance.getId(), null)); } - public void suspend(DagInstanceDTO dagInstanceDTO) { - WorkflowInstanceState workflowInstanceState = WorkflowInstanceState.of(dagInstanceDTO.getStatus()); - stateMachine.fireEvent(workflowInstanceState, WorkflowInstanceEvent.COMMAND_SUSPEND, Pair.of(dagInstanceDTO.getId(), null)); + public void suspend(WorkflowInstance instance) { + stateMachine.fireEvent(instance.getStatus(), WorkflowInstanceEvent.COMMAND_SUSPEND, Pair.of(instance.getId(), null)); } - public void resume(DagInstanceDTO dagInstanceDTO) { - WorkflowInstanceState workflowInstanceState = WorkflowInstanceState.of(dagInstanceDTO.getStatus()); - stateMachine.fireEvent(workflowInstanceState, WorkflowInstanceEvent.COMMAND_RESUME, Pair.of(dagInstanceDTO.getId(), null)); + public void resume(WorkflowInstance instance) { + stateMachine.fireEvent(instance.getStatus(), WorkflowInstanceEvent.COMMAND_RESUME, Pair.of(instance.getId(), null)); } - public void onTaskChange(DagInstanceDTO dagInstanceDTO) { - WorkflowInstanceState workflowInstanceState = WorkflowInstanceState.of(dagInstanceDTO.getStatus()); - stateMachine.fireEvent(workflowInstanceState, WorkflowInstanceEvent.PROCESS_TASK_CHANGE, Pair.of(dagInstanceDTO.getId(), null)); + public void onTaskChange(WorkflowInstance instance) { + stateMachine.fireEvent(instance.getStatus(), WorkflowInstanceEvent.PROCESS_TASK_CHANGE, Pair.of(instance.getId(), null)); } - public void onSuccess(DagInstanceDTO dagInstanceDTO) { - WorkflowInstanceState workflowInstanceState = WorkflowInstanceState.of(dagInstanceDTO.getStatus()); - stateMachine.fireEvent(workflowInstanceState, WorkflowInstanceEvent.PROCESS_SUCCESS, Pair.of(dagInstanceDTO.getId(), null)); + public void onSuccess(WorkflowInstance instance) { + stateMachine.fireEvent(instance.getStatus(), WorkflowInstanceEvent.PROCESS_SUCCESS, Pair.of(instance.getId(), null)); } - public void onFailure(DagInstanceDTO dagInstanceDTO, Throwable throwable) { - WorkflowInstanceState workflowInstanceState = WorkflowInstanceState.of(dagInstanceDTO.getStatus()); - stateMachine.fireEvent(workflowInstanceState, WorkflowInstanceEvent.PROCESS_FAILURE, Pair.of(dagInstanceDTO.getId(), throwable)); + public void onFailure(WorkflowInstance instance, Throwable throwable) { + stateMachine.fireEvent(instance.getStatus(), WorkflowInstanceEvent.PROCESS_FAILURE, Pair.of(instance.getId(), throwable)); } } diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/statemachine/WorkflowTaskInstanceStateMachine.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/statemachine/WorkflowTaskInstanceStateMachine.java index ffd2f21b..cf0df663 100644 --- a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/statemachine/WorkflowTaskInstanceStateMachine.java +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/statemachine/WorkflowTaskInstanceStateMachine.java @@ -20,16 +20,9 @@ import cn.sliew.carp.framework.common.dict.workflow.WorkflowTaskInstanceEvent; import cn.sliew.carp.framework.common.dict.workflow.WorkflowTaskInstanceStage; -import cn.sliew.carp.framework.dag.service.dto.DagStepDTO; -import cn.sliew.carp.module.queue.api.Message; -import cn.sliew.carp.module.queue.api.Queue; -import cn.sliew.carp.module.queue.api.QueueFactory; -import cn.sliew.carp.module.queue.api.util.Serder; -import cn.sliew.carp.module.workflow.internal.listener.taskinstance.WorkflowTaskInstanceDeployEventListener; -import cn.sliew.carp.module.workflow.internal.listener.taskinstance.WorkflowTaskInstanceEventDTO; -import cn.sliew.carp.module.workflow.internal.listener.taskinstance.WorkflowTaskInstanceFailureEventListener; -import cn.sliew.carp.module.workflow.internal.listener.taskinstance.WorkflowTaskInstanceSuccessEventListener; -import cn.sliew.milky.common.util.JacksonUtil; +import cn.sliew.carp.module.workflow.api.engine.domain.instance.WorkflowTaskInstance; +import cn.sliew.carp.module.workflow.internal.engine.dispatch.event.WorkflowTaskInstanceEventDTO; +import cn.sliew.carp.module.workflow.internal.engine.dispatch.publisher.InternalWorkflowTaskInstanceEventPublisher; import com.alibaba.cola.statemachine.Action; import com.alibaba.cola.statemachine.StateMachine; import com.alibaba.cola.statemachine.builder.StateMachineBuilder; @@ -48,7 +41,7 @@ public class WorkflowTaskInstanceStateMachine implements InitializingBean { public static final String EXECUTOR = "WorkflowTaskInstanceExecute"; @Autowired - private QueueFactory queueFactory; + private InternalWorkflowTaskInstanceEventPublisher statusPublisher; private StateMachine> stateMachine; @@ -99,57 +92,32 @@ public void afterPropertiesSet() throws Exception { private Action> doPerform() { return (fromState, toState, eventEnum, pair) -> { - Queue queue = queueFactory.get(getTopic(eventEnum)); WorkflowTaskInstanceEventDTO eventDTO = new WorkflowTaskInstanceEventDTO(fromState, toState, eventEnum, pair.getLeft(), pair.getRight()); - Message message = Message.builder() - .topic(queue.getName()) - .body(Serder.serializeByJava(eventDTO)) - .build(); - queue.push(message); + statusPublisher.publish(eventDTO); }; } - private String getTopic(WorkflowTaskInstanceEvent event) { - switch (event) { - case COMMAND_DEPLOY: - return WorkflowTaskInstanceDeployEventListener.TOPIC; - case PROCESS_SUCCESS: - return WorkflowTaskInstanceSuccessEventListener.TOPIC; - case PROCESS_FAILURE: - return WorkflowTaskInstanceFailureEventListener.TOPIC; - default: - throw new IllegalStateException("unknown workflow task instance event: " + JacksonUtil.toJsonString(event)); - } + public void deploy(WorkflowTaskInstance taskInstance) { + stateMachine.fireEvent(taskInstance.getStatus(), WorkflowTaskInstanceEvent.COMMAND_DEPLOY, Pair.of(taskInstance.getId(), null)); } - public void deploy(DagStepDTO dagStepDTO) { - - WorkflowTaskInstanceStage stage = WorkflowTaskInstanceStage.PENDING; - stateMachine.fireEvent(stage, WorkflowTaskInstanceEvent.COMMAND_DEPLOY, Pair.of(dagStepDTO.getId(), null)); - } - - public void shutdown(DagStepDTO dagStepDTO) { - WorkflowTaskInstanceStage stage = WorkflowTaskInstanceStage.of(dagStepDTO.getStatus()); - stateMachine.fireEvent(stage, WorkflowTaskInstanceEvent.COMMAND_SHUTDOWN, Pair.of(dagStepDTO.getId(), null)); + public void shutdown(WorkflowTaskInstance taskInstance) { + stateMachine.fireEvent(taskInstance.getStatus(), WorkflowTaskInstanceEvent.COMMAND_SHUTDOWN, Pair.of(taskInstance.getId(), null)); } - public void suspend(DagStepDTO dagStepDTO) { - WorkflowTaskInstanceStage stage = WorkflowTaskInstanceStage.of(dagStepDTO.getStatus()); - stateMachine.fireEvent(stage, WorkflowTaskInstanceEvent.COMMAND_SUSPEND, Pair.of(dagStepDTO.getId(), null)); + public void suspend(WorkflowTaskInstance taskInstance) { + stateMachine.fireEvent(taskInstance.getStatus(), WorkflowTaskInstanceEvent.COMMAND_SUSPEND, Pair.of(taskInstance.getId(), null)); } - public void resume(DagStepDTO dagStepDTO) { - WorkflowTaskInstanceStage stage = WorkflowTaskInstanceStage.of(dagStepDTO.getStatus()); - stateMachine.fireEvent(stage, WorkflowTaskInstanceEvent.COMMAND_RESUME, Pair.of(dagStepDTO.getId(), null)); + public void resume(WorkflowTaskInstance taskInstance) { + stateMachine.fireEvent(taskInstance.getStatus(), WorkflowTaskInstanceEvent.COMMAND_RESUME, Pair.of(taskInstance.getId(), null)); } - public void onSuccess(DagStepDTO dagStepDTO) { - WorkflowTaskInstanceStage stage = WorkflowTaskInstanceStage.of(dagStepDTO.getStatus()); - stateMachine.fireEvent(stage, WorkflowTaskInstanceEvent.PROCESS_SUCCESS, Pair.of(dagStepDTO.getId(), null)); + public void onSuccess(WorkflowTaskInstance taskInstance) { + stateMachine.fireEvent(taskInstance.getStatus(), WorkflowTaskInstanceEvent.PROCESS_SUCCESS, Pair.of(taskInstance.getId(), null)); } - public void onFailure(DagStepDTO dagStepDTO, Throwable throwable) { - WorkflowTaskInstanceStage stage = WorkflowTaskInstanceStage.of(dagStepDTO.getStatus()); - stateMachine.fireEvent(stage, WorkflowTaskInstanceEvent.PROCESS_FAILURE, Pair.of(dagStepDTO.getId(), throwable)); + public void onFailure(WorkflowTaskInstance taskInstance, Throwable throwable) { + stateMachine.fireEvent(taskInstance.getStatus(), WorkflowTaskInstanceEvent.PROCESS_FAILURE, Pair.of(taskInstance.getId(), throwable)); } }