From 9f3e80832e154dea6e046687637688ffaec90fa1 Mon Sep 17 00:00:00 2001 From: wangqi Date: Fri, 6 Sep 2024 21:54:49 +0800 Subject: [PATCH] feature: add workflow definition graph and service --- .../api/memory/DefaultDelayQueueFactory.java | 2 + .../carp-module-workflow-api/pom.xml | 1 - .../carp-module-workflow-internal/pom.xml | 47 +++++ ...ractWorkflowTaskInstanceEventListener.java | 79 ++++++++ ...rkflowTaskInstanceDeployEventListener.java | 162 ++++++++++++++++ .../WorkflowTaskInstanceEventDTO.java | 49 +++++ .../WorkflowTaskInstanceEventListener.java | 39 ++++ ...kflowTaskInstanceFailureEventListener.java | 66 +++++++ ...kflowTaskInstanceSuccessEventListener.java | 63 +++++++ ...AbstractWorkflowInstanceEventListener.java | 73 ++++++++ .../WorkflowInstanceDeployEventListener.java | 108 +++++++++++ .../WorkflowInstanceEventDTO.java | 49 +++++ .../WorkflowInstanceEventListener.java | 39 ++++ .../WorkflowInstanceFailureEventListener.java | 65 +++++++ .../WorkflowInstanceResumeEventListener.java | 36 ++++ ...WorkflowInstanceShutdownEventListener.java | 36 ++++ .../WorkflowInstanceSuccessEventListener.java | 62 +++++++ .../WorkflowInstanceSuspendEventListener.java | 36 ++++ ...rkflowInstanceTaskChangeEventListener.java | 129 +++++++++++++ .../SimpleWorkflowInstanceManager.java | 63 +++++++ .../SimpleWorkflowTaskInstanceManager.java | 59 ++++++ .../WorkflowInstanceStateMachine.java | 173 ++++++++++++++++++ .../WorkflowTaskInstanceStateMachine.java | 155 ++++++++++++++++ carp-modules/carp-module-workflow/pom.xml | 1 + pom.xml | 5 + 25 files changed, 1596 insertions(+), 1 deletion(-) create mode 100644 carp-modules/carp-module-workflow/carp-module-workflow-internal/pom.xml create mode 100644 carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/AbstractWorkflowTaskInstanceEventListener.java create mode 100644 carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/WorkflowTaskInstanceDeployEventListener.java create mode 100644 carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/WorkflowTaskInstanceEventDTO.java create mode 100644 carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/WorkflowTaskInstanceEventListener.java create mode 100644 carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/WorkflowTaskInstanceFailureEventListener.java create mode 100644 carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/WorkflowTaskInstanceSuccessEventListener.java create mode 100644 carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/AbstractWorkflowInstanceEventListener.java create mode 100644 carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceDeployEventListener.java create mode 100644 carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceEventDTO.java create mode 100644 carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceEventListener.java create mode 100644 carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceFailureEventListener.java create mode 100644 carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceResumeEventListener.java create mode 100644 carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceShutdownEventListener.java create mode 100644 carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceSuccessEventListener.java create mode 100644 carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceSuspendEventListener.java create mode 100644 carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceTaskChangeEventListener.java create mode 100644 carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/manager/SimpleWorkflowInstanceManager.java create mode 100644 carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/manager/SimpleWorkflowTaskInstanceManager.java create mode 100644 carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/statemachine/WorkflowInstanceStateMachine.java create mode 100644 carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/statemachine/WorkflowTaskInstanceStateMachine.java diff --git a/carp-modules/carp-module-queue/carp-module-queue-api/src/main/java/cn/sliew/carp/module/queue/api/memory/DefaultDelayQueueFactory.java b/carp-modules/carp-module-queue/carp-module-queue-api/src/main/java/cn/sliew/carp/module/queue/api/memory/DefaultDelayQueueFactory.java index bc0c104d..37653054 100644 --- a/carp-modules/carp-module-queue/carp-module-queue-api/src/main/java/cn/sliew/carp/module/queue/api/memory/DefaultDelayQueueFactory.java +++ b/carp-modules/carp-module-queue/carp-module-queue-api/src/main/java/cn/sliew/carp/module/queue/api/memory/DefaultDelayQueueFactory.java @@ -21,7 +21,9 @@ import cn.sliew.carp.module.queue.api.BaseQueueFactory; import cn.sliew.carp.module.queue.api.ListenerManager; import cn.sliew.carp.module.queue.api.Queue; +import org.springframework.stereotype.Component; +@Component public class DefaultDelayQueueFactory extends BaseQueueFactory { private final ListenerManager listenerManager; diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-api/pom.xml b/carp-modules/carp-module-workflow/carp-module-workflow-api/pom.xml index cd8e6f2b..3289b7b3 100644 --- a/carp-modules/carp-module-workflow/carp-module-workflow-api/pom.xml +++ b/carp-modules/carp-module-workflow/carp-module-workflow-api/pom.xml @@ -32,7 +32,6 @@ ${project.parent.groupId} carp-framework-dag - provided diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/pom.xml b/carp-modules/carp-module-workflow/carp-module-workflow-internal/pom.xml new file mode 100644 index 00000000..6174e1a7 --- /dev/null +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/pom.xml @@ -0,0 +1,47 @@ + + + + + 4.0.0 + + cn.sliew + carp-module-workflow + 0.0.12-SNAPSHOT + ../pom.xml + + carp-module-workflow-internal + + + + ${project.parent.groupId} + carp-module-workflow-api + + + ${project.parent.groupId} + carp-module-queue-api + + + + com.alibaba.cola + cola-component-statemachine + + + + \ No newline at end of file diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/AbstractWorkflowTaskInstanceEventListener.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/AbstractWorkflowTaskInstanceEventListener.java new file mode 100644 index 00000000..27e1addc --- /dev/null +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/AbstractWorkflowTaskInstanceEventListener.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.carp.module.workflow.internal.listener.taskinstance; + +import cn.sliew.carp.framework.dag.service.DagInstanceComplexService; +import cn.sliew.carp.framework.dag.service.DagStepService; +import cn.sliew.scaleph.workflow.simple.statemachine.WorkflowInstanceStateMachine; +import cn.sliew.scaleph.workflow.simple.statemachine.WorkflowTaskInstanceStateMachine; +import lombok.extern.slf4j.Slf4j; +import org.redisson.api.RScheduledExecutorService; +import org.redisson.api.RedissonClient; +import org.redisson.api.WorkerOptions; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.BeanFactoryAware; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.concurrent.CompletableFuture; + +@Slf4j +public abstract class AbstractWorkflowTaskInstanceEventListener implements WorkflowTaskInstanceEventListener, InitializingBean, BeanFactoryAware { + + private BeanFactory beanFactory; + protected RScheduledExecutorService executorService; + + @Autowired + protected DagInstanceComplexService dagInstanceComplexService; + @Autowired + protected DagStepService dagStepService; + @Autowired + protected WorkflowInstanceStateMachine workflowInstanceStateMachine; + @Autowired + protected WorkflowTaskInstanceStateMachine stateMachine; + @Autowired + private RedissonClient redissonClient; + + @Override + public void setBeanFactory(BeanFactory beanFactory) throws BeansException { + this.beanFactory = beanFactory; + } + + @Override + public void afterPropertiesSet() throws Exception { + executorService = redissonClient.getExecutorService(WorkflowTaskInstanceStateMachine.EXECUTOR); + executorService.registerWorkers(WorkerOptions.defaults().workers(20).beanFactory(beanFactory)); + } + + @Override + public void onEvent(WorkflowTaskInstanceEventDTO event) { + try { + handleEventAsync(event); + } catch (Throwable throwable) { + onFailure(event.getWorkflowTaskInstanceId(), throwable); + } + } + + protected void onFailure(Long workflowTaskInstanceId, Throwable throwable) { + stateMachine.onFailure(dagStepService.get(workflowTaskInstanceId), throwable); + } + + protected abstract CompletableFuture handleEventAsync(WorkflowTaskInstanceEventDTO event); +} diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/WorkflowTaskInstanceDeployEventListener.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/WorkflowTaskInstanceDeployEventListener.java new file mode 100644 index 00000000..e45cb1db --- /dev/null +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/WorkflowTaskInstanceDeployEventListener.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.carp.module.workflow.internal.listener.taskinstance; + +import cn.sliew.carp.framework.dag.service.DagConfigStepService; +import cn.sliew.carp.framework.dag.service.DagInstanceComplexService; +import cn.sliew.carp.framework.dag.service.DagStepService; +import cn.sliew.carp.framework.dag.service.dto.DagConfigStepDTO; +import cn.sliew.carp.framework.dag.service.dto.DagInstanceDTO; +import cn.sliew.carp.framework.dag.service.dto.DagStepDTO; +import cn.sliew.milky.common.exception.Rethrower; +import cn.sliew.milky.common.filter.ActionListener; +import cn.sliew.milky.common.util.JacksonUtil; +import cn.sliew.scaleph.common.util.SpringApplicationContextUtil; +import cn.sliew.scaleph.queue.MessageListener; +import cn.sliew.scaleph.workflow.engine.Engine; +import cn.sliew.scaleph.workflow.engine.EngineBuilder; +import cn.sliew.scaleph.workflow.engine.action.Action; +import cn.sliew.scaleph.workflow.engine.action.ActionContext; +import cn.sliew.scaleph.workflow.engine.action.ActionContextBuilder; +import cn.sliew.scaleph.workflow.engine.action.ActionResult; +import cn.sliew.scaleph.workflow.engine.workflow.SequentialFlow; +import cn.sliew.scaleph.workflow.engine.workflow.WorkFlow; +import cn.sliew.scaleph.workflow.service.dto.WorkflowTaskDefinitionMeta; +import cn.sliew.scaleph.workflow.simple.statemachine.WorkflowTaskInstanceStateMachine; +import lombok.extern.slf4j.Slf4j; +import org.redisson.api.annotation.RInject; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.util.ClassUtils; + +import java.io.Serializable; +import java.util.Collections; +import java.util.Date; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +@MessageListener(topic = WorkflowTaskInstanceDeployEventListener.TOPIC, consumerGroup = WorkflowTaskInstanceStateMachine.CONSUMER_GROUP) +public class WorkflowTaskInstanceDeployEventListener extends AbstractWorkflowTaskInstanceEventListener { + + public static final String TOPIC = "TOPIC_WORKFLOW_TASK_INSTANCE_COMMAND_DEPLOY"; + + public static Engine engine = EngineBuilder.newInstance().build(); + + @Override + protected CompletableFuture handleEventAsync(WorkflowTaskInstanceEventDTO event) { + CompletableFuture future = executorService.submit(new DeployRunner(event)).toCompletableFuture(); + future.whenCompleteAsync((unused, throwable) -> { + if (throwable != null) { + onFailure(event.getWorkflowTaskInstanceId(), throwable); + } + }); + return future; + } + + @Slf4j + public static class DeployRunner implements Runnable, Serializable { + + private WorkflowTaskInstanceEventDTO event; + + @RInject + private String taskId; + @Autowired + private DagInstanceComplexService dagInstanceComplexService; + @Autowired + private DagConfigStepService dagConfigStepService; + @Autowired + private DagStepService dagStepService; + @Autowired + protected WorkflowTaskInstanceStateMachine stateMachine; + + public DeployRunner(WorkflowTaskInstanceEventDTO event) { + this.event = event; + } + + @Override + public void run() { + + DagStepDTO dagStepUpdateParam = new DagStepDTO(); + dagStepUpdateParam.setId(event.getWorkflowTaskInstanceId()); + dagStepUpdateParam.setStatus(event.getNextState().getValue()); + dagStepUpdateParam.setStartTime(new Date()); + dagStepService.update(dagStepUpdateParam); + + DagStepDTO stepDTO = dagStepService.get(event.getWorkflowTaskInstanceId()); + DagInstanceDTO dagInstanceDTO = dagInstanceComplexService.selectSimpleOne(stepDTO.getDagInstanceId()); + DagConfigStepDTO configStepDTO = dagConfigStepService.get(stepDTO.getDagConfigStep().getId()); + WorkflowTaskDefinitionMeta workflowTaskDefinitionMeta = JacksonUtil.toObject(configStepDTO.getStepMeta(), WorkflowTaskDefinitionMeta.class); + try { + Class clazz = ClassUtils.forName(workflowTaskDefinitionMeta.getHandler(), ClassUtils.getDefaultClassLoader()); + Action action = (Action) SpringApplicationContextUtil.getBean(clazz); + WorkFlow workFlow = SequentialFlow.newSequentialFlow() + .name(configStepDTO.getStepName()) + .execute(action) + .build(); + ActionContext actionContext = buildActionContext(dagInstanceDTO, stepDTO); + engine.run(workFlow, actionContext, new ActionListener() { + @Override + public void onResponse(ActionResult result) { + try { + ActionContext context = result.getContext(); + log.info("workflow task {} run success!, globalInputs: {}, inputs: {}, outputs: {}", + configStepDTO.getStepName(), JacksonUtil.toJsonString(context.getGlobalInputs()), JacksonUtil.toJsonString(context.getInputs()), JacksonUtil.toJsonString(context.getOutputs())); + // 记录输出 + DagStepDTO dagStepSuccessParam = new DagStepDTO(); + dagStepSuccessParam.setId(event.getWorkflowTaskInstanceId()); + dagStepSuccessParam.setOutputs(JacksonUtil.toJsonNode(context.getOutputs())); + dagStepService.update(dagStepSuccessParam); + // 通知成功 + stateMachine.onSuccess(dagStepService.get(event.getWorkflowTaskInstanceId())); + } catch (Exception e) { + onFailure(e); + } + } + + @Override + public void onFailure(Throwable e) { + log.error("workflow task {} run failure!", configStepDTO.getStepName(), e); + // 通知失败 + stateMachine.onFailure(dagStepService.get(event.getWorkflowTaskInstanceId()), e); + } + }); + } catch (ClassNotFoundException e) { + Rethrower.throwAs(e); + } + } + + private ActionContext buildActionContext(DagInstanceDTO dagInstanceDTO, DagStepDTO stepDTO) { + Map globalInputs = Collections.emptyMap(); + if (dagInstanceDTO.getInputs() != null && dagInstanceDTO.getInputs().isObject()) { + globalInputs = JacksonUtil.toMap(dagInstanceDTO.getInputs()); + } + Map inputs = Collections.emptyMap(); + if (stepDTO.getInputs() != null && stepDTO.getInputs().isObject()) { + inputs = JacksonUtil.toMap(stepDTO.getInputs()); + } + return ActionContextBuilder.newBuilder() + .withWorkflowDefinitionId(dagInstanceDTO.getDagConfig().getId()) + .withWorkflowInstanceId(stepDTO.getDagInstanceId()) + .withWorkflowTaskDefinitionId(stepDTO.getDagConfigStep().getId()) + .withWorkflowTaskInstanceId(stepDTO.getId()) + .withGlobalInputs(globalInputs) + .withInputs(inputs) + .validateAndBuild(); + } + } +} diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/WorkflowTaskInstanceEventDTO.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/WorkflowTaskInstanceEventDTO.java new file mode 100644 index 00000000..a87d575f --- /dev/null +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/WorkflowTaskInstanceEventDTO.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.carp.module.workflow.internal.listener.taskinstance; + +import cn.sliew.scaleph.common.dict.workflow.WorkflowTaskInstanceEvent; +import cn.sliew.scaleph.common.dict.workflow.WorkflowTaskInstanceStage; +import lombok.Getter; + +import java.io.Serializable; + +@Getter +public class WorkflowTaskInstanceEventDTO implements Serializable { + + private static final long serialVersionUID = 1L; + + private final WorkflowTaskInstanceStage state; + private final WorkflowTaskInstanceStage nextState; + private final WorkflowTaskInstanceEvent event; + private final Long workflowTaskInstanceId; + private final Throwable throwable; + + public WorkflowTaskInstanceEventDTO(WorkflowTaskInstanceStage state, WorkflowTaskInstanceStage nextState, WorkflowTaskInstanceEvent event, Long workflowTaskInstanceId) { + this(state, nextState, event, workflowTaskInstanceId, null); + } + + public WorkflowTaskInstanceEventDTO(WorkflowTaskInstanceStage state, WorkflowTaskInstanceStage nextState, WorkflowTaskInstanceEvent event, Long workflowTaskInstanceId, Throwable throwable) { + this.state = state; + this.nextState = nextState; + this.event = event; + this.workflowTaskInstanceId = workflowTaskInstanceId; + this.throwable = throwable; + } +} diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/WorkflowTaskInstanceEventListener.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/WorkflowTaskInstanceEventListener.java new file mode 100644 index 00000000..706e2d97 --- /dev/null +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/WorkflowTaskInstanceEventListener.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.carp.module.workflow.internal.listener.taskinstance; + +import cn.sliew.scaleph.queue.Message; +import cn.sliew.scaleph.queue.MessageHandler; +import cn.sliew.scaleph.queue.util.FuryUtil; + +public interface WorkflowTaskInstanceEventListener extends MessageHandler { + + @Override + default void handler(Message message) throws Exception { + if (message.getBody() != null) { + Object deserialized = FuryUtil.deserializeByJava(message.getBody()); + if (deserialized instanceof WorkflowTaskInstanceEventDTO) { + WorkflowTaskInstanceEventDTO eventDTO = (WorkflowTaskInstanceEventDTO)deserialized; + onEvent(eventDTO); + } + } + } + + void onEvent(WorkflowTaskInstanceEventDTO eventDTO); +} diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/WorkflowTaskInstanceFailureEventListener.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/WorkflowTaskInstanceFailureEventListener.java new file mode 100644 index 00000000..32c7c052 --- /dev/null +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/WorkflowTaskInstanceFailureEventListener.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.carp.module.workflow.internal.listener.taskinstance; + +import cn.sliew.carp.framework.dag.service.dto.DagInstanceDTO; +import cn.sliew.carp.framework.dag.service.dto.DagStepDTO; +import cn.sliew.scaleph.common.dict.workflow.WorkflowTaskInstanceStage; +import cn.sliew.scaleph.queue.MessageListener; +import cn.sliew.scaleph.workflow.simple.statemachine.WorkflowTaskInstanceStateMachine; + +import java.io.Serializable; +import java.util.Date; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +@MessageListener(topic = WorkflowTaskInstanceFailureEventListener.TOPIC, consumerGroup = WorkflowTaskInstanceStateMachine.CONSUMER_GROUP) +public class WorkflowTaskInstanceFailureEventListener extends AbstractWorkflowTaskInstanceEventListener { + + public static final String TOPIC = "TOPIC_WORKFLOW_TASK_INSTANCE_PROCESS_FAILURE"; + + @Override + protected CompletableFuture handleEventAsync(WorkflowTaskInstanceEventDTO event) { + return CompletableFuture.runAsync(new FailureRunner(event.getWorkflowTaskInstanceId(), event.getThrowable())).toCompletableFuture(); + } + + private class FailureRunner implements Runnable, Serializable { + + private Long workflowTaskInstanceId; + private Optional throwable; + + public FailureRunner(Long workflowTaskInstanceId, Throwable throwable) { + this.workflowTaskInstanceId = workflowTaskInstanceId; + this.throwable = Optional.ofNullable(throwable); + } + + @Override + public void run() { + DagStepDTO dagStepUpdateParam = new DagStepDTO(); + dagStepUpdateParam.setId(workflowTaskInstanceId); + dagStepUpdateParam.setStatus(WorkflowTaskInstanceStage.FAILURE.getValue()); + dagStepUpdateParam.setEndTime(new Date()); + dagStepService.update(dagStepUpdateParam); + + DagStepDTO stepDTO = dagStepService.get(workflowTaskInstanceId); + DagInstanceDTO instanceDTO = dagInstanceComplexService.selectSimpleOne(stepDTO.getDagInstanceId()); + workflowInstanceStateMachine.onTaskChange(instanceDTO); + } + } + +} diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/WorkflowTaskInstanceSuccessEventListener.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/WorkflowTaskInstanceSuccessEventListener.java new file mode 100644 index 00000000..e394dd01 --- /dev/null +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/taskinstance/WorkflowTaskInstanceSuccessEventListener.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.carp.module.workflow.internal.listener.taskinstance; + +import cn.sliew.carp.framework.dag.service.dto.DagInstanceDTO; +import cn.sliew.carp.framework.dag.service.dto.DagStepDTO; +import cn.sliew.scaleph.common.dict.workflow.WorkflowTaskInstanceStage; +import cn.sliew.scaleph.queue.MessageListener; +import cn.sliew.scaleph.workflow.simple.statemachine.WorkflowTaskInstanceStateMachine; + +import java.io.Serializable; +import java.util.Date; +import java.util.concurrent.CompletableFuture; + +@MessageListener(topic = WorkflowTaskInstanceSuccessEventListener.TOPIC, consumerGroup = WorkflowTaskInstanceStateMachine.CONSUMER_GROUP) +public class WorkflowTaskInstanceSuccessEventListener extends AbstractWorkflowTaskInstanceEventListener { + + public static final String TOPIC = "TOPIC_WORKFLOW_TASK_INSTANCE_PROCESS_SUCCESS"; + + @Override + protected CompletableFuture handleEventAsync(WorkflowTaskInstanceEventDTO event) { + return CompletableFuture.runAsync(new SuccessRunner(event.getWorkflowTaskInstanceId())).toCompletableFuture(); + } + + private class SuccessRunner implements Runnable, Serializable { + + private Long workflowTaskInstanceId; + + public SuccessRunner(Long workflowTaskInstanceId) { + this.workflowTaskInstanceId = workflowTaskInstanceId; + } + + @Override + public void run() { + DagStepDTO dagStepUpdateParam = new DagStepDTO(); + dagStepUpdateParam.setId(workflowTaskInstanceId); + dagStepUpdateParam.setStatus(WorkflowTaskInstanceStage.SUCCESS.getValue()); + dagStepUpdateParam.setEndTime(new Date()); + dagStepService.update(dagStepUpdateParam); + + DagStepDTO stepDTO = dagStepService.get(workflowTaskInstanceId); + DagInstanceDTO instanceDTO = dagInstanceComplexService.selectSimpleOne(stepDTO.getDagInstanceId()); + workflowInstanceStateMachine.onTaskChange(instanceDTO); + } + } + +} diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/AbstractWorkflowInstanceEventListener.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/AbstractWorkflowInstanceEventListener.java new file mode 100644 index 00000000..5436ac5e --- /dev/null +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/AbstractWorkflowInstanceEventListener.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.carp.module.workflow.internal.listener.workflowinstance; + +import cn.sliew.carp.framework.dag.service.DagInstanceComplexService; +import cn.sliew.scaleph.workflow.simple.statemachine.WorkflowInstanceStateMachine; +import lombok.extern.slf4j.Slf4j; +import org.redisson.api.RScheduledExecutorService; +import org.redisson.api.RedissonClient; +import org.redisson.api.WorkerOptions; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.BeanFactoryAware; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.concurrent.CompletableFuture; + +@Slf4j +public abstract class AbstractWorkflowInstanceEventListener implements WorkflowInstanceEventListener, InitializingBean, BeanFactoryAware { + + private BeanFactory beanFactory; + protected RScheduledExecutorService executorService; + + @Autowired + protected DagInstanceComplexService dagInstanceComplexService; + @Autowired + protected WorkflowInstanceStateMachine stateMachine; + @Autowired + private RedissonClient redissonClient; + + @Override + public void setBeanFactory(BeanFactory beanFactory) throws BeansException { + this.beanFactory = beanFactory; + } + + @Override + public void afterPropertiesSet() throws Exception { + executorService = redissonClient.getExecutorService(WorkflowInstanceStateMachine.EXECUTOR); + executorService.registerWorkers(WorkerOptions.defaults().workers(20).beanFactory(beanFactory)); + } + + @Override + public void onEvent(WorkflowInstanceEventDTO event) { + try { + handleEventAsync(event); + } catch (Throwable throwable) { + onFailure(event.getWorkflowInstanceId(), throwable); + } + } + + protected void onFailure(Long workflowInstanceId, Throwable throwable) { + stateMachine.onFailure(dagInstanceComplexService.selectSimpleOne(workflowInstanceId), throwable); + } + + protected abstract CompletableFuture handleEventAsync(WorkflowInstanceEventDTO event); +} diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceDeployEventListener.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceDeployEventListener.java new file mode 100644 index 00000000..9df5110b --- /dev/null +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceDeployEventListener.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.carp.module.workflow.internal.listener.workflowinstance; + +import cn.sliew.carp.framework.dag.service.DagConfigComplexService; +import cn.sliew.carp.framework.dag.service.DagInstanceComplexService; +import cn.sliew.carp.framework.dag.service.DagInstanceService; +import cn.sliew.carp.framework.dag.service.dto.DagConfigStepDTO; +import cn.sliew.carp.framework.dag.service.dto.DagInstanceDTO; +import cn.sliew.carp.framework.dag.service.dto.DagStepDTO; +import cn.sliew.scaleph.queue.MessageListener; +import cn.sliew.scaleph.workflow.manager.WorkflowTaskInstanceManager; +import cn.sliew.scaleph.workflow.simple.statemachine.WorkflowInstanceStateMachine; +import com.google.common.graph.Graph; +import lombok.extern.slf4j.Slf4j; +import org.redisson.api.annotation.RInject; +import org.springframework.beans.factory.annotation.Autowired; + +import java.io.Serializable; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +@Slf4j +@MessageListener(topic = WorkflowInstanceDeployEventListener.TOPIC, consumerGroup = WorkflowInstanceStateMachine.CONSUMER_GROUP) +public class WorkflowInstanceDeployEventListener extends AbstractWorkflowInstanceEventListener { + + public static final String TOPIC = "TOPIC_WORKFLOW_INSTANCE_COMMAND_DEPLOY"; + + @Override + protected CompletableFuture handleEventAsync(WorkflowInstanceEventDTO event) { + CompletableFuture future = executorService.submit(new DeployRunner(event)).toCompletableFuture(); + future.whenCompleteAsync((unused, throwable) -> { + if (throwable != null) { + onFailure(event.getWorkflowInstanceId(), throwable); + } + }); + return future; + } + + /** + * 必须实现 Serializable 接口,无法使用 lambda + */ + public static class DeployRunner implements Runnable, Serializable { + + private WorkflowInstanceEventDTO event; + + @RInject + private String taskId; + @Autowired + private DagConfigComplexService dagConfigComplexService; + @Autowired + private DagInstanceComplexService dagInstanceComplexService; + @Autowired + private DagInstanceService dagInstanceService; + @Autowired + private WorkflowInstanceStateMachine stateMachine; + @Autowired + private WorkflowTaskInstanceManager workflowTaskInstanceManager; + + public DeployRunner(WorkflowInstanceEventDTO event) { + this.event = event; + } + + @Override + public void run() { + DagInstanceDTO dagInstanceUpdateParam = new DagInstanceDTO(); + dagInstanceUpdateParam.setId(event.getWorkflowInstanceId()); + dagInstanceUpdateParam.setStatus(event.getNextState().getValue()); + dagInstanceService.update(dagInstanceUpdateParam); + + DagInstanceDTO dagInstanceDTO = dagInstanceComplexService.selectSimpleOne(event.getWorkflowInstanceId()); + + // 找到 root 节点,批量启动 root 节点 + Graph dag = dagConfigComplexService.getDag(dagInstanceDTO.getDagConfig().getId()); + + // 无节点,直接成功 + if (dag.nodes().size() == 0) { + stateMachine.onSuccess(dagInstanceDTO); + return; + } + Graph dagStepGraph = dagInstanceComplexService.getDag(event.getWorkflowInstanceId(), dag); + + Set nodes = dagStepGraph.nodes(); + for (DagStepDTO dagStep : nodes) { + // root 节点 + if (dagStepGraph.inDegree(dagStep) == 0) { + workflowTaskInstanceManager.deploy(dagStep.getId()); + } + } + } + } +} diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceEventDTO.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceEventDTO.java new file mode 100644 index 00000000..fb785deb --- /dev/null +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceEventDTO.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.carp.module.workflow.internal.listener.workflowinstance; + +import cn.sliew.carp.framework.common.dict.workflow.WorkflowInstanceEvent; +import cn.sliew.carp.framework.common.dict.workflow.WorkflowInstanceState; +import lombok.Getter; + +import java.io.Serializable; + +@Getter +public class WorkflowInstanceEventDTO implements Serializable { + + private static final long serialVersionUID = 1L; + + private final WorkflowInstanceState state; + private final WorkflowInstanceState nextState; + private final WorkflowInstanceEvent event; + private final Long workflowInstanceId; + private final Throwable throwable; + + public WorkflowInstanceEventDTO(WorkflowInstanceState state, WorkflowInstanceState nextState, WorkflowInstanceEvent event, Long workflowInstanceId) { + this(state, nextState, event, workflowInstanceId, null); + } + + public WorkflowInstanceEventDTO(WorkflowInstanceState state, WorkflowInstanceState nextState, WorkflowInstanceEvent event, Long workflowInstanceId, Throwable throwable) { + this.state = state; + this.nextState = nextState; + this.event = event; + this.workflowInstanceId = workflowInstanceId; + this.throwable = throwable; + } +} diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceEventListener.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceEventListener.java new file mode 100644 index 00000000..2a100167 --- /dev/null +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceEventListener.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.carp.module.workflow.internal.listener.workflowinstance; + +import cn.sliew.scaleph.queue.Message; +import cn.sliew.scaleph.queue.MessageHandler; +import cn.sliew.scaleph.queue.util.FuryUtil; + +public interface WorkflowInstanceEventListener extends MessageHandler { + + @Override + default void handler(Message message) throws Exception { + if (message.getBody() != null) { + Object deserialized = FuryUtil.deserializeByJava(message.getBody()); + if (deserialized instanceof WorkflowInstanceEventDTO) { + WorkflowInstanceEventDTO eventDTO = (WorkflowInstanceEventDTO) deserialized; + onEvent(eventDTO); + } + } + } + + void onEvent(WorkflowInstanceEventDTO eventDTO); +} diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceFailureEventListener.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceFailureEventListener.java new file mode 100644 index 00000000..11a99a69 --- /dev/null +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceFailureEventListener.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.carp.module.workflow.internal.listener.workflowinstance; + +import cn.sliew.carp.framework.dag.service.DagInstanceService; +import cn.sliew.carp.framework.dag.service.dto.DagInstanceDTO; +import cn.sliew.scaleph.common.dict.workflow.WorkflowInstanceState; +import cn.sliew.scaleph.queue.MessageListener; +import cn.sliew.scaleph.workflow.simple.statemachine.WorkflowInstanceStateMachine; +import org.springframework.beans.factory.annotation.Autowired; + +import java.io.Serializable; +import java.util.Date; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +@MessageListener(topic = WorkflowInstanceFailureEventListener.TOPIC, consumerGroup = WorkflowInstanceStateMachine.CONSUMER_GROUP) +public class WorkflowInstanceFailureEventListener extends AbstractWorkflowInstanceEventListener { + + public static final String TOPIC = "TOPIC_WORKFLOW_INSTANCE_PROCESS_FAILURE"; + + @Autowired + private DagInstanceService dagInstanceService; + + @Override + protected CompletableFuture handleEventAsync(WorkflowInstanceEventDTO event) { + return CompletableFuture.runAsync(new FailureRunner(event.getWorkflowInstanceId(), event.getThrowable())); + } + + private class FailureRunner implements Runnable, Serializable { + + private Long workflowInstanceId; + private Optional throwable; + + public FailureRunner(Long workflowInstanceId, Throwable throwable) { + this.workflowInstanceId = workflowInstanceId; + this.throwable = Optional.ofNullable(throwable); + } + + @Override + public void run() { + DagInstanceDTO dagInstanceUpdateParam = new DagInstanceDTO(); + dagInstanceUpdateParam.setId(workflowInstanceId); + dagInstanceUpdateParam.setStatus(WorkflowInstanceState.FAILURE.getValue()); + dagInstanceUpdateParam.setEndTime(new Date()); + dagInstanceService.update(dagInstanceUpdateParam); + } + } +} diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceResumeEventListener.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceResumeEventListener.java new file mode 100644 index 00000000..6b4d2b02 --- /dev/null +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceResumeEventListener.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.carp.module.workflow.internal.listener.workflowinstance; + +import cn.sliew.milky.common.util.JacksonUtil; +import cn.sliew.scaleph.queue.MessageListener; +import cn.sliew.scaleph.workflow.simple.statemachine.WorkflowInstanceStateMachine; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@MessageListener(topic = WorkflowInstanceResumeEventListener.TOPIC, consumerGroup = WorkflowInstanceStateMachine.CONSUMER_GROUP) +public class WorkflowInstanceResumeEventListener implements WorkflowInstanceEventListener { + + public static final String TOPIC = "TOPIC_WORKFLOW_INSTANCE_COMMAND_RESUME"; + + @Override + public void onEvent(WorkflowInstanceEventDTO event) { + log.info("on event, {}", JacksonUtil.toJsonString(event)); + } +} diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceShutdownEventListener.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceShutdownEventListener.java new file mode 100644 index 00000000..e678ab4b --- /dev/null +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceShutdownEventListener.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.carp.module.workflow.internal.listener.workflowinstance; + +import cn.sliew.milky.common.util.JacksonUtil; +import cn.sliew.scaleph.queue.MessageListener; +import cn.sliew.scaleph.workflow.simple.statemachine.WorkflowInstanceStateMachine; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@MessageListener(topic = WorkflowInstanceShutdownEventListener.TOPIC, consumerGroup = WorkflowInstanceStateMachine.CONSUMER_GROUP) +public class WorkflowInstanceShutdownEventListener implements WorkflowInstanceEventListener { + + public static final String TOPIC = "TOPIC_WORKFLOW_INSTANCE_COMMAND_SHUTDOWN"; + + @Override + public void onEvent(WorkflowInstanceEventDTO event) { + log.info("on event, {}", JacksonUtil.toJsonString(event)); + } +} diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceSuccessEventListener.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceSuccessEventListener.java new file mode 100644 index 00000000..507bb50d --- /dev/null +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceSuccessEventListener.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.carp.module.workflow.internal.listener.workflowinstance; + +import cn.sliew.carp.framework.dag.service.DagInstanceService; +import cn.sliew.carp.framework.dag.service.dto.DagInstanceDTO; +import cn.sliew.scaleph.common.dict.workflow.WorkflowInstanceState; +import cn.sliew.scaleph.queue.MessageListener; +import cn.sliew.scaleph.workflow.simple.statemachine.WorkflowInstanceStateMachine; +import org.springframework.beans.factory.annotation.Autowired; + +import java.io.Serializable; +import java.util.Date; +import java.util.concurrent.CompletableFuture; + +@MessageListener(topic = WorkflowInstanceSuccessEventListener.TOPIC, consumerGroup = WorkflowInstanceStateMachine.CONSUMER_GROUP) +public class WorkflowInstanceSuccessEventListener extends AbstractWorkflowInstanceEventListener { + + public static final String TOPIC = "TOPIC_WORKFLOW_INSTANCE_PROCESS_SUCCESS"; + + @Autowired + private DagInstanceService dagInstanceService; + + @Override + protected CompletableFuture handleEventAsync(WorkflowInstanceEventDTO event) { + return CompletableFuture.runAsync(new SuccessRunner(event.getWorkflowInstanceId())); + } + + private class SuccessRunner implements Runnable, Serializable { + + private Long workflowInstanceId; + + public SuccessRunner(Long workflowInstanceId) { + this.workflowInstanceId = workflowInstanceId; + } + + @Override + public void run() { + DagInstanceDTO dagInstanceUpdateParam = new DagInstanceDTO(); + dagInstanceUpdateParam.setId(workflowInstanceId); + dagInstanceUpdateParam.setStatus(WorkflowInstanceState.SUCCESS.getValue()); + dagInstanceUpdateParam.setEndTime(new Date()); + dagInstanceService.update(dagInstanceUpdateParam); + } + } +} diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceSuspendEventListener.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceSuspendEventListener.java new file mode 100644 index 00000000..453352e4 --- /dev/null +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceSuspendEventListener.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.carp.module.workflow.internal.listener.workflowinstance; + +import cn.sliew.milky.common.util.JacksonUtil; +import cn.sliew.scaleph.queue.MessageListener; +import cn.sliew.scaleph.workflow.simple.statemachine.WorkflowInstanceStateMachine; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@MessageListener(topic = WorkflowInstanceSuspendEventListener.TOPIC, consumerGroup = WorkflowInstanceStateMachine.CONSUMER_GROUP) +public class WorkflowInstanceSuspendEventListener implements WorkflowInstanceEventListener { + + public static final String TOPIC = "TOPIC_WORKFLOW_INSTANCE_COMMAND_SUSPEND"; + + @Override + public void onEvent(WorkflowInstanceEventDTO event) { + log.info("on event, {}", JacksonUtil.toJsonString(event)); + } +} diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceTaskChangeEventListener.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceTaskChangeEventListener.java new file mode 100644 index 00000000..8974d128 --- /dev/null +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/listener/workflowinstance/WorkflowInstanceTaskChangeEventListener.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.carp.module.workflow.internal.listener.workflowinstance; + +import cn.sliew.carp.framework.dag.service.DagConfigComplexService; +import cn.sliew.carp.framework.dag.service.dto.DagConfigStepDTO; +import cn.sliew.carp.framework.dag.service.dto.DagInstanceDTO; +import cn.sliew.carp.framework.dag.service.dto.DagStepDTO; +import cn.sliew.scaleph.common.dict.workflow.WorkflowInstanceState; +import cn.sliew.scaleph.common.dict.workflow.WorkflowTaskInstanceStage; +import cn.sliew.scaleph.queue.MessageListener; +import cn.sliew.scaleph.workflow.simple.statemachine.WorkflowInstanceStateMachine; +import cn.sliew.scaleph.workflow.simple.statemachine.WorkflowTaskInstanceStateMachine; +import com.google.common.graph.Graph; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; + +import java.io.Serializable; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +@Slf4j +@MessageListener(topic = WorkflowInstanceTaskChangeEventListener.TOPIC, consumerGroup = WorkflowInstanceStateMachine.CONSUMER_GROUP) +public class WorkflowInstanceTaskChangeEventListener extends AbstractWorkflowInstanceEventListener { + + public static final String TOPIC = "TOPIC_WORKFLOW_INSTANCE_PROCESS_TASK_CHANGE"; + + @Autowired + private DagConfigComplexService dagConfigComplexService; + @Autowired + private WorkflowTaskInstanceStateMachine taskInstanceStateMachine; + + @Override + protected CompletableFuture handleEventAsync(WorkflowInstanceEventDTO event) { + CompletableFuture future = CompletableFuture.runAsync(new TaskChangeRunner(event.getWorkflowInstanceId())); + future.whenComplete(((unused, throwable) -> { + if (throwable != null) { + onFailure(event.getWorkflowInstanceId(), throwable); + } + })); + return future; + } + + private class TaskChangeRunner implements Runnable, Serializable { + + private Long workflowInstanceId; + + public TaskChangeRunner(Long workflowInstanceId) { + this.workflowInstanceId = workflowInstanceId; + } + + @Override + public void run() { + DagInstanceDTO dagInstanceDTO = dagInstanceComplexService.selectSimpleOne(workflowInstanceId); + if (WorkflowInstanceState.FAILURE.getValue().equals(dagInstanceDTO.getStatus())) { + return; + } + + Graph dag = dagConfigComplexService.getDag(dagInstanceDTO.getDagConfig().getId()); + Graph stepDag = dagInstanceComplexService.getDag(workflowInstanceId, dag); + + // 检测所有任务的状态,如果有一个失败,则失败 + boolean isAnyFailure = false; + String anyFailureMessage = null; + for (DagStepDTO dagStepDTO : stepDag.nodes()) { + if (WorkflowTaskInstanceStage.FAILURE.getValue().equals(dagStepDTO.getStatus())) { + isAnyFailure = true; +// anyFailureMessage = dagStepDTO.getMessage(); + break; + } + } + if (isAnyFailure) { + onFailure(workflowInstanceId, new Exception(anyFailureMessage)); + return; + } + + // 检测所有任务,如果都执行成功,则成功。在检测过程中,尝试启动所有后置节点 + int successTaskCount = 0; + for (DagStepDTO dagStepDTO : stepDag.nodes()) { + if (WorkflowTaskInstanceStage.SUCCESS.getValue().equals(dagStepDTO.getStatus())) { + successTaskCount++; + // 如果节点成功,尝试启动后继节点 + Set successors = stepDag.successors(dagStepDTO); + successors.stream().forEach(successor -> tryDeploySuccessor(stepDag, successor)); + } + } + + if (successTaskCount == dag.nodes().size()) { + stateMachine.onSuccess(dagInstanceComplexService.selectSimpleOne(workflowInstanceId)); + } + } + + private void tryDeploySuccessor(Graph stepDag, DagStepDTO dagStepDTO) { + // 已经执行过 + if (dagStepDTO.getStatus() != null && WorkflowTaskInstanceStage.PENDING.getValue().equals(dagStepDTO.getStatus()) == false) { + return; + } + // 判断前驱节点是否全部成功 + Set predecessors = stepDag.predecessors(dagStepDTO); + for (DagStepDTO predecessor : predecessors) { + // 前驱节点未执行 + if (WorkflowTaskInstanceStage.PENDING.getValue().equals(predecessor.getStatus())) { + return; + } + // 前驱节点未成功 + if (WorkflowTaskInstanceStage.SUCCESS.getValue().equals(predecessor.getStatus()) == false) { + return; + } + } + taskInstanceStateMachine.deploy(dagStepDTO); + } + } +} diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/manager/SimpleWorkflowInstanceManager.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/manager/SimpleWorkflowInstanceManager.java new file mode 100644 index 00000000..0feb1045 --- /dev/null +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/manager/SimpleWorkflowInstanceManager.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.carp.module.workflow.internal.manager; + +import cn.sliew.carp.module.workflow.api.manager.WorkflowInstanceManager; +import cn.sliew.carp.module.workflow.api.service.WorkflowDefinitionService; +import cn.sliew.carp.module.workflow.internal.statemachine.WorkflowInstanceStateMachine; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class SimpleWorkflowInstanceManager implements WorkflowInstanceManager { + + @Autowired + private WorkflowDefinitionService workflowDefinitionService; + @Autowired + private DagInstanceComplexService dagInstanceComplexService; + @Autowired + private WorkflowInstanceStateMachine stateMachine; + + @Override + public void deploy(Long workflowDefinitionId) { + WorkflowDefinitionDTO workflowDefinitionDTO = workflowDefinitionService.get(workflowDefinitionId); + workflowDefinitionDTO.getDag().getId(); + Long dagInstanceId = dagInstanceComplexService.initialize(workflowDefinitionDTO.getDag().getId()); + stateMachine.deploy(get(dagInstanceId)); + } + + @Override + public void shutdown(Long id) { + stateMachine.shutdown(get(id)); + } + + @Override + public void suspend(Long id) { + stateMachine.suspend(get(id)); + } + + @Override + public void resume(Long id) { + stateMachine.resume(get(id)); + } + + private DagInstanceDTO get(Long id) { + return dagInstanceComplexService.selectSimpleOne(id); + } +} diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/manager/SimpleWorkflowTaskInstanceManager.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/manager/SimpleWorkflowTaskInstanceManager.java new file mode 100644 index 00000000..08490fb2 --- /dev/null +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/manager/SimpleWorkflowTaskInstanceManager.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.carp.module.workflow.internal.manager; + +import cn.sliew.carp.framework.dag.service.DagStepService; +import cn.sliew.carp.framework.dag.service.dto.DagStepDTO; +import cn.sliew.scaleph.workflow.manager.WorkflowTaskInstanceManager; +import cn.sliew.scaleph.workflow.simple.statemachine.WorkflowTaskInstanceStateMachine; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class SimpleWorkflowTaskInstanceManager implements WorkflowTaskInstanceManager { + + @Autowired + private DagStepService dagStepService; + @Autowired + private WorkflowTaskInstanceStateMachine stateMachine; + + @Override + public void deploy(Long id) { + stateMachine.deploy(get(id)); + } + + @Override + public void shutdown(Long id) { + stateMachine.shutdown(get(id)); + } + + @Override + public void suspend(Long id) { + stateMachine.suspend(get(id)); + } + + @Override + public void resume(Long id) { + stateMachine.resume(get(id)); + } + + private DagStepDTO get(Long id) { + return dagStepService.get(id); + } +} diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/statemachine/WorkflowInstanceStateMachine.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/statemachine/WorkflowInstanceStateMachine.java new file mode 100644 index 00000000..a63e0da7 --- /dev/null +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/statemachine/WorkflowInstanceStateMachine.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.carp.module.workflow.internal.statemachine; + +import cn.sliew.carp.framework.common.dict.workflow.WorkflowInstanceEvent; +import cn.sliew.carp.framework.common.dict.workflow.WorkflowInstanceState; +import cn.sliew.carp.framework.dag.service.dto.DagInstanceDTO; +import cn.sliew.carp.module.queue.api.Message; +import cn.sliew.carp.module.queue.api.Queue; +import cn.sliew.carp.module.queue.api.QueueFactory; +import cn.sliew.carp.module.workflow.internal.listener.workflowinstance.*; +import cn.sliew.milky.common.util.JacksonUtil; +import com.alibaba.cola.statemachine.Action; +import com.alibaba.cola.statemachine.StateMachine; +import com.alibaba.cola.statemachine.builder.StateMachineBuilder; +import com.alibaba.cola.statemachine.builder.StateMachineBuilderFactory; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.tuple.Pair; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class WorkflowInstanceStateMachine implements InitializingBean { + + public static final String CONSUMER_GROUP = "WorkflowInstanceStateMachine"; + public static final String EXECUTOR = "WorkflowInstanceExecute"; + + @Autowired + private QueueFactory queueFactory; + + private StateMachine> stateMachine; + + @Override + public void afterPropertiesSet() throws Exception { + StateMachineBuilder> builder = StateMachineBuilderFactory.create(); + + builder.externalTransition() + .from(WorkflowInstanceState.PENDING) + .to(WorkflowInstanceState.RUNNING) + .on(WorkflowInstanceEvent.COMMAND_DEPLOY) + .perform(doPerform()); + builder.externalTransition() + .from(WorkflowInstanceState.PENDING) + .to(WorkflowInstanceState.SUCCESS) + .on(WorkflowInstanceEvent.PROCESS_SUCCESS) + .perform(doPerform()); + + builder.internalTransition() + .within(WorkflowInstanceState.RUNNING) + .on(WorkflowInstanceEvent.PROCESS_TASK_CHANGE) + .perform(doPerform()); + builder.externalTransition() + .from(WorkflowInstanceState.RUNNING) + .to(WorkflowInstanceState.SUCCESS) + .on(WorkflowInstanceEvent.PROCESS_SUCCESS) + .perform(doPerform()); + builder.externalTransition() + .from(WorkflowInstanceState.RUNNING) + .to(WorkflowInstanceState.FAILURE) + .on(WorkflowInstanceEvent.PROCESS_FAILURE) + .perform(doPerform()); + builder.externalTransition() + .from(WorkflowInstanceState.RUNNING) + .to(WorkflowInstanceState.SUSPEND) + .on(WorkflowInstanceEvent.COMMAND_SUSPEND) + .perform(doPerform()); + builder.externalTransition() + .from(WorkflowInstanceState.RUNNING) + .to(WorkflowInstanceState.TERMINATED) + .on(WorkflowInstanceEvent.COMMAND_SHUTDOWN) + .perform(doPerform()); + + builder.externalTransition() + .from(WorkflowInstanceState.SUSPEND) + .to(WorkflowInstanceState.RUNNING) + .on(WorkflowInstanceEvent.COMMAND_RESUME) + .perform(doPerform()); + builder.externalTransition() + .from(WorkflowInstanceState.SUSPEND) + .to(WorkflowInstanceState.TERMINATED) + .on(WorkflowInstanceEvent.COMMAND_SHUTDOWN) + .perform(doPerform()); + + this.stateMachine = builder.build(CONSUMER_GROUP); + } + + private Action> doPerform() { + return (fromState, toState, eventEnum, pair) -> { + Queue queue = queueFactory.get(getTopic(eventEnum)); + WorkflowInstanceEventDTO eventDTO = new WorkflowInstanceEventDTO(fromState, toState, eventEnum, pair.getLeft(), pair.getRight()); + + Message message = Message.builder() + .topic(queue.getName()) + .body(FuryUtil.serializeByJava(eventDTO)) + .build(); + queue.push(message); + }; + } + + private String getTopic(WorkflowInstanceEvent event) { + switch (event) { + case COMMAND_DEPLOY: + return WorkflowInstanceDeployEventListener.TOPIC; + case COMMAND_SHUTDOWN: + return WorkflowInstanceShutdownEventListener.TOPIC; + case COMMAND_SUSPEND: + return WorkflowInstanceSuspendEventListener.TOPIC; + case COMMAND_RESUME: + return WorkflowInstanceResumeEventListener.TOPIC; + case PROCESS_TASK_CHANGE: + return WorkflowInstanceTaskChangeEventListener.TOPIC; + case PROCESS_SUCCESS: + return WorkflowInstanceSuccessEventListener.TOPIC; + case PROCESS_FAILURE: + return WorkflowInstanceFailureEventListener.TOPIC; + default: + throw new IllegalStateException("unknown workflow instance event: " + JacksonUtil.toJsonString(event)); + } + } + + public void deploy(DagInstanceDTO dagInstanceDTO) { + WorkflowInstanceState workflowInstanceState = WorkflowInstanceState.PENDING; + stateMachine.fireEvent(workflowInstanceState, WorkflowInstanceEvent.COMMAND_DEPLOY, Pair.of(dagInstanceDTO.getId(), null)); + } + + public void shutdown(DagInstanceDTO dagInstanceDTO) { + WorkflowInstanceState workflowInstanceState = WorkflowInstanceState.of(dagInstanceDTO.getStatus()); + stateMachine.fireEvent(workflowInstanceState, WorkflowInstanceEvent.COMMAND_SHUTDOWN, Pair.of(dagInstanceDTO.getId(), null)); + } + + public void suspend(DagInstanceDTO dagInstanceDTO) { + WorkflowInstanceState workflowInstanceState = WorkflowInstanceState.of(dagInstanceDTO.getStatus()); + stateMachine.fireEvent(workflowInstanceState, WorkflowInstanceEvent.COMMAND_SUSPEND, Pair.of(dagInstanceDTO.getId(), null)); + } + + public void resume(DagInstanceDTO dagInstanceDTO) { + WorkflowInstanceState workflowInstanceState = WorkflowInstanceState.of(dagInstanceDTO.getStatus()); + stateMachine.fireEvent(workflowInstanceState, WorkflowInstanceEvent.COMMAND_RESUME, Pair.of(dagInstanceDTO.getId(), null)); + } + + public void onTaskChange(DagInstanceDTO dagInstanceDTO) { + WorkflowInstanceState workflowInstanceState = WorkflowInstanceState.of(dagInstanceDTO.getStatus()); + stateMachine.fireEvent(workflowInstanceState, WorkflowInstanceEvent.PROCESS_TASK_CHANGE, Pair.of(dagInstanceDTO.getId(), null)); + } + + public void onSuccess(DagInstanceDTO dagInstanceDTO) { + WorkflowInstanceState workflowInstanceState = WorkflowInstanceState.of(dagInstanceDTO.getStatus()); + stateMachine.fireEvent(workflowInstanceState, WorkflowInstanceEvent.PROCESS_SUCCESS, Pair.of(dagInstanceDTO.getId(), null)); + } + + public void onFailure(DagInstanceDTO dagInstanceDTO, Throwable throwable) { + WorkflowInstanceState workflowInstanceState = WorkflowInstanceState.of(dagInstanceDTO.getStatus()); + stateMachine.fireEvent(workflowInstanceState, WorkflowInstanceEvent.PROCESS_FAILURE, Pair.of(dagInstanceDTO.getId(), throwable)); + } +} diff --git a/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/statemachine/WorkflowTaskInstanceStateMachine.java b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/statemachine/WorkflowTaskInstanceStateMachine.java new file mode 100644 index 00000000..8b86a4d6 --- /dev/null +++ b/carp-modules/carp-module-workflow/carp-module-workflow-internal/src/main/java/cn/sliew/carp/module/workflow/internal/statemachine/WorkflowTaskInstanceStateMachine.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.carp.module.workflow.internal.statemachine; + +import cn.sliew.carp.framework.dag.service.dto.DagStepDTO; +import cn.sliew.milky.common.util.JacksonUtil; +import cn.sliew.scaleph.common.dict.workflow.WorkflowTaskInstanceEvent; +import cn.sliew.scaleph.common.dict.workflow.WorkflowTaskInstanceStage; +import cn.sliew.scaleph.queue.Message; +import cn.sliew.scaleph.queue.Queue; +import cn.sliew.scaleph.queue.QueueFactory; +import cn.sliew.scaleph.queue.util.FuryUtil; +import cn.sliew.scaleph.workflow.simple.listener.taskinstance.WorkflowTaskInstanceDeployEventListener; +import cn.sliew.scaleph.workflow.simple.listener.taskinstance.WorkflowTaskInstanceEventDTO; +import cn.sliew.scaleph.workflow.simple.listener.taskinstance.WorkflowTaskInstanceFailureEventListener; +import cn.sliew.scaleph.workflow.simple.listener.taskinstance.WorkflowTaskInstanceSuccessEventListener; +import com.alibaba.cola.statemachine.Action; +import com.alibaba.cola.statemachine.StateMachine; +import com.alibaba.cola.statemachine.builder.StateMachineBuilder; +import com.alibaba.cola.statemachine.builder.StateMachineBuilderFactory; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.tuple.Pair; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class WorkflowTaskInstanceStateMachine implements InitializingBean { + + public static final String CONSUMER_GROUP = "WorkflowTaskInstanceStateMachine"; + public static final String EXECUTOR = "WorkflowTaskInstanceExecute"; + + @Autowired + private QueueFactory queueFactory; + + private StateMachine> stateMachine; + + @Override + public void afterPropertiesSet() throws Exception { + StateMachineBuilder> builder = StateMachineBuilderFactory.create(); + + builder.externalTransition() + .from(WorkflowTaskInstanceStage.PENDING) + .to(WorkflowTaskInstanceStage.RUNNING) + .on(WorkflowTaskInstanceEvent.COMMAND_DEPLOY) + .perform(doPerform()); + + builder.externalTransition() + .from(WorkflowTaskInstanceStage.RUNNING) + .to(WorkflowTaskInstanceStage.SUCCESS) + .on(WorkflowTaskInstanceEvent.PROCESS_SUCCESS) + .perform(doPerform()); + builder.externalTransition() + .from(WorkflowTaskInstanceStage.RUNNING) + .to(WorkflowTaskInstanceStage.FAILURE) + .on(WorkflowTaskInstanceEvent.PROCESS_FAILURE) + .perform(doPerform()); + builder.externalTransition() + .from(WorkflowTaskInstanceStage.RUNNING) + .to(WorkflowTaskInstanceStage.SUSPEND) + .on(WorkflowTaskInstanceEvent.COMMAND_SUSPEND) + .perform(doPerform()); + builder.externalTransition() + .from(WorkflowTaskInstanceStage.RUNNING) + .to(WorkflowTaskInstanceStage.TERMINATED) + .on(WorkflowTaskInstanceEvent.COMMAND_SHUTDOWN) + .perform(doPerform()); + + builder.externalTransition() + .from(WorkflowTaskInstanceStage.SUSPEND) + .to(WorkflowTaskInstanceStage.RUNNING) + .on(WorkflowTaskInstanceEvent.COMMAND_RESUME) + .perform(doPerform()); + builder.externalTransition() + .from(WorkflowTaskInstanceStage.SUSPEND) + .to(WorkflowTaskInstanceStage.TERMINATED) + .on(WorkflowTaskInstanceEvent.COMMAND_SHUTDOWN) + .perform(doPerform()); + + this.stateMachine = builder.build(CONSUMER_GROUP); + } + + private Action> doPerform() { + return (fromState, toState, eventEnum, pair) -> { + Queue queue = queueFactory.get(getTopic(eventEnum)); + WorkflowTaskInstanceEventDTO eventDTO = new WorkflowTaskInstanceEventDTO(fromState, toState, eventEnum, pair.getLeft(), pair.getRight()); + Message message = Message.builder() + .topic(queue.getName()) + .body(FuryUtil.serializeByJava(eventDTO)) + .build(); + queue.push(message); + }; + } + + private String getTopic(WorkflowTaskInstanceEvent event) { + switch (event) { + case COMMAND_DEPLOY: + return WorkflowTaskInstanceDeployEventListener.TOPIC; + case PROCESS_SUCCESS: + return WorkflowTaskInstanceSuccessEventListener.TOPIC; + case PROCESS_FAILURE: + return WorkflowTaskInstanceFailureEventListener.TOPIC; + default: + throw new IllegalStateException("unknown workflow task instance event: " + JacksonUtil.toJsonString(event)); + } + } + + public void deploy(DagStepDTO dagStepDTO) { + + WorkflowTaskInstanceStage stage = WorkflowTaskInstanceStage.PENDING; + stateMachine.fireEvent(stage, WorkflowTaskInstanceEvent.COMMAND_DEPLOY, Pair.of(dagStepDTO.getId(), null)); + } + + public void shutdown(DagStepDTO dagStepDTO) { + WorkflowTaskInstanceStage stage = WorkflowTaskInstanceStage.of(dagStepDTO.getStatus()); + stateMachine.fireEvent(stage, WorkflowTaskInstanceEvent.COMMAND_SHUTDOWN, Pair.of(dagStepDTO.getId(), null)); + } + + public void suspend(DagStepDTO dagStepDTO) { + WorkflowTaskInstanceStage stage = WorkflowTaskInstanceStage.of(dagStepDTO.getStatus()); + stateMachine.fireEvent(stage, WorkflowTaskInstanceEvent.COMMAND_SUSPEND, Pair.of(dagStepDTO.getId(), null)); + } + + public void resume(DagStepDTO dagStepDTO) { + WorkflowTaskInstanceStage stage = WorkflowTaskInstanceStage.of(dagStepDTO.getStatus()); + stateMachine.fireEvent(stage, WorkflowTaskInstanceEvent.COMMAND_RESUME, Pair.of(dagStepDTO.getId(), null)); + } + + public void onSuccess(DagStepDTO dagStepDTO) { + WorkflowTaskInstanceStage stage = WorkflowTaskInstanceStage.of(dagStepDTO.getStatus()); + stateMachine.fireEvent(stage, WorkflowTaskInstanceEvent.PROCESS_SUCCESS, Pair.of(dagStepDTO.getId(), null)); + } + + public void onFailure(DagStepDTO dagStepDTO, Throwable throwable) { + WorkflowTaskInstanceStage stage = WorkflowTaskInstanceStage.of(dagStepDTO.getStatus()); + stateMachine.fireEvent(stage, WorkflowTaskInstanceEvent.PROCESS_FAILURE, Pair.of(dagStepDTO.getId(), throwable)); + } +} diff --git a/carp-modules/carp-module-workflow/pom.xml b/carp-modules/carp-module-workflow/pom.xml index 02b58abf..d1096ba0 100644 --- a/carp-modules/carp-module-workflow/pom.xml +++ b/carp-modules/carp-module-workflow/pom.xml @@ -31,6 +31,7 @@ carp-module-workflow-api + carp-module-workflow-internal \ No newline at end of file diff --git a/pom.xml b/pom.xml index 203e60f3..64012905 100644 --- a/pom.xml +++ b/pom.xml @@ -254,6 +254,11 @@ carp-module-system ${project.version} + + ${project.groupId} + carp-module-workflow-api + ${project.version} + ${project.groupId}