From 8877f6e98cfda89278381c7b5da2e4e13c8c8ffa Mon Sep 17 00:00:00 2001 From: "everywhere.z" Date: Tue, 16 Aug 2022 09:20:22 +0100 Subject: [PATCH] =?UTF-8?q?feature=20#I5M34O=20=E6=94=AF=E6=8C=81=E5=9C=A8?= =?UTF-8?q?=E6=B5=81=E7=A8=8B=E6=89=A7=E8=A1=8C=E5=89=8D=E5=B0=B1=E4=BC=A0?= =?UTF-8?q?=E5=85=A5=E4=B8=80=E4=B8=AA=E5=88=9D=E5=A7=8B=E5=8C=96=E5=A5=BD?= =?UTF-8?q?=E7=9A=84context=E5=AF=B9=E8=B1=A1=E7=9A=84=E7=89=B9=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../yomahub/liteflow/core/FlowExecutor.java | 42 +++++++++++++------ .../com/yomahub/liteflow/slot/DataBus.java | 26 ++++++++---- 2 files changed, 48 insertions(+), 20 deletions(-) diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java index 54b9af140..92a019915 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java @@ -9,10 +9,7 @@ package com.yomahub.liteflow.core; import cn.hutool.core.collection.ListUtil; -import cn.hutool.core.util.BooleanUtil; -import cn.hutool.core.util.ObjectUtil; -import cn.hutool.core.util.ReUtil; -import cn.hutool.core.util.StrUtil; +import cn.hutool.core.util.*; import com.yomahub.liteflow.exception.*; import com.yomahub.liteflow.flow.FlowBus; import com.yomahub.liteflow.flow.LiteflowResponse; @@ -159,14 +156,14 @@ public class FlowExecutor { //隐式流程的调用方法 public void invoke(String chainId, Object param, Integer slotIndex) throws Exception { - LiteflowResponse response = this.execute2Resp(chainId, param, null, slotIndex, true); + LiteflowResponse response = this.execute2Resp(chainId, param, null, null, slotIndex, true); if (!response.isSuccess()){ throw response.getCause(); } } public LiteflowResponse invoke2Resp(String chainId, Object param, Integer slotIndex) { - return this.execute2Resp(chainId, param, null, slotIndex, true); + return this.execute2Resp(chainId, param, null, null, slotIndex, true); } //单独调用某一个node @@ -187,13 +184,23 @@ public class FlowExecutor { //调用一个流程并返回LiteflowResponse,允许多上下文的传入 public LiteflowResponse execute2Resp(String chainId, Object param, Class... contextBeanClazzArray) { - return this.execute2Resp(chainId, param, contextBeanClazzArray, null, false); + return this.execute2Resp(chainId, param, contextBeanClazzArray, null, null, false); } //调用一个流程并返回Future,允许多上下文的传入 public Future execute2Future(String chainId, Object param, Class... contextBeanClazzArray) { return ExecutorHelper.loadInstance().buildMainExecutor(liteflowConfig.getMainExecutorClass()).submit(() - -> FlowExecutorHolder.loadInstance().execute2Resp(chainId, param, contextBeanClazzArray, null, false)); + -> FlowExecutorHolder.loadInstance().execute2Resp(chainId, param, contextBeanClazzArray,null, null, false)); + } + + + public LiteflowResponse execute2Resp(String chainId, Object param, Object... contextBeanArray) { + return this.execute2Resp(chainId, param, null, contextBeanArray, null, false); + } + + public Future execute2Future(String chainId, Object param, Object... contextBeanArray) { + return ExecutorHelper.loadInstance().buildMainExecutor(liteflowConfig.getMainExecutorClass()).submit(() + -> FlowExecutorHolder.loadInstance().execute2Resp(chainId, param, null, contextBeanArray, null, false)); } //调用一个流程,返回默认的上下文,适用于简单的调用 @@ -206,20 +213,31 @@ public class FlowExecutor { } } - private LiteflowResponse execute2Resp(String chainId, Object param, Class[] contextBeanClazzArray, + private LiteflowResponse execute2Resp(String chainId, + Object param, + Class[] contextBeanClazzArray, + Object[] contextBeanArray, Integer slotIndex, boolean isInnerChain) { - Slot slot = doExecute(chainId, param, contextBeanClazzArray, slotIndex, isInnerChain); + Slot slot = doExecute(chainId, param, contextBeanClazzArray, contextBeanArray, slotIndex, isInnerChain); return new LiteflowResponse(slot); } - private Slot doExecute(String chainId, Object param, Class[] contextBeanClazzArray, Integer slotIndex, + private Slot doExecute(String chainId, + Object param, + Class[] contextBeanClazzArray, + Object[] contextBeanArray, + Integer slotIndex, boolean isInnerChain) { if (FlowBus.needInit()) { init(); } if (!isInnerChain && ObjectUtil.isNull(slotIndex)) { - slotIndex = DataBus.offerSlot(ListUtil.toList(contextBeanClazzArray)); + if (ArrayUtil.isNotEmpty(contextBeanClazzArray)){ + slotIndex = DataBus.offerSlotByClass(ListUtil.toList(contextBeanClazzArray)); + }else{ + slotIndex = DataBus.offerSlotByBean(ListUtil.toList(contextBeanArray)); + } if (BooleanUtil.isTrue(liteflowConfig.getPrintExecutionLog())) { LOG.info("slot[{}] offered", slotIndex); } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/slot/DataBus.java b/liteflow-core/src/main/java/com/yomahub/liteflow/slot/DataBus.java index e10905c98..12bd43705 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/slot/DataBus.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/slot/DataBus.java @@ -58,15 +58,25 @@ public class DataBus { } } - public static int offerSlot(List> contextClazzList) { + public static int offerSlotByClass(List> contextClazzList) { + //把classList通过反射初始化成对象列表 + //这里用newInstanceIfPossible这个方法,是为了兼容当没有无参构造方法所报的错 + List contextBeanList = contextClazzList.stream() + .map((Function, Object>) ReflectUtil::newInstanceIfPossible).collect(Collectors.toList()); + + Slot slot = new Slot(contextBeanList); + + return offerIndex(slot); + } + + public static int offerSlotByBean(List contextList){ + Slot slot = new Slot(contextList); + + return offerIndex(slot); + } + + private static int offerIndex(Slot slot){ try { - //把classList通过反射初始化成对象列表 - //这里用newInstanceIfPossible这个方法,是为了兼容当没有无参构造方法所报的错 - List contextBeanList = contextClazzList.stream() - .map((Function, Object>) ReflectUtil::newInstanceIfPossible).collect(Collectors.toList()); - - Slot slot = new Slot(contextBeanList); - //这里有没有并发问题? //没有,因为QUEUE的类型为ConcurrentLinkedQueue,并发情况下,每次取到的index不会相同 //当然前提是QUEUE里面的值不会重复,但是这个是由其他机制来保证的