Skip to content

Commit

Permalink
feature: add workflow definition graph and service
Browse files Browse the repository at this point in the history
  • Loading branch information
wangqi committed Sep 6, 2024
1 parent 1576b6e commit 9f3e808
Show file tree
Hide file tree
Showing 25 changed files with 1,596 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import cn.sliew.carp.module.queue.api.BaseQueueFactory;
import cn.sliew.carp.module.queue.api.ListenerManager;
import cn.sliew.carp.module.queue.api.Queue;
import org.springframework.stereotype.Component;

@Component
public class DefaultDelayQueueFactory extends BaseQueueFactory {

private final ListenerManager listenerManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
<dependency>
<groupId>${project.parent.groupId}</groupId>
<artifactId>carp-framework-dag</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.sliew</groupId>
<artifactId>carp-module-workflow</artifactId>
<version>0.0.12-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>carp-module-workflow-internal</artifactId>

<dependencies>
<dependency>
<groupId>${project.parent.groupId}</groupId>
<artifactId>carp-module-workflow-api</artifactId>
</dependency>
<dependency>
<groupId>${project.parent.groupId}</groupId>
<artifactId>carp-module-queue-api</artifactId>
</dependency>

<dependency>
<groupId>com.alibaba.cola</groupId>
<artifactId>cola-component-statemachine</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.carp.module.workflow.internal.listener.taskinstance;

import cn.sliew.carp.framework.dag.service.DagInstanceComplexService;
import cn.sliew.carp.framework.dag.service.DagStepService;
import cn.sliew.scaleph.workflow.simple.statemachine.WorkflowInstanceStateMachine;
import cn.sliew.scaleph.workflow.simple.statemachine.WorkflowTaskInstanceStateMachine;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RScheduledExecutorService;
import org.redisson.api.RedissonClient;
import org.redisson.api.WorkerOptions;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.concurrent.CompletableFuture;

@Slf4j
public abstract class AbstractWorkflowTaskInstanceEventListener implements WorkflowTaskInstanceEventListener, InitializingBean, BeanFactoryAware {

private BeanFactory beanFactory;
protected RScheduledExecutorService executorService;

@Autowired
protected DagInstanceComplexService dagInstanceComplexService;
@Autowired
protected DagStepService dagStepService;
@Autowired
protected WorkflowInstanceStateMachine workflowInstanceStateMachine;
@Autowired
protected WorkflowTaskInstanceStateMachine stateMachine;
@Autowired
private RedissonClient redissonClient;

@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
}

@Override
public void afterPropertiesSet() throws Exception {
executorService = redissonClient.getExecutorService(WorkflowTaskInstanceStateMachine.EXECUTOR);
executorService.registerWorkers(WorkerOptions.defaults().workers(20).beanFactory(beanFactory));
}

@Override
public void onEvent(WorkflowTaskInstanceEventDTO event) {
try {
handleEventAsync(event);
} catch (Throwable throwable) {
onFailure(event.getWorkflowTaskInstanceId(), throwable);
}
}

protected void onFailure(Long workflowTaskInstanceId, Throwable throwable) {
stateMachine.onFailure(dagStepService.get(workflowTaskInstanceId), throwable);
}

protected abstract CompletableFuture handleEventAsync(WorkflowTaskInstanceEventDTO event);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.carp.module.workflow.internal.listener.taskinstance;

import cn.sliew.carp.framework.dag.service.DagConfigStepService;
import cn.sliew.carp.framework.dag.service.DagInstanceComplexService;
import cn.sliew.carp.framework.dag.service.DagStepService;
import cn.sliew.carp.framework.dag.service.dto.DagConfigStepDTO;
import cn.sliew.carp.framework.dag.service.dto.DagInstanceDTO;
import cn.sliew.carp.framework.dag.service.dto.DagStepDTO;
import cn.sliew.milky.common.exception.Rethrower;
import cn.sliew.milky.common.filter.ActionListener;
import cn.sliew.milky.common.util.JacksonUtil;
import cn.sliew.scaleph.common.util.SpringApplicationContextUtil;
import cn.sliew.scaleph.queue.MessageListener;
import cn.sliew.scaleph.workflow.engine.Engine;
import cn.sliew.scaleph.workflow.engine.EngineBuilder;
import cn.sliew.scaleph.workflow.engine.action.Action;
import cn.sliew.scaleph.workflow.engine.action.ActionContext;
import cn.sliew.scaleph.workflow.engine.action.ActionContextBuilder;
import cn.sliew.scaleph.workflow.engine.action.ActionResult;
import cn.sliew.scaleph.workflow.engine.workflow.SequentialFlow;
import cn.sliew.scaleph.workflow.engine.workflow.WorkFlow;
import cn.sliew.scaleph.workflow.service.dto.WorkflowTaskDefinitionMeta;
import cn.sliew.scaleph.workflow.simple.statemachine.WorkflowTaskInstanceStateMachine;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.annotation.RInject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.ClassUtils;

import java.io.Serializable;
import java.util.Collections;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

@MessageListener(topic = WorkflowTaskInstanceDeployEventListener.TOPIC, consumerGroup = WorkflowTaskInstanceStateMachine.CONSUMER_GROUP)
public class WorkflowTaskInstanceDeployEventListener extends AbstractWorkflowTaskInstanceEventListener {

public static final String TOPIC = "TOPIC_WORKFLOW_TASK_INSTANCE_COMMAND_DEPLOY";

public static Engine engine = EngineBuilder.newInstance().build();

@Override
protected CompletableFuture handleEventAsync(WorkflowTaskInstanceEventDTO event) {
CompletableFuture<?> future = executorService.submit(new DeployRunner(event)).toCompletableFuture();
future.whenCompleteAsync((unused, throwable) -> {
if (throwable != null) {
onFailure(event.getWorkflowTaskInstanceId(), throwable);
}
});
return future;
}

@Slf4j
public static class DeployRunner implements Runnable, Serializable {

private WorkflowTaskInstanceEventDTO event;

@RInject
private String taskId;
@Autowired
private DagInstanceComplexService dagInstanceComplexService;
@Autowired
private DagConfigStepService dagConfigStepService;
@Autowired
private DagStepService dagStepService;
@Autowired
protected WorkflowTaskInstanceStateMachine stateMachine;

public DeployRunner(WorkflowTaskInstanceEventDTO event) {
this.event = event;
}

@Override
public void run() {

DagStepDTO dagStepUpdateParam = new DagStepDTO();
dagStepUpdateParam.setId(event.getWorkflowTaskInstanceId());
dagStepUpdateParam.setStatus(event.getNextState().getValue());
dagStepUpdateParam.setStartTime(new Date());
dagStepService.update(dagStepUpdateParam);

DagStepDTO stepDTO = dagStepService.get(event.getWorkflowTaskInstanceId());
DagInstanceDTO dagInstanceDTO = dagInstanceComplexService.selectSimpleOne(stepDTO.getDagInstanceId());
DagConfigStepDTO configStepDTO = dagConfigStepService.get(stepDTO.getDagConfigStep().getId());
WorkflowTaskDefinitionMeta workflowTaskDefinitionMeta = JacksonUtil.toObject(configStepDTO.getStepMeta(), WorkflowTaskDefinitionMeta.class);
try {
Class<?> clazz = ClassUtils.forName(workflowTaskDefinitionMeta.getHandler(), ClassUtils.getDefaultClassLoader());
Action action = (Action) SpringApplicationContextUtil.getBean(clazz);
WorkFlow workFlow = SequentialFlow.newSequentialFlow()
.name(configStepDTO.getStepName())
.execute(action)
.build();
ActionContext actionContext = buildActionContext(dagInstanceDTO, stepDTO);
engine.run(workFlow, actionContext, new ActionListener<ActionResult>() {
@Override
public void onResponse(ActionResult result) {
try {
ActionContext context = result.getContext();
log.info("workflow task {} run success!, globalInputs: {}, inputs: {}, outputs: {}",
configStepDTO.getStepName(), JacksonUtil.toJsonString(context.getGlobalInputs()), JacksonUtil.toJsonString(context.getInputs()), JacksonUtil.toJsonString(context.getOutputs()));
// 记录输出
DagStepDTO dagStepSuccessParam = new DagStepDTO();
dagStepSuccessParam.setId(event.getWorkflowTaskInstanceId());
dagStepSuccessParam.setOutputs(JacksonUtil.toJsonNode(context.getOutputs()));
dagStepService.update(dagStepSuccessParam);
// 通知成功
stateMachine.onSuccess(dagStepService.get(event.getWorkflowTaskInstanceId()));
} catch (Exception e) {
onFailure(e);
}
}

@Override
public void onFailure(Throwable e) {
log.error("workflow task {} run failure!", configStepDTO.getStepName(), e);
// 通知失败
stateMachine.onFailure(dagStepService.get(event.getWorkflowTaskInstanceId()), e);
}
});
} catch (ClassNotFoundException e) {
Rethrower.throwAs(e);
}
}

private ActionContext buildActionContext(DagInstanceDTO dagInstanceDTO, DagStepDTO stepDTO) {
Map<String, Object> globalInputs = Collections.emptyMap();
if (dagInstanceDTO.getInputs() != null && dagInstanceDTO.getInputs().isObject()) {
globalInputs = JacksonUtil.toMap(dagInstanceDTO.getInputs());
}
Map<String, Object> inputs = Collections.emptyMap();
if (stepDTO.getInputs() != null && stepDTO.getInputs().isObject()) {
inputs = JacksonUtil.toMap(stepDTO.getInputs());
}
return ActionContextBuilder.newBuilder()
.withWorkflowDefinitionId(dagInstanceDTO.getDagConfig().getId())
.withWorkflowInstanceId(stepDTO.getDagInstanceId())
.withWorkflowTaskDefinitionId(stepDTO.getDagConfigStep().getId())
.withWorkflowTaskInstanceId(stepDTO.getId())
.withGlobalInputs(globalInputs)
.withInputs(inputs)
.validateAndBuild();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.carp.module.workflow.internal.listener.taskinstance;

import cn.sliew.scaleph.common.dict.workflow.WorkflowTaskInstanceEvent;
import cn.sliew.scaleph.common.dict.workflow.WorkflowTaskInstanceStage;
import lombok.Getter;

import java.io.Serializable;

@Getter
public class WorkflowTaskInstanceEventDTO implements Serializable {

private static final long serialVersionUID = 1L;

private final WorkflowTaskInstanceStage state;
private final WorkflowTaskInstanceStage nextState;
private final WorkflowTaskInstanceEvent event;
private final Long workflowTaskInstanceId;
private final Throwable throwable;

public WorkflowTaskInstanceEventDTO(WorkflowTaskInstanceStage state, WorkflowTaskInstanceStage nextState, WorkflowTaskInstanceEvent event, Long workflowTaskInstanceId) {
this(state, nextState, event, workflowTaskInstanceId, null);
}

public WorkflowTaskInstanceEventDTO(WorkflowTaskInstanceStage state, WorkflowTaskInstanceStage nextState, WorkflowTaskInstanceEvent event, Long workflowTaskInstanceId, Throwable throwable) {
this.state = state;
this.nextState = nextState;
this.event = event;
this.workflowTaskInstanceId = workflowTaskInstanceId;
this.throwable = throwable;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.carp.module.workflow.internal.listener.taskinstance;

import cn.sliew.scaleph.queue.Message;
import cn.sliew.scaleph.queue.MessageHandler;
import cn.sliew.scaleph.queue.util.FuryUtil;

public interface WorkflowTaskInstanceEventListener extends MessageHandler {

@Override
default void handler(Message message) throws Exception {
if (message.getBody() != null) {
Object deserialized = FuryUtil.deserializeByJava(message.getBody());
if (deserialized instanceof WorkflowTaskInstanceEventDTO) {
WorkflowTaskInstanceEventDTO eventDTO = (WorkflowTaskInstanceEventDTO)deserialized;
onEvent(eventDTO);
}
}
}

void onEvent(WorkflowTaskInstanceEventDTO eventDTO);
}
Loading

0 comments on commit 9f3e808

Please sign in to comment.