From 6eee9ce492d273cd0d8bcfcaef44d47b8bda78b8 Mon Sep 17 00:00:00 2001 From: luoyi <972849752@qq.com> Date: Thu, 14 Sep 2023 21:53:07 +0800 Subject: [PATCH] =?UTF-8?q?enhancement=20#I7XAIB=20=E8=B0=83=E6=95=B4?= =?UTF-8?q?=E5=8F=82=E6=95=B0=E5=91=BD=E5=90=8D=EF=BC=8C=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E6=B3=A8=E9=87=8A=E8=AF=B4=E6=98=8E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../liteflow/enums/ParallelStrategyEnum.java | 2 +- .../flow/element/condition/WhenCondition.java | 8 ++- .../strategy/AllOfParallelExecutor.java | 10 +-- .../strategy/AnyOfParallelExecutor.java | 10 +-- .../strategy/ParallelStrategyExecutor.java | 63 ++++++++++--------- .../strategy/SpecifyParallelExecutor.java | 25 ++++---- 6 files changed, 64 insertions(+), 54 deletions(-) diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/enums/ParallelStrategyEnum.java b/liteflow-core/src/main/java/com/yomahub/liteflow/enums/ParallelStrategyEnum.java index fc69b634e..09c9af437 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/enums/ParallelStrategyEnum.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/enums/ParallelStrategyEnum.java @@ -17,7 +17,7 @@ public enum ParallelStrategyEnum { ALL("allOf", "完成全部任务", AllOfParallelExecutor.class), - SPECIFY("must", "完成指定任务", SpecifyParallelExecutor.class); + SPECIFY("must", "完成指定 ID 任务", SpecifyParallelExecutor.class); private String strategyType; diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhenCondition.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhenCondition.java index f4d3df662..7422a495e 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhenCondition.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhenCondition.java @@ -15,6 +15,7 @@ import com.yomahub.liteflow.flow.parallel.strategy.ParallelStrategyExecutor; import com.yomahub.liteflow.flow.parallel.strategy.ParallelStrategyHelper; import com.yomahub.liteflow.log.LFLog; import com.yomahub.liteflow.log.LFLoggerManager; +import com.yomahub.liteflow.thread.ExecutorHelper; import java.util.concurrent.TimeUnit; @@ -59,13 +60,16 @@ public class WhenCondition extends Condition { return ConditionTypeEnum.TYPE_WHEN; } - // 使用线程池执行when并发流程 + // 使用线程池执行 when 并发流程 // 这块涉及到挺多的多线程逻辑,所以注释比较详细,看到这里的童鞋可以仔细阅读 private void executeAsyncCondition(Integer slotIndex) throws Exception { + // 获取并发执行策略 ParallelStrategyExecutor parallelStrategyExecutor = ParallelStrategyHelper.loadInstance().buildParallelExecutor(this.getParallelStrategy()); - // 执行逻辑 + + // 执行并发逻辑 parallelStrategyExecutor.execute(this, slotIndex); + } public boolean isIgnoreError() { diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AllOfParallelExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AllOfParallelExecutor.java index c3d6de2a4..aeb20bca8 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AllOfParallelExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AllOfParallelExecutor.java @@ -17,14 +17,14 @@ public class AllOfParallelExecutor extends ParallelStrategyExecutor { @Override public void execute(WhenCondition whenCondition, Integer slotIndex) throws Exception { - // 获取所有 CompletableFuture - List> completableFutureList = this.getCompletableFutureList(whenCondition, slotIndex); + // 获取所有 CompletableFuture 任务 + List> allTaskList = this.getAllTaskList(whenCondition, slotIndex); - // 把这些 CompletableFuture 通过 anyOf 合成一个 CompletableFuture - CompletableFuture resultCompletableFuture = CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[] {})); + // 把这些 CompletableFuture 通过 allOf 合成一个 CompletableFuture,表明完成所有任务 + CompletableFuture specifyTask = CompletableFuture.allOf(allTaskList.toArray(new CompletableFuture[] {})); // 结果处理 - this.handleResult(whenCondition, slotIndex, completableFutureList, resultCompletableFuture); + this.handleTaskResult(whenCondition, slotIndex, allTaskList, specifyTask); } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AnyOfParallelExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AnyOfParallelExecutor.java index 2115db84e..dce311fda 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AnyOfParallelExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AnyOfParallelExecutor.java @@ -17,14 +17,14 @@ public class AnyOfParallelExecutor extends ParallelStrategyExecutor { @Override public void execute(WhenCondition whenCondition, Integer slotIndex) throws Exception { - // 获取所有 CompletableFuture - List> completableFutureList = this.getCompletableFutureList(whenCondition, slotIndex); + // 获取所有 CompletableFuture 任务 + List> allTaskList = this.getAllTaskList(whenCondition, slotIndex); - // 把这些 CompletableFuture 通过 anyOf 合成一个 CompletableFuture - CompletableFuture resultCompletableFuture = CompletableFuture.anyOf(completableFutureList.toArray(new CompletableFuture[] {})); + // 把这些 CompletableFuture 通过 anyOf 合成一个 CompletableFuture,表明完成任一任务 + CompletableFuture specifyTask = CompletableFuture.anyOf(allTaskList.toArray(new CompletableFuture[] {})); // 结果处理 - this.handleResult(whenCondition, slotIndex, completableFutureList, resultCompletableFuture); + this.handleTaskResult(whenCondition, slotIndex, allTaskList, specifyTask); } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java index 56f8c9ba9..85524f28c 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java @@ -2,6 +2,7 @@ package com.yomahub.liteflow.flow.parallel.strategy; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; +import com.yomahub.liteflow.enums.ParallelStrategyEnum; import com.yomahub.liteflow.exception.WhenExecuteException; import com.yomahub.liteflow.flow.element.Executable; import com.yomahub.liteflow.flow.element.condition.FinallyCondition; @@ -46,6 +47,8 @@ public abstract class ParallelStrategyExecutor { */ protected CompletableFuture wrappedFutureObj(Executable executable, ExecutorService parallelExecutor, WhenCondition whenCondition, String currChainName, Integer slotIndex) { + // 套入 CompletableFutureTimeout 方法进行超时判断,如果超时则用 WhenFutureObj.timeOut 返回超时的对象 + // 第 2 个参数是主要的本体 CompletableFuture,传入了 ParallelSupplier 和线程池对象 return CompletableFutureTimeout.completeOnTimeout( WhenFutureObj.timeOut(executable.getId()), CompletableFuture.supplyAsync(new ParallelSupplier(executable, currChainName, slotIndex), parallelExecutor), @@ -58,7 +61,7 @@ public abstract class ParallelStrategyExecutor { * @param whenCondition */ protected void setWhenConditionParams(WhenCondition whenCondition) { - // 获得liteflow的参数 + // 获得 liteflow 的参数 LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); if (ObjectUtil.isNull(whenCondition.getMaxWaitTime())) { if (ObjectUtil.isNotNull(liteflowConfig.getWhenMaxWaitSeconds())) { @@ -78,26 +81,25 @@ public abstract class ParallelStrategyExecutor { } /** - * 获取所有任务 + * 获取所有任务 CompletableFuture 集合 * @param whenCondition * @param slotIndex * @return */ - protected List> getCompletableFutureList(WhenCondition whenCondition, Integer slotIndex) { + protected List> getAllTaskList(WhenCondition whenCondition, Integer slotIndex) { + String currChainName = whenCondition.getCurrChainId(); - // 此方法其实只会初始化一次Executor,不会每次都会初始化。Executor是唯一的 + // 此方法其实只会初始化一次 Executor,不会每次都会初始化。Executor是唯一的 ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutor(whenCondition.getThreadExecutorClass()); - // 设置参数 + // 设置 whenCondition 参数 setWhenConditionParams(whenCondition); - // 这里主要是做了封装CompletableFuture对象,用lumbda表达式做了很多事情,这句代码要仔细理清 - // 1.先进行过滤,前置和后置组件过滤掉,因为在EL Chain处理的时候已经提出来了 - // 2.过滤isAccess为false的情况,因为不过滤这个的话,如果加上了any,那么isAccess为false那就是最快的了 - // 3.根据condition.getNodeList()的集合进行流处理,用map进行把executable对象转换成List> - // 4.在转的过程中,套入CompletableFutureTimeout方法进行超时判断,如果超时则用WhenFutureObj.timeOut返回超时的对象 - // 5.第2个参数是主要的本体CompletableFuture,传入了ParallelSupplier和线程池对象 + // 这里主要是做了封装 CompletableFuture 对象,用 lumbda 表达式做了很多事情,这句代码要仔细理清 + // 1.先进行过滤,前置和后置组件过滤掉,因为在 EL Chain 处理的时候已经提出来了 + // 2.过滤 isAccess 为 false 的情况,因为不过滤这个的话,如果加上了 any,那么 isAccess 为 false 那就是最快的了 + // 3.根据 condition.getNodeList() 的集合进行流处理,用 map 进行把 executable 对象转换成 List> List> completableFutureList = whenCondition.getExecutableList() .stream() .filter(executable -> !(executable instanceof PreCondition) && !(executable instanceof FinallyCondition)) @@ -116,33 +118,36 @@ public abstract class ParallelStrategyExecutor { } /** - * 结果处理 - * @param whenCondition - * @param slotIndex - * @param completableFutureList - * @param resultCompletableFuture + * 任务结果处理 + * @param whenCondition 并行组件对象 + * @param slotIndex 当前 slot 的 index + * @param allTaskList 并行组件中所有任务列表 + * @param specifyTask 指定预先完成的任务,详见 {@link ParallelStrategyEnum} * @throws Exception */ - protected void handleResult(WhenCondition whenCondition, Integer slotIndex, List> completableFutureList, CompletableFuture resultCompletableFuture) throws Exception { + protected void handleTaskResult(WhenCondition whenCondition, Integer slotIndex, List> allTaskList, + CompletableFuture specifyTask) throws Exception { + Slot slot = DataBus.getSlot(slotIndex); // 定义是否中断参数 - // 这里为什么要定义成数组呢,因为后面lambda要用到,根据final不能修改引用的原则,这里用了数组对象 + // 这里为什么要定义成数组呢,因为后面 lambda 要用到,根据 final 不能修改引用的原则,这里用了数组对象 final boolean[] interrupted = { false }; try { - // 进行执行,这句执行完后,就意味着所有的任务要么执行完毕,要么超时返回 - resultCompletableFuture.get(); + // 进行执行,这句执行完后有三种可能,所有任务执行完成、任一任务执行完成、指定的任务执行完成 + specifyTask.get(); } catch (InterruptedException | ExecutionException e) { LOG.error("there was an error when executing the CompletableFuture", e); interrupted[0] = true; } - // 拿到已经完成的CompletableFuture - // 如果any为false,那么所有任务都已经完成 - // 如果any为true,那么这里拿到的是第一个完成的任务 - // 这里过滤和转换一起用lumbda做了 - List allCompletableWhenFutureObjList = completableFutureList.stream().filter(f -> { + // 拿到已经完成的 CompletableFuture 对象 + // 如果 any 为 false,那么所有任务都已经完成 + // 如果 any 为 true,那么这里拿到的是第一个完成的任务 + // 如果为 must,那么这里获取到的就是指定的任务 + // 这里过滤和转换一起用 lambda 做了 + List allCompletableWhenFutureObjList = allTaskList.stream().filter(f -> { // 过滤出已经完成的,没完成的就直接终止 if (f.isDone()) { return true; @@ -159,8 +164,8 @@ public abstract class ParallelStrategyExecutor { } }).collect(Collectors.toList()); - // 判断超时,上面已经拿到了所有已经完成的CompletableFuture - // 那我们只要过滤出超时的CompletableFuture + // 判断超时,上面已经拿到了所有已经完成的 CompletableFuture + // 那我们只要过滤出超时的 CompletableFuture List timeOutWhenFutureObjList = allCompletableWhenFutureObjList.stream() .filter(WhenFutureObj::isTimeout) .collect(Collectors.toList()); @@ -169,7 +174,7 @@ public abstract class ParallelStrategyExecutor { timeOutWhenFutureObjList.forEach(whenFutureObj -> LOG.warn( "executing thread has reached max-wait-seconds, thread canceled.Execute-item: [{}]", whenFutureObj.getExecutorName())); - // 当配置了ignoreError = false,出现interrupted或者!f.get()的情况,将抛出WhenExecuteException + // 当配置了 ignoreError = false,出现 interrupted 或者 !f.get() 的情况,将抛出 WhenExecuteException if (!whenCondition.isIgnoreError()) { if (interrupted[0]) { throw new WhenExecuteException(StrUtil @@ -184,7 +189,7 @@ public abstract class ParallelStrategyExecutor { } } } else if (interrupted[0]) { - // 这里由于配置了ignoreError,所以只打印warn日志 + // 这里由于配置了 ignoreError,所以只打印 warn 日志 LOG.warn("executing when condition timeout , but ignore with errorResume."); } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java index f763eb7d6..4ade6aa52 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java @@ -28,9 +28,10 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor { String currChainName = whenCondition.getCurrChainId(); + // 设置 whenCondition 参数 this.setWhenConditionParams(whenCondition); - // 此方法其实只会初始化一次Executor,不会每次都会初始化。Executor是唯一的 + // 此方法其实只会初始化一次Executor,不会每次都会初始化。Executor 是唯一的 ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutor(whenCondition.getThreadExecutorClass()); // 过滤指定 ID 的任务,且该任务只会有一个或者没有 @@ -47,31 +48,31 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor { }) .collect(Collectors.partitioningBy(executable -> whenCondition.getSpecifyId().equals(executable.getId()))); - CompletableFuture resultCompletableFuture = null; + CompletableFuture specifyTask = null; - // 处理非指定 task - List> completableFutureList = specifyExecutableMap.get(Boolean.FALSE) + // 处理非指定 task,封装成 CompletableFuture 对象,最终仍是会组合所有任务到集合中 + List> allTaskList = specifyExecutableMap.get(Boolean.FALSE) .stream() .map(executable -> wrappedFutureObj(executable, parallelExecutor, whenCondition, currChainName, slotIndex)) .collect(Collectors.toList()); - if (specifyExecutableMap.containsKey(Boolean.TRUE) && CollUtil.isNotEmpty(specifyExecutableMap.get(Boolean.TRUE))) { + if (CollUtil.isNotEmpty(specifyExecutableMap.get(Boolean.TRUE))) { // 存在 must 指定的 task CompletableFuture specifyCompletableFuture = wrappedFutureObj(specifyExecutableMap.get(Boolean.TRUE).get(0), parallelExecutor, whenCondition, currChainName, slotIndex); // 组合所有任务 - completableFutureList.add(specifyCompletableFuture); - // 设置结果 future - resultCompletableFuture = specifyCompletableFuture; + allTaskList.add(specifyCompletableFuture); + // 设置指定任务 future 对象 + specifyTask = specifyCompletableFuture; } - if (ObjUtil.isNull(resultCompletableFuture)) { + if (ObjUtil.isNull(specifyTask)) { LOG.warn("The specified task[{}] was not found, waiting for all tasks to complete by default.", whenCondition.getSpecifyId()); - // 不存在指定任务,则所有任务都执行 - resultCompletableFuture = CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[] {})); + // 不存在指定任务,则需要等待所有任务都执行完成 + specifyTask = CompletableFuture.allOf(allTaskList.toArray(new CompletableFuture[] {})); } // 结果处理 - this.handleResult(whenCondition, slotIndex, completableFutureList, resultCompletableFuture); + this.handleTaskResult(whenCondition, slotIndex, allTaskList, specifyTask); }