feature #I5M34O 支持在流程执行前就传入一个初始化好的context对象的特性

This commit is contained in:
everywhere.z
2022-08-16 09:20:22 +01:00
parent 587943ca1a
commit 8877f6e98c
2 changed files with 48 additions and 20 deletions

View File

@@ -9,10 +9,7 @@
package com.yomahub.liteflow.core;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.ReUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.core.util.*;
import com.yomahub.liteflow.exception.*;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.flow.LiteflowResponse;
@@ -159,14 +156,14 @@ public class FlowExecutor {
//隐式流程的调用方法
public void invoke(String chainId, Object param, Integer slotIndex) throws Exception {
LiteflowResponse response = this.execute2Resp(chainId, param, null, slotIndex, true);
LiteflowResponse response = this.execute2Resp(chainId, param, null, null, slotIndex, true);
if (!response.isSuccess()){
throw response.getCause();
}
}
public LiteflowResponse invoke2Resp(String chainId, Object param, Integer slotIndex) {
return this.execute2Resp(chainId, param, null, slotIndex, true);
return this.execute2Resp(chainId, param, null, null, slotIndex, true);
}
//单独调用某一个node
@@ -187,13 +184,23 @@ public class FlowExecutor {
//调用一个流程并返回LiteflowResponse允许多上下文的传入
public LiteflowResponse execute2Resp(String chainId, Object param, Class<?>... contextBeanClazzArray) {
return this.execute2Resp(chainId, param, contextBeanClazzArray, null, false);
return this.execute2Resp(chainId, param, contextBeanClazzArray, null, null, false);
}
//调用一个流程并返回Future<LiteflowResponse>,允许多上下文的传入
public Future<LiteflowResponse> execute2Future(String chainId, Object param, Class<?>... contextBeanClazzArray) {
return ExecutorHelper.loadInstance().buildMainExecutor(liteflowConfig.getMainExecutorClass()).submit(()
-> FlowExecutorHolder.loadInstance().execute2Resp(chainId, param, contextBeanClazzArray, null, false));
-> FlowExecutorHolder.loadInstance().execute2Resp(chainId, param, contextBeanClazzArray,null, null, false));
}
public LiteflowResponse execute2Resp(String chainId, Object param, Object... contextBeanArray) {
return this.execute2Resp(chainId, param, null, contextBeanArray, null, false);
}
public Future<LiteflowResponse> execute2Future(String chainId, Object param, Object... contextBeanArray) {
return ExecutorHelper.loadInstance().buildMainExecutor(liteflowConfig.getMainExecutorClass()).submit(()
-> FlowExecutorHolder.loadInstance().execute2Resp(chainId, param, null, contextBeanArray, null, false));
}
//调用一个流程,返回默认的上下文,适用于简单的调用
@@ -206,20 +213,31 @@ public class FlowExecutor {
}
}
private LiteflowResponse execute2Resp(String chainId, Object param, Class<?>[] contextBeanClazzArray,
private LiteflowResponse execute2Resp(String chainId,
Object param,
Class<?>[] contextBeanClazzArray,
Object[] contextBeanArray,
Integer slotIndex, boolean isInnerChain) {
Slot slot = doExecute(chainId, param, contextBeanClazzArray, slotIndex, isInnerChain);
Slot slot = doExecute(chainId, param, contextBeanClazzArray, contextBeanArray, slotIndex, isInnerChain);
return new LiteflowResponse(slot);
}
private Slot doExecute(String chainId, Object param, Class<?>[] contextBeanClazzArray, Integer slotIndex,
private Slot doExecute(String chainId,
Object param,
Class<?>[] contextBeanClazzArray,
Object[] contextBeanArray,
Integer slotIndex,
boolean isInnerChain) {
if (FlowBus.needInit()) {
init();
}
if (!isInnerChain && ObjectUtil.isNull(slotIndex)) {
slotIndex = DataBus.offerSlot(ListUtil.toList(contextBeanClazzArray));
if (ArrayUtil.isNotEmpty(contextBeanClazzArray)){
slotIndex = DataBus.offerSlotByClass(ListUtil.toList(contextBeanClazzArray));
}else{
slotIndex = DataBus.offerSlotByBean(ListUtil.toList(contextBeanArray));
}
if (BooleanUtil.isTrue(liteflowConfig.getPrintExecutionLog())) {
LOG.info("slot[{}] offered", slotIndex);
}

View File

@@ -58,15 +58,25 @@ public class DataBus {
}
}
public static int offerSlot(List<Class<?>> contextClazzList) {
public static int offerSlotByClass(List<Class<?>> contextClazzList) {
//把classList通过反射初始化成对象列表
//这里用newInstanceIfPossible这个方法是为了兼容当没有无参构造方法所报的错
List<Object> contextBeanList = contextClazzList.stream()
.map((Function<Class<?>, Object>) ReflectUtil::newInstanceIfPossible).collect(Collectors.toList());
Slot slot = new Slot(contextBeanList);
return offerIndex(slot);
}
public static int offerSlotByBean(List<Object> contextList){
Slot slot = new Slot(contextList);
return offerIndex(slot);
}
private static int offerIndex(Slot slot){
try {
//把classList通过反射初始化成对象列表
//这里用newInstanceIfPossible这个方法是为了兼容当没有无参构造方法所报的错
List<Object> contextBeanList = contextClazzList.stream()
.map((Function<Class<?>, Object>) ReflectUtil::newInstanceIfPossible).collect(Collectors.toList());
Slot slot = new Slot(contextBeanList);
//这里有没有并发问题?
//没有因为QUEUE的类型为ConcurrentLinkedQueue并发情况下每次取到的index不会相同
//当然前提是QUEUE里面的值不会重复但是这个是由其他机制来保证的