Skip to content

Commit

Permalink
feature: update workflow engine
Browse files Browse the repository at this point in the history
  • Loading branch information
kalencaya committed Oct 1, 2024
1 parent 3fc9830 commit 724bdef
Show file tree
Hide file tree
Showing 31 changed files with 1,019 additions and 531 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.carp.module.workflow.internal.engine.dispatch;

import cn.sliew.carp.framework.common.dict.workflow.WorkflowInstanceEvent;
import cn.sliew.carp.module.queue.api.Message;
import cn.sliew.carp.module.queue.api.MessageHandler;
import cn.sliew.carp.module.queue.api.MessageListener;
import cn.sliew.carp.module.queue.api.util.Serder;
import cn.sliew.carp.module.workflow.api.engine.dispatch.WorkflowInstanceEventDispatcher;
import cn.sliew.carp.module.workflow.api.engine.dispatch.handler.WorkflowInstanceEventHandler;
import cn.sliew.carp.module.workflow.api.engine.dispatch.event.WorkflowInstanceStatusEvent;
import cn.sliew.carp.module.workflow.internal.engine.dispatch.event.WorkflowInstanceEventDTO;
import cn.sliew.carp.module.workflow.internal.statemachine.WorkflowTaskInstanceStateMachine;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.CollectionUtils;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

