diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/ForCondition.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/ForCondition.java index f3aeabaae..50f0a56e3 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/ForCondition.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/ForCondition.java @@ -33,13 +33,15 @@ public class ForCondition extends LoopCondition { throw new NoForNodeException(errorInfo); } + // 提前设置 chainId,避免无法在 isAccess 方法中获取到 + forNode.setCurrChainId(this.getCurrChainId()); + // 先去判断isAccess方法,如果isAccess方法都返回false,整个FOR表达式不执行 - if (!this.getForNode().isAccess(slotIndex)) { + if (!forNode.isAccess(slotIndex)) { return; } // 执行forCount组件 - forNode.setCurrChainId(this.getCurrChainId()); forNode.execute(slotIndex); // 获得循环次数 diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/IfCondition.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/IfCondition.java index 5fa272a30..91f3a8e41 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/IfCondition.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/IfCondition.java @@ -3,7 +3,8 @@ package com.yomahub.liteflow.flow.element.condition; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.yomahub.liteflow.enums.ConditionTypeEnum; -import com.yomahub.liteflow.exception.*; +import com.yomahub.liteflow.exception.IfTargetCannotBePreOrFinallyException; +import com.yomahub.liteflow.exception.NoIfTrueNodeException; import com.yomahub.liteflow.flow.element.Condition; import com.yomahub.liteflow.flow.element.Executable; import com.yomahub.liteflow.slot.DataBus; @@ -21,13 +22,15 @@ public class IfCondition extends Condition { public void executeCondition(Integer slotIndex) throws Exception { Executable ifItem = this.getIfItem(); + // 提前设置 chainId,避免无法在 isAccess 方法中获取到 + ifItem.setCurrChainId(this.getCurrChainId()); + // 先去判断isAccess方法,如果isAccess方法都返回false,整个IF表达式不执行 if (!ifItem.isAccess(slotIndex)) { return; } // 先执行IF节点 - ifItem.setCurrChainId(this.getCurrChainId()); ifItem.execute(slotIndex); // 拿到If执行过的结果 diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/IteratorCondition.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/IteratorCondition.java index f80ac43d7..2a2a640b0 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/IteratorCondition.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/IteratorCondition.java @@ -29,13 +29,15 @@ public class IteratorCondition extends LoopCondition { throw new NoIteratorNodeException(errorInfo); } + // 提前设置 chainId,避免无法在 isAccess 方法中获取到 + iteratorNode.setCurrChainId(this.getCurrChainId()); + // 先去判断isAccess方法,如果isAccess方法都返回false,整个ITERATOR表达式不执行 if (!iteratorNode.isAccess(slotIndex)) { return; } // 执行Iterator组件 - iteratorNode.setCurrChainId(this.getCurrChainId()); iteratorNode.execute(slotIndex); Iterator it = iteratorNode.getItemResultMetaValue(slotIndex); diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/SwitchCondition.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/SwitchCondition.java index 2a208d8ee..3c575f2f4 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/SwitchCondition.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/SwitchCondition.java @@ -1,13 +1,10 @@ package com.yomahub.liteflow.flow.element.condition; -import cn.hutool.core.collection.ListUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.yomahub.liteflow.enums.ConditionTypeEnum; -import com.yomahub.liteflow.enums.NodeTypeEnum; import com.yomahub.liteflow.exception.NoSwitchTargetNodeException; import com.yomahub.liteflow.exception.SwitchTargetCannotBePreOrFinallyException; -import com.yomahub.liteflow.exception.SwitchTypeErrorException; import com.yomahub.liteflow.flow.element.Condition; import com.yomahub.liteflow.flow.element.Executable; import com.yomahub.liteflow.flow.element.Node; @@ -35,13 +32,15 @@ public class SwitchCondition extends Condition { // 获取target List List targetList = this.getTargetList(); + // 提前设置 chainId,避免无法在 isAccess 方法中获取到 + switchNode.setCurrChainId(this.getCurrChainId()); + // 先去判断isAccess方法,如果isAccess方法都返回false,整个SWITCH表达式不执行 if (!switchNode.isAccess(slotIndex)) { return; } // 先执行switch节点 - switchNode.setCurrChainId(this.getCurrChainId()); switchNode.execute(slotIndex); // 拿到switch节点的结果 diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhileCondition.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhileCondition.java index df85bfe86..0e00e761f 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhileCondition.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhileCondition.java @@ -3,7 +3,6 @@ package com.yomahub.liteflow.flow.element.condition; import cn.hutool.core.util.ObjectUtil; import com.yomahub.liteflow.enums.ConditionTypeEnum; import com.yomahub.liteflow.flow.element.Executable; -import com.yomahub.liteflow.flow.element.Node; import com.yomahub.liteflow.flow.parallel.LoopFutureObj; import com.yomahub.liteflow.thread.ExecutorHelper; @@ -24,6 +23,9 @@ public class WhileCondition extends LoopCondition { public void executeCondition(Integer slotIndex) throws Exception { Executable whileItem = this.getWhileItem(); + // 提前设置 chainId,避免无法在 isAccess 方法中获取到 + whileItem.setCurrChainId(this.getCurrChainId()); + // 先去判断isAccess方法,如果isAccess方法都返回false,整个WHILE表达式不执行 if (!whileItem.isAccess(slotIndex)) { return; @@ -84,7 +86,6 @@ public class WhileCondition extends LoopCondition { private boolean getWhileResult(Integer slotIndex, int loopIndex) throws Exception { Executable whileItem = this.getWhileItem(); // 执行while组件 - whileItem.setCurrChainId(this.getCurrChainId()); setLoopIndex(whileItem, loopIndex); whileItem.execute(slotIndex); diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AllOfParallelExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AllOfParallelExecutor.java index 2cb606f1e..6b637dbe4 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AllOfParallelExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AllOfParallelExecutor.java @@ -33,7 +33,7 @@ public class AllOfParallelExecutor extends ParallelStrategyExecutor { // 在 allOf 这个场景中,不需要过滤 @Override - protected Stream filterAccess(Stream stream, Integer slotIndex) { + protected Stream filterAccess(Stream stream, Integer slotIndex, String currentChainId) { return stream; } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java index c0cabd242..e420876f2 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java @@ -11,7 +11,6 @@ import com.yomahub.liteflow.flow.element.condition.FinallyCondition; import com.yomahub.liteflow.flow.element.condition.PreCondition; import com.yomahub.liteflow.flow.element.condition.WhenCondition; import com.yomahub.liteflow.flow.parallel.CompletableFutureExpand; -import com.yomahub.liteflow.flow.parallel.CompletableFutureTimeout; import com.yomahub.liteflow.flow.parallel.ParallelSupplier; import com.yomahub.liteflow.flow.parallel.WhenFutureObj; import com.yomahub.liteflow.log.LFLog; @@ -89,20 +88,23 @@ public abstract class ParallelStrategyExecutor { * 过滤 WHEN 待执行任务 * @param executableList 所有任务列表 * @param slotIndex + * @param currentChainId 当前执行的 chainId * @return */ - protected Stream filterWhenTaskList(List executableList, Integer slotIndex) { + protected Stream filterWhenTaskList(List executableList, Integer slotIndex, String currentChainId) { // 1.先进行过滤,前置和后置组件过滤掉,因为在 EL Chain 处理的时候已经提出来了 // 2.过滤 isAccess 为 false 的情况,因为不过滤这个的话,如果加上了 any,那么 isAccess 为 false 那就是最快的了 Stream stream = executableList.stream() .filter(executable -> !(executable instanceof PreCondition) && !(executable instanceof FinallyCondition)); - return filterAccess(stream, slotIndex); + return filterAccess(stream, slotIndex, currentChainId); } // 过滤 isAccess 的方法,默认实现,同时为避免同一个 node 的 isAccess 方法重复执行,给 node 设置 isAccess 方法执行结果 - protected Stream filterAccess(Stream stream, Integer slotIndex) { + protected Stream filterAccess(Stream stream, Integer slotIndex, String currentChainId) { return stream.filter(executable -> { try { + // 提前设置 chainId,避免无法在 isAccess 方法中获取到 + executable.setCurrChainId(currentChainId); boolean access = executable.isAccess(slotIndex); if (executable instanceof Node) { ((Node) executable).setAccessResult(access); @@ -150,14 +152,14 @@ public abstract class ParallelStrategyExecutor { String currChainName = whenCondition.getCurrChainId(); // 设置 whenCondition 参数 - setWhenConditionParams(whenCondition); + this.setWhenConditionParams(whenCondition); // 获取 WHEN 所需线程池 ExecutorService parallelExecutor = getWhenExecutorService(whenCondition); // 这里主要是做了封装 CompletableFuture 对象,用 lambda 表达式做了很多事情,这句代码要仔细理清 // 根据 condition.getNodeList() 的集合进行流处理,用 map 进行把 executable 对象转换成 List> - List> completableFutureList = filterWhenTaskList(whenCondition.getExecutableList(), slotIndex) + List> completableFutureList = filterWhenTaskList(whenCondition.getExecutableList(), slotIndex, currChainName) .map(executable -> wrappedFutureObj(executable, parallelExecutor, whenCondition, currChainName, slotIndex)) .collect(Collectors.toList()); diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java index e9a78ef9a..7f2aaf92d 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java @@ -41,7 +41,7 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor { List> allTaskList = new ArrayList<>(); // 遍历 when 所有 node,进行筛选及处理 - filterWhenTaskList(whenCondition.getExecutableList(), slotIndex) + filterWhenTaskList(whenCondition.getExecutableList(), slotIndex, currChainName) .forEach(executable -> { // 处理 task,封装成 CompletableFuture 对象 CompletableFuture completableFutureTask = wrappedFutureObj(executable, parallelExecutor, whenCondition, currChainName, slotIndex);