diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java index 54b9af140..92a019915 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java @@ -9,10 +9,7 @@ package com.yomahub.liteflow.core; import cn.hutool.core.collection.ListUtil; -import cn.hutool.core.util.BooleanUtil; -import cn.hutool.core.util.ObjectUtil; -import cn.hutool.core.util.ReUtil; -import cn.hutool.core.util.StrUtil; +import cn.hutool.core.util.*; import com.yomahub.liteflow.exception.*; import com.yomahub.liteflow.flow.FlowBus; import com.yomahub.liteflow.flow.LiteflowResponse; @@ -159,14 +156,14 @@ public class FlowExecutor { //隐式流程的调用方法 public void invoke(String chainId, Object param, Integer slotIndex) throws Exception { - LiteflowResponse response = this.execute2Resp(chainId, param, null, slotIndex, true); + LiteflowResponse response = this.execute2Resp(chainId, param, null, null, slotIndex, true); if (!response.isSuccess()){ throw response.getCause(); } } public LiteflowResponse invoke2Resp(String chainId, Object param, Integer slotIndex) { - return this.execute2Resp(chainId, param, null, slotIndex, true); + return this.execute2Resp(chainId, param, null, null, slotIndex, true); } //单独调用某一个node @@ -187,13 +184,23 @@ public class FlowExecutor { //调用一个流程并返回LiteflowResponse,允许多上下文的传入 public LiteflowResponse execute2Resp(String chainId, Object param, Class... contextBeanClazzArray) { - return this.execute2Resp(chainId, param, contextBeanClazzArray, null, false); + return this.execute2Resp(chainId, param, contextBeanClazzArray, null, null, false); } //调用一个流程并返回Future,允许多上下文的传入 public Future execute2Future(String chainId, Object param, Class... contextBeanClazzArray) { return ExecutorHelper.loadInstance().buildMainExecutor(liteflowConfig.getMainExecutorClass()).submit(() - -> FlowExecutorHolder.loadInstance().execute2Resp(chainId, param, contextBeanClazzArray, null, false)); + -> FlowExecutorHolder.loadInstance().execute2Resp(chainId, param, contextBeanClazzArray,null, null, false)); + } + + + public LiteflowResponse execute2Resp(String chainId, Object param, Object... contextBeanArray) { + return this.execute2Resp(chainId, param, null, contextBeanArray, null, false); + } + + public Future execute2Future(String chainId, Object param, Object... contextBeanArray) { + return ExecutorHelper.loadInstance().buildMainExecutor(liteflowConfig.getMainExecutorClass()).submit(() + -> FlowExecutorHolder.loadInstance().execute2Resp(chainId, param, null, contextBeanArray, null, false)); } //调用一个流程,返回默认的上下文,适用于简单的调用 @@ -206,20 +213,31 @@ public class FlowExecutor { } } - private LiteflowResponse execute2Resp(String chainId, Object param, Class[] contextBeanClazzArray, + private LiteflowResponse execute2Resp(String chainId, + Object param, + Class[] contextBeanClazzArray, + Object[] contextBeanArray, Integer slotIndex, boolean isInnerChain) { - Slot slot = doExecute(chainId, param, contextBeanClazzArray, slotIndex, isInnerChain); + Slot slot = doExecute(chainId, param, contextBeanClazzArray, contextBeanArray, slotIndex, isInnerChain); return new LiteflowResponse(slot); } - private Slot doExecute(String chainId, Object param, Class[] contextBeanClazzArray, Integer slotIndex, + private Slot doExecute(String chainId, + Object param, + Class[] contextBeanClazzArray, + Object[] contextBeanArray, + Integer slotIndex, boolean isInnerChain) { if (FlowBus.needInit()) { init(); } if (!isInnerChain && ObjectUtil.isNull(slotIndex)) { - slotIndex = DataBus.offerSlot(ListUtil.toList(contextBeanClazzArray)); + if (ArrayUtil.isNotEmpty(contextBeanClazzArray)){ + slotIndex = DataBus.offerSlotByClass(ListUtil.toList(contextBeanClazzArray)); + }else{ + slotIndex = DataBus.offerSlotByBean(ListUtil.toList(contextBeanArray)); + } if (BooleanUtil.isTrue(liteflowConfig.getPrintExecutionLog())) { LOG.info("slot[{}] offered", slotIndex); } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/slot/DataBus.java b/liteflow-core/src/main/java/com/yomahub/liteflow/slot/DataBus.java index e10905c98..12bd43705 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/slot/DataBus.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/slot/DataBus.java @@ -58,15 +58,25 @@ public class DataBus { } } - public static int offerSlot(List> contextClazzList) { + public static int offerSlotByClass(List> contextClazzList) { + //把classList通过反射初始化成对象列表 + //这里用newInstanceIfPossible这个方法,是为了兼容当没有无参构造方法所报的错 + List contextBeanList = contextClazzList.stream() + .map((Function, Object>) ReflectUtil::newInstanceIfPossible).collect(Collectors.toList()); + + Slot slot = new Slot(contextBeanList); + + return offerIndex(slot); + } + + public static int offerSlotByBean(List contextList){ + Slot slot = new Slot(contextList); + + return offerIndex(slot); + } + + private static int offerIndex(Slot slot){ try { - //把classList通过反射初始化成对象列表 - //这里用newInstanceIfPossible这个方法,是为了兼容当没有无参构造方法所报的错 - List contextBeanList = contextClazzList.stream() - .map((Function, Object>) ReflectUtil::newInstanceIfPossible).collect(Collectors.toList()); - - Slot slot = new Slot(contextBeanList); - //这里有没有并发问题? //没有,因为QUEUE的类型为ConcurrentLinkedQueue,并发情况下,每次取到的index不会相同 //当然前提是QUEUE里面的值不会重复,但是这个是由其他机制来保证的