@Slf4j
@MessageListener(topic = InternalWorkflowInstanceEventDispatcher.TOPIC, consumerGroup = WorkflowTaskInstanceStateMachine.CONSUMER_GROUP)
public class InternalWorkflowInstanceEventDispatcher implements WorkflowInstanceEventDispatcher, MessageHandler, InitializingBean, DisposableBean {

public static final String TOPIC = "TOPIC_CARP_INTERNAL_WORKFLOW_INSTANCE_EVENT";

@Autowired
private List<WorkflowInstanceEventHandler> handlers;

private Map<WorkflowInstanceEvent, WorkflowInstanceEventHandler> registry = new HashMap<>();
private ThreadPoolTaskExecutor taskExecutor;

@Override
public void afterPropertiesSet() throws Exception {
if (CollectionUtils.isEmpty(handlers) == false) {
handlers.stream().forEach(handler -> registry.put(handler.getType(), handler));
}
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setMaxPoolSize(5);
executor.setCorePoolSize(1);
executor.setThreadNamePrefix("workflow-instance-thread-pool-");
executor.initialize();
taskExecutor = executor;
}

@Override
public void destroy() throws Exception {
if (taskExecutor != null) {
taskExecutor.shutdown();
}
}

@Override
public void handler(Message message) throws Exception {
if (message.getBody() != null) {
Object deserialized = Serder.deserializeByJava(message.getBody());
if (deserialized instanceof WorkflowInstanceEventDTO) {
WorkflowInstanceEventDTO eventDTO = (WorkflowInstanceEventDTO) deserialized;
dispatch(eventDTO);
}
}
}

@Override
public void dispatch(WorkflowInstanceStatusEvent event) {
if (registry.containsKey(event.getEvent()) == false) {
throw new RuntimeException("unknown workflow instance event: "
+ event.getEvent().getLabel() + "[" + event.getEvent().getValue() + "]");
}
WorkflowInstanceEventHandler handler = registry.get(event.getEvent());
CompletableFuture.runAsync(() -> handler.handle(event), taskExecutor)
.whenComplete((unused, throwable) -> {
if (throwable != null) {
log.error("workflow instance event dispatch failed", throwable);
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.carp.module.workflow.internal.engine.dispatch;

import cn.sliew.carp.framework.common.dict.workflow.WorkflowTaskInstanceEvent;
import cn.sliew.carp.module.queue.api.Message;
import cn.sliew.carp.module.queue.api.MessageHandler;
import cn.sliew.carp.module.queue.api.MessageListener;
import cn.sliew.carp.module.queue.api.util.Serder;
import cn.sliew.carp.module.workflow.api.engine.dispatch.WorkflowTaskInstanceEventDispatcher;
import cn.sliew.carp.module.workflow.api.engine.dispatch.event.WorkflowTaskInstanceStatusEvent;
import cn.sliew.carp.module.workflow.api.engine.dispatch.handler.WorkflowTaskInstanceEventHandler;
import cn.sliew.carp.module.workflow.internal.engine.dispatch.event.WorkflowTaskInstanceEventDTO;
import cn.sliew.carp.module.workflow.internal.statemachine.WorkflowInstanceStateMachine;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.CollectionUtils;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

@Slf4j
@MessageListener(topic = InternalWorkflowTaskInstanceEventDispatcher.TOPIC, consumerGroup = WorkflowInstanceStateMachine.CONSUMER_GROUP)
public class InternalWorkflowTaskInstanceEventDispatcher implements WorkflowTaskInstanceEventDispatcher, MessageHandler, InitializingBean, DisposableBean {

public static final String TOPIC = "TOPIC_CARP_INTERNAL_WORKFLOW_TASK_INSTANCE_EVENT";

@Autowired
private List<WorkflowTaskInstanceEventHandler> handlers;

private Map<WorkflowTaskInstanceEvent, WorkflowTaskInstanceEventHandler> registry = new HashMap<>();
private ThreadPoolTaskExecutor taskExecutor;

@Override
public void afterPropertiesSet() throws Exception {
if (CollectionUtils.isEmpty(handlers) == false) {
handlers.stream().forEach(handler -> registry.put(handler.getType(), handler));
}
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setMaxPoolSize(5);
executor.setCorePoolSize(1);
executor.setThreadNamePrefix("workflow-task-instance-thread-pool-");
executor.initialize();
taskExecutor = executor;
}

@Override
public void destroy() throws Exception {
if (taskExecutor != null) {
taskExecutor.shutdown();
}
}

@Override
public void handler(Message message) throws Exception {
if (message.getBody() != null) {
Object deserialized = Serder.deserializeByJava(message.getBody());
if (deserialized instanceof WorkflowTaskInstanceEventDTO) {
WorkflowTaskInstanceEventDTO eventDTO = (WorkflowTaskInstanceEventDTO) deserialized;
dispatch(eventDTO);
}
}
}

@Override
public void dispatch(WorkflowTaskInstanceStatusEvent event) {
if (registry.containsKey(event.getEvent()) == false) {
throw new RuntimeException("unknown workflow task instance event: "
+ event.getEvent().getLabel() + "[" + event.getEvent().getValue() + "]");
}
WorkflowTaskInstanceEventHandler handler = registry.get(event.getEvent());
CompletableFuture.runAsync(() -> handler.handle(event), taskExecutor)
.whenComplete((unused, throwable) -> {
if (throwable != null) {
log.error("workflow task instance event dispatch failed", throwable);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,17 @@
* limitations under the License.
*/

package cn.sliew.carp.module.workflow.internal.listener.workflowinstance;
package cn.sliew.carp.module.workflow.internal.engine.dispatch.event;

import cn.sliew.carp.framework.common.dict.workflow.WorkflowInstanceEvent;
import cn.sliew.carp.framework.common.dict.workflow.WorkflowInstanceState;
import cn.sliew.carp.module.workflow.api.engine.dispatch.event.WorkflowInstanceStatusEvent;
import lombok.Getter;

import java.io.Serializable;

@Getter
public class WorkflowInstanceEventDTO implements Serializable {
public class WorkflowInstanceEventDTO implements WorkflowInstanceStatusEvent, Serializable {

private static final long serialVersionUID = 1L;

Expand All @@ -46,4 +47,9 @@ public WorkflowInstanceEventDTO(WorkflowInstanceState state, WorkflowInstanceSta
this.workflowInstanceId = workflowInstanceId;
this.throwable = throwable;
}

@Override
public WorkflowInstanceEvent getEvent() {
return event;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,17 @@
* limitations under the License.
*/

package cn.sliew.carp.module.workflow.internal.listener.taskinstance;
package cn.sliew.carp.module.workflow.internal.engine.dispatch.event;

import cn.sliew.carp.framework.common.dict.workflow.WorkflowTaskInstanceEvent;
import cn.sliew.carp.framework.common.dict.workflow.WorkflowTaskInstanceStage;
import cn.sliew.carp.module.workflow.api.engine.dispatch.event.WorkflowTaskInstanceStatusEvent;
import lombok.Getter;

import java.io.Serializable;

@Getter
public class WorkflowTaskInstanceEventDTO implements Serializable {
public class WorkflowTaskInstanceEventDTO implements WorkflowTaskInstanceStatusEvent, Serializable {

private static final long serialVersionUID = 1L;

Expand All @@ -46,4 +47,9 @@ public WorkflowTaskInstanceEventDTO(WorkflowTaskInstanceStage state, WorkflowTas
this.workflowTaskInstanceId = workflowTaskInstanceId;
this.throwable = throwable;
}

@Override
public WorkflowTaskInstanceEvent getEvent() {
return event;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
* limitations under the License.
*/

package cn.sliew.carp.module.workflow.internal.listener.taskinstance;
package cn.sliew.carp.module.workflow.internal.engine.dispatch.handler.task;

import cn.sliew.carp.framework.dag.service.DagInstanceComplexService;
import cn.sliew.carp.framework.dag.service.DagStepService;
import cn.sliew.carp.module.workflow.api.service.WorkflowInstanceService;
import cn.sliew.carp.module.workflow.internal.engine.dispatch.event.WorkflowTaskInstanceEventDTO;
import cn.sliew.carp.module.workflow.internal.statemachine.WorkflowInstanceStateMachine;
import cn.sliew.carp.module.workflow.internal.statemachine.WorkflowTaskInstanceStateMachine;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -45,6 +47,8 @@ public abstract class AbstractWorkflowTaskInstanceEventListener implements Workf
@Autowired
protected DagStepService dagStepService;
@Autowired
protected WorkflowInstanceService workflowInstanceService;
@Autowired
protected WorkflowInstanceStateMachine workflowInstanceStateMachine;
@Autowired
protected WorkflowTaskInstanceStateMachine stateMachine;
Expand All @@ -63,7 +67,7 @@ public void afterPropertiesSet() throws Exception {
}

@Override
public void onEvent(WorkflowTaskInstanceEventDTO event) {
public void handleInternal(WorkflowTaskInstanceEventDTO event) {
try {
handleEventAsync(event);
} catch (Throwable throwable) {
Expand All @@ -72,7 +76,7 @@ public void onEvent(WorkflowTaskInstanceEventDTO event) {
}

protected void onFailure(Long workflowTaskInstanceId, Throwable throwable) {
stateMachine.onFailure(dagStepService.get(workflowTaskInstanceId), throwable);
// stateMachine.onFailure(dagStepService.get(workflowTaskInstanceId), throwable);
}

protected abstract CompletableFuture handleEventAsync(WorkflowTaskInstanceEventDTO event);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.carp.module.workflow.internal.engine.dispatch.handler.task;

import cn.sliew.carp.framework.common.dict.workflow.WorkflowTaskInstanceEvent;
import cn.sliew.carp.framework.dag.service.dto.DagStepDTO;
import cn.sliew.carp.module.workflow.internal.engine.dispatch.event.WorkflowTaskInstanceEventDTO;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.concurrent.CompletableFuture;

@Component
public class WorkflowTaskInstanceDeployEventListener extends AbstractWorkflowTaskInstanceEventListener {

@Override
public WorkflowTaskInstanceEvent getType() {
return WorkflowTaskInstanceEvent.COMMAND_DEPLOY;
}

@Override
protected CompletableFuture handleEventAsync(WorkflowTaskInstanceEventDTO event) {
CompletableFuture<?> future = CompletableFuture.runAsync(() -> run(event)).toCompletableFuture();
future.whenCompleteAsync((unused, throwable) -> {
if (throwable != null) {
onFailure(event.getWorkflowTaskInstanceId(), throwable);
}
});
return future;
}

private void run(WorkflowTaskInstanceEventDTO event) {
DagStepDTO dagStepUpdateParam = new DagStepDTO();
dagStepUpdateParam.setId(event.getWorkflowTaskInstanceId());
dagStepUpdateParam.setStatus(event.getNextState().getValue());
dagStepUpdateParam.setStartTime(new Date());
dagStepService.update(dagStepUpdateParam);
stateMachine.onSuccess(workflowInstanceService.getTask(event.getWorkflowTaskInstanceId()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,21 @@
* limitations under the License.
*/

package cn.sliew.carp.module.workflow.internal.listener.taskinstance;
package cn.sliew.carp.module.workflow.internal.engine.dispatch.handler.task;

import cn.sliew.carp.module.queue.api.Message;
import cn.sliew.carp.module.queue.api.MessageHandler;
import cn.sliew.carp.module.queue.api.util.Serder;
import cn.sliew.carp.module.workflow.api.engine.dispatch.event.WorkflowTaskInstanceStatusEvent;
import cn.sliew.carp.module.workflow.api.engine.dispatch.handler.WorkflowTaskInstanceEventHandler;
import cn.sliew.carp.module.workflow.internal.engine.dispatch.event.WorkflowTaskInstanceEventDTO;

public interface WorkflowTaskInstanceEventListener extends MessageHandler {
public interface WorkflowTaskInstanceEventListener extends WorkflowTaskInstanceEventHandler {

@Override
default void handler(Message message) throws Exception {
if (message.getBody() != null) {
Object deserialized = Serder.deserializeByJava(message.getBody());
if (deserialized instanceof WorkflowTaskInstanceEventDTO) {
WorkflowTaskInstanceEventDTO eventDTO = (WorkflowTaskInstanceEventDTO)deserialized;
onEvent(eventDTO);
}
default void handle(WorkflowTaskInstanceStatusEvent event) {
if (event instanceof WorkflowTaskInstanceEventDTO == false) {
throw new RuntimeException();
}
handleInternal((WorkflowTaskInstanceEventDTO) event);
}

void onEvent(WorkflowTaskInstanceEventDTO eventDTO);
void handleInternal(WorkflowTaskInstanceEventDTO eventDTO);
}
Loading

0 comments on commit 724bdef

Please sign in to comment.