diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/ForCondition.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/ForCondition.java index 496987c83..70142c85b 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/ForCondition.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/ForCondition.java @@ -6,10 +6,17 @@ import com.yomahub.liteflow.enums.ConditionTypeEnum; import com.yomahub.liteflow.exception.NoForNodeException; import com.yomahub.liteflow.flow.element.Executable; import com.yomahub.liteflow.flow.element.Node; +import com.yomahub.liteflow.flow.parallel.LoopFutureObj; import com.yomahub.liteflow.slot.DataBus; import com.yomahub.liteflow.slot.Slot; +import com.yomahub.liteflow.thread.ExecutorHelper; import com.yomahub.liteflow.util.LiteFlowProxyUtil; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; + /** * 循环次数Condition * @@ -18,67 +25,100 @@ import com.yomahub.liteflow.util.LiteFlowProxyUtil; */ public class ForCondition extends LoopCondition { - @Override - public void executeCondition(Integer slotIndex) throws Exception { - Slot slot = DataBus.getSlot(slotIndex); - Node forNode = this.getForNode(); - if (ObjectUtil.isNull(forNode)) { - String errorInfo = StrUtil.format("[{}]:no for-node found", slot.getRequestId()); - throw new NoForNodeException(errorInfo); - } + @Override + public void executeCondition(Integer slotIndex) throws Exception { + Slot slot = DataBus.getSlot(slotIndex); + Node forNode = this.getForNode(); + if (ObjectUtil.isNull(forNode)) { + String errorInfo = StrUtil.format("[{}]:no for-node found", slot.getRequestId()); + throw new NoForNodeException(errorInfo); + } - // 先去判断isAccess方法,如果isAccess方法都返回false,整个FOR表达式不执行 - if (!this.getForNode().isAccess(slotIndex)) { - return; - } + // 先去判断isAccess方法,如果isAccess方法都返回false,整个FOR表达式不执行 + if (!this.getForNode().isAccess(slotIndex)) { + return; + } - // 执行forCount组件 - forNode.setCurrChainId(this.getCurrChainId()); - forNode.execute(slotIndex); + // 执行forCount组件 + forNode.setCurrChainId(this.getCurrChainId()); + forNode.execute(slotIndex); - // 获得循环次数 - int forCount = forNode.getItemResultMetaValue(slotIndex); + // 获得循环次数 + int forCount = forNode.getItemResultMetaValue(slotIndex); - // 获得要循环的可执行对象 - Executable executableItem = this.getDoExecutor(); + // 获得要循环的可执行对象 + Executable executableItem = this.getDoExecutor(); - // 获取Break节点 - Executable breakItem = this.getBreakItem(); + // 获取Break节点 + Executable breakItem = this.getBreakItem(); - try{ - // 循环执行 - for (int i = 0; i < forCount; i++) { - executableItem.setCurrChainId(this.getCurrChainId()); - // 设置循环index - setLoopIndex(executableItem, i); - executableItem.execute(slotIndex); - // 如果break组件不为空,则去执行 - if (ObjectUtil.isNotNull(breakItem)) { - breakItem.setCurrChainId(this.getCurrChainId()); - setLoopIndex(breakItem, i); - breakItem.execute(slotIndex); - boolean isBreak = breakItem.getItemResultMetaValue(slotIndex); - if (isBreak) { - break; - } - } - } - }finally { - removeLoopIndex(executableItem); - } - } + try { + if (!isParallel()) { + //串行循环执行 + for (int i = 0; i < forCount; i++) { + executableItem.setCurrChainId(this.getCurrChainId()); + // 设置循环index + setLoopIndex(executableItem, i); + executableItem.execute(slotIndex); + // 如果break组件不为空,则去执行 + if (ObjectUtil.isNotNull(breakItem)) { + breakItem.setCurrChainId(this.getCurrChainId()); + setLoopIndex(breakItem, i); + breakItem.execute(slotIndex); + boolean isBreak = breakItem.getItemResultMetaValue(slotIndex); + if (isBreak) { + break; + } + } + } + }else{ + //并行循环执行 + //存储所有的并行执行子项的CompletableFuture + List> futureList = new ArrayList<>(); + //获取并行循环的线程池 + ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildLoopParallelExecutor(); + for (int i = 0; i < forCount; i++){ + //提交异步任务 + CompletableFuture future = + CompletableFuture.supplyAsync(new LoopParallelSupplier(executableItem, this.getCurrChainId(), slotIndex, i), parallelExecutor); + futureList.add(future); + if (ObjectUtil.isNotNull(breakItem)) { + breakItem.setCurrChainId(this.getCurrChainId()); + setLoopIndex(breakItem, i); + breakItem.execute(slotIndex); + boolean isBreak = breakItem.getItemResultMetaValue(slotIndex); + if (isBreak) { + break; + } + } + } + //等待所有的异步执行完毕 + CompletableFuture resultCompletableFuture = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[]{})); + resultCompletableFuture.join(); + //获取所有的执行结果,如果有失败的,那么需要抛出异常 + for (CompletableFuture future : futureList) { + LoopFutureObj loopFutureObj = future.get(); + if (!loopFutureObj.isSuccess()) { + throw loopFutureObj.getEx(); + } + } + } + } finally { + removeLoopIndex(executableItem); + } + } - @Override - public ConditionTypeEnum getConditionType() { - return ConditionTypeEnum.TYPE_FOR; - } + @Override + public ConditionTypeEnum getConditionType() { + return ConditionTypeEnum.TYPE_FOR; + } - public Node getForNode() { - return (Node) this.getExecutableOne(ConditionKey.FOR_KEY); - } + public Node getForNode() { + return (Node) this.getExecutableOne(ConditionKey.FOR_KEY); + } - public void setForNode(Node forNode) { - this.addExecutable(ConditionKey.FOR_KEY, forNode); - } + public void setForNode(Node forNode) { + this.addExecutable(ConditionKey.FOR_KEY, forNode); + } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/IteratorCondition.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/IteratorCondition.java index 3781d7e26..8f182392e 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/IteratorCondition.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/IteratorCondition.java @@ -6,83 +6,116 @@ import com.yomahub.liteflow.enums.ConditionTypeEnum; import com.yomahub.liteflow.exception.NoIteratorNodeException; import com.yomahub.liteflow.flow.element.Executable; import com.yomahub.liteflow.flow.element.Node; +import com.yomahub.liteflow.flow.parallel.LoopFutureObj; import com.yomahub.liteflow.slot.DataBus; import com.yomahub.liteflow.slot.Slot; +import com.yomahub.liteflow.thread.ExecutorHelper; import com.yomahub.liteflow.util.LiteFlowProxyUtil; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; public class IteratorCondition extends LoopCondition { - @Override - public void executeCondition(Integer slotIndex) throws Exception { - Slot slot = DataBus.getSlot(slotIndex); - Node iteratorNode = this.getIteratorNode(); + @Override + public void executeCondition(Integer slotIndex) throws Exception { + Slot slot = DataBus.getSlot(slotIndex); + Node iteratorNode = this.getIteratorNode(); - if (ObjectUtil.isNull(iteratorNode)) { - String errorInfo = StrUtil.format("[{}]:no iterator-node found", slot.getRequestId()); - throw new NoIteratorNodeException(errorInfo); - } + if (ObjectUtil.isNull(iteratorNode)) { + String errorInfo = StrUtil.format("[{}]:no iterator-node found", slot.getRequestId()); + throw new NoIteratorNodeException(errorInfo); + } - // 先去判断isAccess方法,如果isAccess方法都返回false,整个ITERATOR表达式不执行 - if (!iteratorNode.isAccess(slotIndex)) { - return; - } + // 先去判断isAccess方法,如果isAccess方法都返回false,整个ITERATOR表达式不执行 + if (!iteratorNode.isAccess(slotIndex)) { + return; + } - // 执行Iterator组件 - iteratorNode.setCurrChainId(this.getCurrChainId()); - iteratorNode.execute(slotIndex); + // 执行Iterator组件 + iteratorNode.setCurrChainId(this.getCurrChainId()); + iteratorNode.execute(slotIndex); - Iterator it = iteratorNode.getItemResultMetaValue(slotIndex); + Iterator it = iteratorNode.getItemResultMetaValue(slotIndex); - // 获得要循环的可执行对象 - Executable executableItem = this.getDoExecutor(); + // 获得要循环的可执行对象 + Executable executableItem = this.getDoExecutor(); - // 获取Break节点 - Executable breakItem = this.getBreakItem(); + // 获取Break节点 + Executable breakItem = this.getBreakItem(); - try{ - int index = 0; - while (it.hasNext()) { - Object itObj = it.next(); + try { + int index = 0; + if (!this.isParallel()) { + //原本的串行循环执行 + while (it.hasNext()) { + Object itObj = it.next(); - executableItem.setCurrChainId(this.getCurrChainId()); - // 设置循环index - setLoopIndex(executableItem, index); - // 设置循环迭代器对象 - setCurrLoopObject(executableItem, itObj); - // 执行可执行对象 - executableItem.execute(slotIndex); - // 如果break组件不为空,则去执行 - if (ObjectUtil.isNotNull(breakItem)) { - breakItem.setCurrChainId(this.getCurrChainId()); - setLoopIndex(breakItem, index); - setCurrLoopObject(breakItem, itObj); - breakItem.execute(slotIndex); - boolean isBreak = breakItem.getItemResultMetaValue(slotIndex); - if (isBreak) { - break; - } - } - index++; - } - }finally{ - removeLoopIndex(executableItem); - removeCurrLoopObject(executableItem); - } - } + executableItem.setCurrChainId(this.getCurrChainId()); + // 设置循环index + setLoopIndex(executableItem, index); + // 设置循环迭代器对象 + setCurrLoopObject(executableItem, itObj); + // 执行可执行对象 + executableItem.execute(slotIndex); + // 如果break组件不为空,则去执行 + if (ObjectUtil.isNotNull(breakItem)) { + breakItem.setCurrChainId(this.getCurrChainId()); + setLoopIndex(breakItem, index); + setCurrLoopObject(breakItem, itObj); + breakItem.execute(slotIndex); + boolean isBreak = breakItem.getItemResultMetaValue(slotIndex); + if (isBreak) { + break; + } + } + index++; + } + } else { + //并行循环执行 + //存储所有的并行执行子项的CompletableFuture + List> futureList = new ArrayList<>(); + //获取并行循环的线程池 + ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildLoopParallelExecutor(); + while (it.hasNext()) { + Object itObj = it.next(); + //提交异步任务 + CompletableFuture future = + CompletableFuture.supplyAsync(new LoopParallelSupplier(executableItem, this.getCurrChainId(), slotIndex, index, itObj), parallelExecutor); + futureList.add(future); + //break判断 + if (ObjectUtil.isNotNull(breakItem)) { + breakItem.setCurrChainId(this.getCurrChainId()); + setLoopIndex(breakItem, index); + breakItem.execute(slotIndex); + boolean isBreak = breakItem.getItemResultMetaValue(slotIndex); + if (isBreak) { + break; + } + } + index++; + } + } + } finally { + removeLoopIndex(executableItem); + removeCurrLoopObject(executableItem); + } + } - @Override - public ConditionTypeEnum getConditionType() { - return ConditionTypeEnum.TYPE_ITERATOR; - } + @Override + public ConditionTypeEnum getConditionType() { + return ConditionTypeEnum.TYPE_ITERATOR; + } - public Node getIteratorNode() { - return (Node) this.getExecutableOne(ConditionKey.ITERATOR_KEY); - } + public Node getIteratorNode() { + return (Node) this.getExecutableOne(ConditionKey.ITERATOR_KEY); + } - public void setIteratorNode(Node iteratorNode) { - this.addExecutable(ConditionKey.ITERATOR_KEY, iteratorNode); - } + public void setIteratorNode(Node iteratorNode) { + this.addExecutable(ConditionKey.ITERATOR_KEY, iteratorNode); + } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/LoopCondition.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/LoopCondition.java index 2861e59c6..e04375b52 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/LoopCondition.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/LoopCondition.java @@ -4,6 +4,9 @@ import com.yomahub.liteflow.flow.element.Chain; import com.yomahub.liteflow.flow.element.Condition; import com.yomahub.liteflow.flow.element.Executable; import com.yomahub.liteflow.flow.element.Node; +import com.yomahub.liteflow.flow.parallel.LoopFutureObj; + +import java.util.function.Supplier; /** * 循环Condition的抽象类 主要继承对象有ForCondition和WhileCondition @@ -83,4 +86,47 @@ public abstract class LoopCondition extends Condition { this.parallel = parallel; } + // 循环并行执行的Supplier封装 + public class LoopParallelSupplier implements Supplier { + private final Executable executableItem; + private final String currChainId; + private final Integer slotIndex; + private final Integer loopIndex; + private final Object itObj; + + public LoopParallelSupplier(Executable executableItem, String currChainId, Integer slotIndex, Integer loopIndex) { + this.executableItem = executableItem; + this.currChainId = currChainId; + this.slotIndex = slotIndex; + this.loopIndex = loopIndex; + this.itObj = null; + } + + public LoopParallelSupplier(Executable executableItem, String currChainId, Integer slotIndex, Integer loopIndex, Object itObj) { + this.executableItem = executableItem; + this.currChainId = currChainId; + this.slotIndex = slotIndex; + this.loopIndex = loopIndex; + this.itObj = itObj; + } + + + @Override + public LoopFutureObj get() { + try { + executableItem.setCurrChainId(this.currChainId); + // 设置循环index + setLoopIndex(executableItem, loopIndex); + //IteratorCondition的情况下,需要设置当前循环对象 + if(itObj != null){ + setCurrLoopObject(executableItem, itObj); + } + executableItem.execute(slotIndex); + return LoopFutureObj.success(executableItem.getId()); + } catch (Exception e) { + return LoopFutureObj.fail(executableItem.getId(), e); + } + } + } + } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhileCondition.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhileCondition.java index 441066f8f..b6aeffd3a 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhileCondition.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhileCondition.java @@ -4,6 +4,13 @@ import cn.hutool.core.util.ObjectUtil; import com.yomahub.liteflow.enums.ConditionTypeEnum; import com.yomahub.liteflow.flow.element.Executable; import com.yomahub.liteflow.flow.element.Node; +import com.yomahub.liteflow.flow.parallel.LoopFutureObj; +import com.yomahub.liteflow.thread.ExecutorHelper; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; /** * 循环条件Condition @@ -30,21 +37,55 @@ public class WhileCondition extends LoopCondition { // 循环执行 int index = 0; - while (getWhileResult(slotIndex)) { - executableItem.setCurrChainId(this.getCurrChainId()); - setLoopIndex(executableItem, index); - executableItem.execute(slotIndex); - // 如果break组件不为空,则去执行 - if (ObjectUtil.isNotNull(breakItem)) { - breakItem.setCurrChainId(this.getCurrChainId()); - setLoopIndex(breakItem, index); - breakItem.execute(slotIndex); - boolean isBreak = breakItem.getItemResultMetaValue(slotIndex); - if (isBreak) { - break; + if(!this.isParallel()){ + //串行循环 + while (getWhileResult(slotIndex)) { + executableItem.setCurrChainId(this.getCurrChainId()); + setLoopIndex(executableItem, index); + executableItem.execute(slotIndex); + // 如果break组件不为空,则去执行 + if (ObjectUtil.isNotNull(breakItem)) { + breakItem.setCurrChainId(this.getCurrChainId()); + setLoopIndex(breakItem, index); + breakItem.execute(slotIndex); + boolean isBreak = breakItem.getItemResultMetaValue(slotIndex); + if (isBreak) { + break; + } + } + index++; + } + }else{ + //并行循环逻辑 + List> futureList = new ArrayList<>(); + //获取并行循环的线程池 + ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildLoopParallelExecutor(); + while (getWhileResult(slotIndex)){ + CompletableFuture future = + CompletableFuture.supplyAsync(new LoopParallelSupplier(executableItem, this.getCurrChainId(), slotIndex, index), parallelExecutor); + futureList.add(future); + //break判断 + if (ObjectUtil.isNotNull(breakItem)) { + breakItem.setCurrChainId(this.getCurrChainId()); + setLoopIndex(breakItem, index); + breakItem.execute(slotIndex); + boolean isBreak = breakItem.getItemResultMetaValue(slotIndex); + if (isBreak) { + break; + } + } + index++; + } + //等待所有的异步执行完毕 + CompletableFuture resultCompletableFuture = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[]{})); + resultCompletableFuture.join(); + //获取所有的执行结果,如果有失败的,那么需要抛出异常 + for (CompletableFuture future : futureList) { + LoopFutureObj loopFutureObj = future.get(); + if (!loopFutureObj.isSuccess()) { + throw loopFutureObj.getEx(); } } - index++; } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/LoopFutureObj.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/LoopFutureObj.java new file mode 100644 index 000000000..3eca8c157 --- /dev/null +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/LoopFutureObj.java @@ -0,0 +1,54 @@ +package com.yomahub.liteflow.flow.parallel; + +/** + * 并行循环各子项的执行结果对象 + * + * @author zhhhhy + * @since 2.10.5 + */ + +public class LoopFutureObj { + private String executorName; + private boolean success; + private Exception ex; + + + public static LoopFutureObj success(String executorName) { + LoopFutureObj result = new LoopFutureObj(); + result.setSuccess(true); + result.setExecutorName(executorName); + return result; + } + + public static LoopFutureObj fail(String executorName, Exception ex) { + LoopFutureObj result = new LoopFutureObj(); + result.setSuccess(false); + result.setExecutorName(executorName); + result.setEx(ex); + return result; + } + + public Exception getEx() { + return ex; + } + + public String getExecutorName() { + return executorName; + } + + public boolean isSuccess() { + return success; + } + + public void setEx(Exception ex) { + this.ex = ex; + } + + public void setExecutorName(String executorName) { + this.executorName = executorName; + } + + public void setSuccess(boolean success) { + this.success = success; + } +}