From a036ae00ee7ab434044b2b596ff2703ed050e30a Mon Sep 17 00:00:00 2001 From: "everywhere.z" Date: Fri, 8 Mar 2024 12:58:34 +0800 Subject: [PATCH] =?UTF-8?q?enhancement=20#I8YDGE=20=E5=9C=A8=E8=BF=AD?= =?UTF-8?q?=E4=BB=A3=E5=BE=AA=E7=8E=AF=E7=BB=84=E4=BB=B6=E4=B8=AD=EF=BC=8C?= =?UTF-8?q?=E6=97=A0=E6=B3=95=E8=8E=B7=E5=8F=96=E5=AD=90=E6=B5=81=E7=A8=8B?= =?UTF-8?q?=E4=BC=A0=E9=80=92=E7=9A=84=E8=AF=B7=E6=B1=82=E5=8F=82=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../parallel/strategy/ParallelStrategyExecutor.java | 12 ++++++------ .../parallel/strategy/SpecifyParallelExecutor.java | 6 +++--- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java index e420876f2..4ca07a9d4 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java @@ -45,16 +45,16 @@ public abstract class ParallelStrategyExecutor { * @param executable * @param parallelExecutor * @param whenCondition - * @param currChainName + * @param currChainId * @param slotIndex * @return */ protected CompletableFuture wrappedFutureObj(Executable executable, ExecutorService parallelExecutor, - WhenCondition whenCondition, String currChainName, Integer slotIndex) { + WhenCondition whenCondition, String currChainId, Integer slotIndex) { // 套入 CompletableFutureTimeout 方法进行超时判断,如果超时则用 WhenFutureObj.timeOut 返回超时的对象 // 第 2 个参数是主要的本体 CompletableFuture,传入了 ParallelSupplier 和线程池对象 return CompletableFutureExpand.completeOnTimeout( - CompletableFuture.supplyAsync(new ParallelSupplier(executable, currChainName, slotIndex), parallelExecutor), + CompletableFuture.supplyAsync(new ParallelSupplier(executable, currChainId, slotIndex), parallelExecutor), whenCondition.getMaxWaitTime(), whenCondition.getMaxWaitTimeUnit(), WhenFutureObj.timeOut(executable.getId())); @@ -149,7 +149,7 @@ public abstract class ParallelStrategyExecutor { */ protected List> getWhenAllTaskList(WhenCondition whenCondition, Integer slotIndex) { - String currChainName = whenCondition.getCurrChainId(); + String currChainId = whenCondition.getCurrChainId(); // 设置 whenCondition 参数 this.setWhenConditionParams(whenCondition); @@ -159,8 +159,8 @@ public abstract class ParallelStrategyExecutor { // 这里主要是做了封装 CompletableFuture 对象,用 lambda 表达式做了很多事情,这句代码要仔细理清 // 根据 condition.getNodeList() 的集合进行流处理,用 map 进行把 executable 对象转换成 List> - List> completableFutureList = filterWhenTaskList(whenCondition.getExecutableList(), slotIndex, currChainName) - .map(executable -> wrappedFutureObj(executable, parallelExecutor, whenCondition, currChainName, slotIndex)) + List> completableFutureList = filterWhenTaskList(whenCondition.getExecutableList(), slotIndex, currChainId) + .map(executable -> wrappedFutureObj(executable, parallelExecutor, whenCondition, currChainId, slotIndex)) .collect(Collectors.toList()); return completableFutureList; diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java index 7f2aaf92d..bfa89cdc5 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java @@ -20,7 +20,7 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor { @Override public void execute(WhenCondition whenCondition, Integer slotIndex) throws Exception { - String currChainName = whenCondition.getCurrChainId(); + String currChainId = whenCondition.getCurrChainId(); // 设置 whenCondition 参数 this.setWhenConditionParams(whenCondition); @@ -41,10 +41,10 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor { List> allTaskList = new ArrayList<>(); // 遍历 when 所有 node,进行筛选及处理 - filterWhenTaskList(whenCondition.getExecutableList(), slotIndex, currChainName) + filterWhenTaskList(whenCondition.getExecutableList(), slotIndex, currChainId) .forEach(executable -> { // 处理 task,封装成 CompletableFuture 对象 - CompletableFuture completableFutureTask = wrappedFutureObj(executable, parallelExecutor, whenCondition, currChainName, slotIndex); + CompletableFuture completableFutureTask = wrappedFutureObj(executable, parallelExecutor, whenCondition, currChainId, slotIndex); // 存在 must 指定 ID 的 task,且该任务只会有一个或者没有 if (whenCondition.getSpecifyIdSet().contains(executable.getId())) { // 设置指定任务 future 对象