diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java index cf3ae9d43..cd390352a 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java @@ -26,6 +26,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * 并发策略执行器抽象类 @@ -81,6 +82,51 @@ public abstract class ParallelStrategyExecutor { } } + /** + * 过滤 WHEN 待执行任务 + * @param executableList 所有任务列表 + * @param slotIndex + * @return + */ + protected Stream filterWhenTaskList(List executableList, Integer slotIndex) { + // 1.先进行过滤,前置和后置组件过滤掉,因为在 EL Chain 处理的时候已经提出来了 + // 2.过滤 isAccess 为 false 的情况,因为不过滤这个的话,如果加上了 any,那么 isAccess 为 false 那就是最快的了 + return executableList.stream() + .filter(executable -> !(executable instanceof PreCondition) && !(executable instanceof FinallyCondition)) + .filter(executable -> { + try { + return executable.isAccess(slotIndex); + } catch (Exception e) { + LOG.error("there was an error when executing the when component isAccess", e); + return false; + } + }); + } + + /** + * 获取 WHEN 所需线程池 + * @param whenCondition + * @return + */ + protected ExecutorService getWhenExecutorService(WhenCondition whenCondition) { + + LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); + + // 如果设置了线程池隔离,则每个 when 都会有对应的线程池,这是为了避免多层嵌套时如果线程池数量不够时出现单个线程池死锁。用线程池隔离的方式会更加好 + // 如果 when 没有超多层的嵌套,还是用默认的比较好。 + // 默认设置不隔离。也就是说,默认情况是一个线程池类一个实例,如果什么都不配置,那也就是在 when 的情况下,全局一个线程池。 + ExecutorService parallelExecutor; + + if (BooleanUtil.isTrue(liteflowConfig.getWhenThreadPoolIsolate())) { + parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutorWithHash(whenCondition.getThreadExecutorClass(), String.valueOf(whenCondition.hashCode())); + } else { + parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutor(whenCondition.getThreadExecutorClass()); + } + + return parallelExecutor; + + } + /** * 获取所有任务 CompletableFuture 集合 * @param whenCondition @@ -91,36 +137,15 @@ public abstract class ParallelStrategyExecutor { String currChainName = whenCondition.getCurrChainId(); - LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); - - // 如果设置了线程池隔离,则每个when都会有对应的线程池,这是为了避免多层嵌套时如果线程池数量不够时出现单个线程池死锁。用线程池隔离的方式会更加好 - // 如果when没有超多层的嵌套,还是用默认的比较好。 - // 默认设置不隔离。也就是说,默认情况是一个线程池类一个实例,如果什么都不配置,那也就是在when的情况下,全局一个线程池。 - ExecutorService parallelExecutor; - if (BooleanUtil.isTrue(liteflowConfig.getWhenThreadPoolIsolate())){ - parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutorWithHash(whenCondition.getThreadExecutorClass(), String.valueOf(whenCondition.hashCode())); - }else{ - parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutor(whenCondition.getThreadExecutorClass()); - } - // 设置 whenCondition 参数 setWhenConditionParams(whenCondition); - // 这里主要是做了封装 CompletableFuture 对象,用 lumbda 表达式做了很多事情,这句代码要仔细理清 - // 1.先进行过滤,前置和后置组件过滤掉,因为在 EL Chain 处理的时候已经提出来了 - // 2.过滤 isAccess 为 false 的情况,因为不过滤这个的话,如果加上了 any,那么 isAccess 为 false 那就是最快的了 - // 3.根据 condition.getNodeList() 的集合进行流处理,用 map 进行把 executable 对象转换成 List> - List> completableFutureList = whenCondition.getExecutableList() - .stream() - .filter(executable -> !(executable instanceof PreCondition) && !(executable instanceof FinallyCondition)) - .filter(executable -> { - try { - return executable.isAccess(slotIndex); - } catch (Exception e) { - LOG.error("there was an error when executing the when component isAccess", e); - return false; - } - }) + // 获取 WHEN 所需线程池 + ExecutorService parallelExecutor = getWhenExecutorService(whenCondition); + + // 这里主要是做了封装 CompletableFuture 对象,用 lambda 表达式做了很多事情,这句代码要仔细理清 + // 根据 condition.getNodeList() 的集合进行流处理,用 map 进行把 executable 对象转换成 List> + List> completableFutureList = filterWhenTaskList(whenCondition.getExecutableList(), slotIndex) .map(executable -> wrappedFutureObj(executable, parallelExecutor, whenCondition, currChainName, slotIndex)) .collect(Collectors.toList()); diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java index e397608ec..09457c9e7 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java @@ -1,11 +1,8 @@ package com.yomahub.liteflow.flow.parallel.strategy; import cn.hutool.core.collection.CollUtil; -import com.yomahub.liteflow.flow.element.condition.FinallyCondition; -import com.yomahub.liteflow.flow.element.condition.PreCondition; import com.yomahub.liteflow.flow.element.condition.WhenCondition; import com.yomahub.liteflow.flow.parallel.WhenFutureObj; -import com.yomahub.liteflow.thread.ExecutorHelper; import java.util.*; import java.util.concurrent.CompletableFuture; @@ -27,8 +24,8 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor { // 设置 whenCondition 参数 this.setWhenConditionParams(whenCondition); - // 此方法其实只会初始化一次Executor,不会每次都会初始化。Executor 是唯一的 - ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutor(whenCondition.getThreadExecutorClass()); + // 获取 WHEN 所需线程池 + ExecutorService parallelExecutor = getWhenExecutorService(whenCondition); // 指定完成的任务 CompletableFuture specifyTask; @@ -43,17 +40,7 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor { List> allTaskList = new ArrayList<>(); // 遍历 when 所有 node,进行筛选及处理 - whenCondition.getExecutableList() - .stream() - .filter(executable -> !(executable instanceof PreCondition) && !(executable instanceof FinallyCondition)) - .filter(executable -> { - try { - return executable.isAccess(slotIndex); - } catch (Exception e) { - LOG.error("there was an error when executing the when component isAccess", e); - return false; - } - }) + filterWhenTaskList(whenCondition.getExecutableList(), slotIndex) .forEach(executable -> { // 处理 task,封装成 CompletableFuture 对象 CompletableFuture completableFutureTask = wrappedFutureObj(executable, parallelExecutor, whenCondition, currChainName, slotIndex);