From 9fbb615e92a471d876eea2df9ddbd5a0d6fbdc7b Mon Sep 17 00:00:00 2001 From: luoyi <972849752@qq.com> Date: Thu, 14 Sep 2023 22:35:26 +0800 Subject: [PATCH] =?UTF-8?q?enhancement=20#I7XAIB=20=E7=AE=80=E5=8C=96=20Sp?= =?UTF-8?q?ecifyParallelExecutor=20=E6=89=A7=E8=A1=8C=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../strategy/SpecifyParallelExecutor.java | 50 +++++++++---------- 1 file changed, 23 insertions(+), 27 deletions(-) diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java index 4ade6aa52..9474232d4 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java @@ -1,19 +1,16 @@ package com.yomahub.liteflow.flow.parallel.strategy; -import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.ObjUtil; -import com.yomahub.liteflow.flow.element.Executable; import com.yomahub.liteflow.flow.element.condition.FinallyCondition; import com.yomahub.liteflow.flow.element.condition.PreCondition; import com.yomahub.liteflow.flow.element.condition.WhenCondition; import com.yomahub.liteflow.flow.parallel.WhenFutureObj; import com.yomahub.liteflow.thread.ExecutorHelper; +import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -import java.util.stream.Collectors; /** * 完成指定任务执行器,使用 ID 进行比较 @@ -34,8 +31,14 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor { // 此方法其实只会初始化一次Executor,不会每次都会初始化。Executor 是唯一的 ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutor(whenCondition.getThreadExecutorClass()); - // 过滤指定 ID 的任务,且该任务只会有一个或者没有 - Map> specifyExecutableMap = whenCondition.getExecutableList() + // 指定任务 + final CompletableFuture[] specifyTask = { null }; + + // 所有任务集合 + List> allTaskList = new ArrayList<>(); + + // 遍历 when 所有 node,进行筛选及处理 + whenCondition.getExecutableList() .stream() .filter(executable -> !(executable instanceof PreCondition) && !(executable instanceof FinallyCondition)) .filter(executable -> { @@ -46,33 +49,26 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor { return false; } }) - .collect(Collectors.partitioningBy(executable -> whenCondition.getSpecifyId().equals(executable.getId()))); + .forEach(executable -> { + // 处理 task,封装成 CompletableFuture 对象 + CompletableFuture completableFutureTask = wrappedFutureObj(executable, parallelExecutor, whenCondition, currChainName, slotIndex); + // 存在 must 指定 ID 的 task,且该任务只会有一个或者没有 + if (whenCondition.getSpecifyId().equals(executable.getId())) { + // 设置指定任务 future 对象 + specifyTask[0] = completableFutureTask; + } + // 组合所有任务 + allTaskList.add(completableFutureTask); + }); - CompletableFuture specifyTask = null; - - // 处理非指定 task,封装成 CompletableFuture 对象,最终仍是会组合所有任务到集合中 - List> allTaskList = specifyExecutableMap.get(Boolean.FALSE) - .stream() - .map(executable -> wrappedFutureObj(executable, parallelExecutor, whenCondition, currChainName, slotIndex)) - .collect(Collectors.toList()); - - if (CollUtil.isNotEmpty(specifyExecutableMap.get(Boolean.TRUE))) { - // 存在 must 指定的 task - CompletableFuture specifyCompletableFuture = wrappedFutureObj(specifyExecutableMap.get(Boolean.TRUE).get(0), parallelExecutor, whenCondition, currChainName, slotIndex); - // 组合所有任务 - allTaskList.add(specifyCompletableFuture); - // 设置指定任务 future 对象 - specifyTask = specifyCompletableFuture; - } - - if (ObjUtil.isNull(specifyTask)) { + if (ObjUtil.isNull(specifyTask[0])) { LOG.warn("The specified task[{}] was not found, waiting for all tasks to complete by default.", whenCondition.getSpecifyId()); // 不存在指定任务,则需要等待所有任务都执行完成 - specifyTask = CompletableFuture.allOf(allTaskList.toArray(new CompletableFuture[] {})); + specifyTask[0] = CompletableFuture.allOf(allTaskList.toArray(new CompletableFuture[] {})); } // 结果处理 - this.handleTaskResult(whenCondition, slotIndex, allTaskList, specifyTask); + this.handleTaskResult(whenCondition, slotIndex, allTaskList, specifyTask[0]); }