From 4e616f9da6b10a38dcb51d9086d8d88509b69cd9 Mon Sep 17 00:00:00 2001 From: luoyi <972849752@qq.com> Date: Wed, 30 Aug 2023 21:16:10 +0800 Subject: [PATCH 1/4] =?UTF-8?q?enhancement=20#I7XAIB=20WHEN=20=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0=20must=20=E8=AF=AD=E6=B3=95=EF=BC=8C=E8=B0=83?= =?UTF-8?q?=E6=95=B4=20WhenCondition=20=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../builder/el/LiteFlowChainELBuilder.java | 3 +- .../builder/el/operator/AnyOperator.java | 3 +- .../builder/el/operator/MustOperator.java | 28 +++ .../liteflow/common/ChainConstant.java | 2 + .../liteflow/enums/ParallelStrategyEnum.java | 57 +++++ .../ParallelExecutorCreateException.java | 29 +++ .../flow/element/condition/WhenCondition.java | 174 +++------------- .../strategy/AllOfParallelExecutor.java | 31 +++ .../strategy/AnyOfParallelExecutor.java | 31 +++ .../strategy/ParallelStrategyExecutor.java | 194 ++++++++++++++++++ .../strategy/ParallelStrategyHelper.java | 79 +++++++ .../strategy/SpecifyParallelExecutor.java | 78 +++++++ .../asyncNode/AsyncNodeELSpringbootTest.java | 36 ++++ .../liteflow/test/asyncNode/cmp/KCmp.java | 27 +++ .../liteflow/test/asyncNode/cmp/LCmp.java | 15 ++ .../src/test/resources/asyncNode/flow.el.xml | 16 ++ 16 files changed, 652 insertions(+), 151 deletions(-) create mode 100644 liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/operator/MustOperator.java create mode 100644 liteflow-core/src/main/java/com/yomahub/liteflow/enums/ParallelStrategyEnum.java create mode 100644 liteflow-core/src/main/java/com/yomahub/liteflow/exception/ParallelExecutorCreateException.java create mode 100644 liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AllOfParallelExecutor.java create mode 100644 liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AnyOfParallelExecutor.java create mode 100644 liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java create mode 100644 liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyHelper.java create mode 100644 liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java create mode 100644 liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/KCmp.java create mode 100644 liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/LCmp.java diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/LiteFlowChainELBuilder.java b/liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/LiteFlowChainELBuilder.java index 97e3de3e5..a5e26cab9 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/LiteFlowChainELBuilder.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/LiteFlowChainELBuilder.java @@ -15,8 +15,8 @@ import com.yomahub.liteflow.exception.ELParseException; import com.yomahub.liteflow.exception.FlowSystemException; import com.yomahub.liteflow.flow.FlowBus; import com.yomahub.liteflow.flow.element.Chain; -import com.yomahub.liteflow.flow.element.Node; import com.yomahub.liteflow.flow.element.Condition; +import com.yomahub.liteflow.flow.element.Node; import com.yomahub.liteflow.log.LFLog; import com.yomahub.liteflow.log.LFLoggerManager; @@ -72,6 +72,7 @@ public class LiteFlowChainELBuilder { EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.DEFAULT, Object.class, new DefaultOperator()); EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.TAG, Object.class, new TagOperator()); EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.ANY, Object.class, new AnyOperator()); + EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.MUST, Object.class, new MustOperator()); EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.ID, Object.class, new IdOperator()); EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.IGNORE_ERROR, Object.class, new IgnoreErrorOperator()); EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.THREAD_POOL, Object.class, new ThreadPoolOperator()); diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/operator/AnyOperator.java b/liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/operator/AnyOperator.java index 144f3a275..4f326dd68 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/operator/AnyOperator.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/operator/AnyOperator.java @@ -2,6 +2,7 @@ package com.yomahub.liteflow.builder.el.operator; import com.yomahub.liteflow.builder.el.operator.base.BaseOperator; import com.yomahub.liteflow.builder.el.operator.base.OperatorHelper; +import com.yomahub.liteflow.enums.ParallelStrategyEnum; import com.yomahub.liteflow.flow.element.condition.WhenCondition; /** @@ -19,7 +20,7 @@ public class AnyOperator extends BaseOperator { WhenCondition whenCondition = OperatorHelper.convert(objects[0], WhenCondition.class); Boolean any = OperatorHelper.convert(objects[1], Boolean.class); - whenCondition.setAny(any); + whenCondition.setParallelStrategy(any ? ParallelStrategyEnum.ANY : ParallelStrategyEnum.ALL); return whenCondition; } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/operator/MustOperator.java b/liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/operator/MustOperator.java new file mode 100644 index 000000000..cb018c529 --- /dev/null +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/operator/MustOperator.java @@ -0,0 +1,28 @@ +package com.yomahub.liteflow.builder.el.operator; + +import com.yomahub.liteflow.builder.el.operator.base.BaseOperator; +import com.yomahub.liteflow.builder.el.operator.base.OperatorHelper; +import com.yomahub.liteflow.enums.ParallelStrategyEnum; +import com.yomahub.liteflow.flow.element.condition.WhenCondition; + +/** + * EL 规则中的 must 的操作符 + * + * @author luo yi + * @since 2.11.0 + */ +public class MustOperator extends BaseOperator { + + @Override + public WhenCondition build(Object[] objects) throws Exception { + OperatorHelper.checkObjectSizeEqTwo(objects); + + WhenCondition whenCondition = OperatorHelper.convert(objects[0], WhenCondition.class); + + String specifyId = OperatorHelper.convert(objects[1], String.class); + whenCondition.setSpecifyId(specifyId); + whenCondition.setParallelStrategy(ParallelStrategyEnum.SPECIFY); + return whenCondition; + } + +} diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/common/ChainConstant.java b/liteflow-core/src/main/java/com/yomahub/liteflow/common/ChainConstant.java index 7379f26ea..a097cdfe1 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/common/ChainConstant.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/common/ChainConstant.java @@ -30,6 +30,8 @@ public interface ChainConstant { String ANY = "any"; + String MUST = "must"; + String TYPE = "type"; String THEN = "THEN"; diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/enums/ParallelStrategyEnum.java b/liteflow-core/src/main/java/com/yomahub/liteflow/enums/ParallelStrategyEnum.java new file mode 100644 index 000000000..fc69b634e --- /dev/null +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/enums/ParallelStrategyEnum.java @@ -0,0 +1,57 @@ +package com.yomahub.liteflow.enums; + +import com.yomahub.liteflow.flow.parallel.strategy.AllOfParallelExecutor; +import com.yomahub.liteflow.flow.parallel.strategy.AnyOfParallelExecutor; +import com.yomahub.liteflow.flow.parallel.strategy.ParallelStrategyExecutor; +import com.yomahub.liteflow.flow.parallel.strategy.SpecifyParallelExecutor; + +/** + * 并行策略枚举类 + * + * @author luo yi + * @since 2.11.0 + */ +public enum ParallelStrategyEnum { + + ANY("anyOf", "完成任一任务", AnyOfParallelExecutor.class), + + ALL("allOf", "完成全部任务", AllOfParallelExecutor.class), + + SPECIFY("must", "完成指定任务", SpecifyParallelExecutor.class); + + private String strategyType; + + private String description; + + private Class clazz; + + ParallelStrategyEnum(String strategyType, String description, Class clazz) { + this.strategyType = strategyType; + this.description = description; + this.clazz = clazz; + } + + public String getStrategyType() { + return strategyType; + } + + public void setStrategyType(String strategyType) { + this.strategyType = strategyType; + } + + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + + public Class getClazz() { + return clazz; + } + + public void setClazz(Class clazz) { + this.clazz = clazz; + } +} diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/exception/ParallelExecutorCreateException.java b/liteflow-core/src/main/java/com/yomahub/liteflow/exception/ParallelExecutorCreateException.java new file mode 100644 index 000000000..68ea8a39a --- /dev/null +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/exception/ParallelExecutorCreateException.java @@ -0,0 +1,29 @@ +package com.yomahub.liteflow.exception; + +/** + * 并行策略执行器创建异常 + * + * @author luo yi + * @since 2.11.0 + */ +public class ParallelExecutorCreateException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + /** 异常信息 */ + private String message; + + public ParallelExecutorCreateException(String message) { + this.message = message; + } + + @Override + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + +} diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhenCondition.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhenCondition.java index 830aee652..4249617da 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhenCondition.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhenCondition.java @@ -7,28 +7,16 @@ */ package com.yomahub.liteflow.flow.element.condition; -import cn.hutool.core.util.ObjectUtil; -import cn.hutool.core.util.StrUtil; import com.yomahub.liteflow.common.LocalDefaultFlowConstant; import com.yomahub.liteflow.enums.ConditionTypeEnum; -import com.yomahub.liteflow.exception.WhenExecuteException; +import com.yomahub.liteflow.enums.ParallelStrategyEnum; import com.yomahub.liteflow.flow.element.Condition; -import com.yomahub.liteflow.flow.parallel.CompletableFutureTimeout; -import com.yomahub.liteflow.flow.parallel.ParallelSupplier; -import com.yomahub.liteflow.flow.parallel.WhenFutureObj; +import com.yomahub.liteflow.flow.parallel.strategy.ParallelStrategyExecutor; +import com.yomahub.liteflow.flow.parallel.strategy.ParallelStrategyHelper; import com.yomahub.liteflow.log.LFLog; import com.yomahub.liteflow.log.LFLoggerManager; -import com.yomahub.liteflow.property.LiteflowConfig; -import com.yomahub.liteflow.property.LiteflowConfigGetter; -import com.yomahub.liteflow.slot.DataBus; -import com.yomahub.liteflow.slot.Slot; -import com.yomahub.liteflow.thread.ExecutorHelper; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; + import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; /** * 并行器 @@ -46,8 +34,11 @@ public class WhenCondition extends Condition { // 此属性已弃用 private String group = LocalDefaultFlowConstant.DEFAULT; - // 只在when类型下有效,为true的话说明在多个并行节点下,任意一个成功,整个when就成功 - private boolean any = false; + // 当前 When 对应并行策略,默认为 ALL + private ParallelStrategyEnum parallelStrategy; + + // 只有 must 条件下,才会赋值 specifyId + private String specifyId; // when单独的线程池名称 private String threadExecutorClass; @@ -71,133 +62,10 @@ public class WhenCondition extends Condition { // 使用线程池执行when并发流程 // 这块涉及到挺多的多线程逻辑,所以注释比较详细,看到这里的童鞋可以仔细阅读 private void executeAsyncCondition(Integer slotIndex) throws Exception { - Slot slot = DataBus.getSlot(slotIndex); - - String currChainName = this.getCurrChainId(); - - // 此方法其实只会初始化一次Executor,不会每次都会初始化。Executor是唯一的 - ExecutorService parallelExecutor = ExecutorHelper.loadInstance() - .buildWhenExecutor(this.getThreadExecutorClass()); - - // 获得liteflow的参数 - LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); - - // 定义是否中断参数 - // 这里为什么要定义成数组呢,因为后面lambda要用到,根据final不能修改引用的原则,这里用了数组对象 - final boolean[] interrupted = { false }; - - // 这里主要是做了封装CompletableFuture对象,用lumbda表达式做了很多事情,这句代码要仔细理清 - // 1.先进行过滤,前置和后置组件过滤掉,因为在EL Chain处理的时候已经提出来了 - // 2.过滤isAccess为false的情况,因为不过滤这个的话,如果加上了any,那么isAccess为false那就是最快的了 - // 3.根据condition.getNodeList()的集合进行流处理,用map进行把executable对象转换成List> - // 4.在转的过程中,套入CompletableFutureTimeout方法进行超时判断,如果超时则用WhenFutureObj.timeOut返回超时的对象 - // 5.第2个参数是主要的本体CompletableFuture,传入了ParallelSupplier和线程池对象 - if (ObjectUtil.isNull(this.getMaxWaitTime())) { - if (ObjectUtil.isNotNull(liteflowConfig.getWhenMaxWaitSeconds())) { - // 获取全局异步线程最长等待秒数 - this.setMaxWaitTime(liteflowConfig.getWhenMaxWaitSeconds()); - this.setMaxWaitTimeUnit(TimeUnit.SECONDS); - } else { - // 获取全局异步线程最⻓的等待时间 - this.setMaxWaitTime(liteflowConfig.getWhenMaxWaitTime()); - } - } - - if (ObjectUtil.isNull(this.getMaxWaitTimeUnit())) { - // 获取全局异步线程最⻓的等待时间单位 - this.setMaxWaitTimeUnit(liteflowConfig.getWhenMaxWaitTimeUnit()); - } - - List> completableFutureList = this.getExecutableList() - .stream() - .filter(executable -> !(executable instanceof PreCondition) && !(executable instanceof FinallyCondition)) - .filter(executable -> { - try { - return executable.isAccess(slotIndex); - } catch (Exception e) { - LOG.error("there was an error when executing the when component isAccess", e); - return false; - } - }) - .map(executable -> CompletableFutureTimeout.completeOnTimeout( - WhenFutureObj.timeOut(executable.getId()), - CompletableFuture.supplyAsync(new ParallelSupplier(executable, currChainName, slotIndex), - parallelExecutor), - this.getMaxWaitTime(), this.getMaxWaitTimeUnit())) - .collect(Collectors.toList()); - - CompletableFuture resultCompletableFuture; - - // 这里判断执行方式 - // 如果any为false,说明这些异步任务全部执行好或者超时,才返回 - // 如果any为true,说明这些异步任务只要任意一个执行完成,就返回 - if (this.isAny()) { - // 把这些CompletableFuture通过anyOf合成一个CompletableFuture - resultCompletableFuture = CompletableFuture - .anyOf(completableFutureList.toArray(new CompletableFuture[] {})); - } else { - // 把这些CompletableFuture通过allOf合成一个CompletableFuture - resultCompletableFuture = CompletableFuture - .allOf(completableFutureList.toArray(new CompletableFuture[] {})); - } - - try { - // 进行执行,这句执行完后,就意味着所有的任务要么执行完毕,要么超时返回 - resultCompletableFuture.get(); - } catch (InterruptedException | ExecutionException e) { - LOG.error("there was an error when executing the CompletableFuture", e); - interrupted[0] = true; - } - - // 拿到已经完成的CompletableFuture - // 如果any为false,那么所有任务都已经完成 - // 如果any为true,那么这里拿到的是第一个完成的任务 - // 这里过滤和转换一起用lumbda做了 - List allCompletableWhenFutureObjList = completableFutureList.stream().filter(f -> { - // 过滤出已经完成的,没完成的就直接终止 - if (f.isDone()) { - return true; - } else { - f.cancel(true); - return false; - } - }).map(f -> { - try { - return f.get(); - } catch (InterruptedException | ExecutionException e) { - interrupted[0] = true; - return null; - } - }).collect(Collectors.toList()); - - // 判断超时,上面已经拿到了所有已经完成的CompletableFuture - // 那我们只要过滤出超时的CompletableFuture - List timeOutWhenFutureObjList = allCompletableWhenFutureObjList.stream() - .filter(WhenFutureObj::isTimeout) - .collect(Collectors.toList()); - - // 输出超时信息 - timeOutWhenFutureObjList.forEach(whenFutureObj -> LOG.warn( - "executing thread has reached max-wait-seconds, thread canceled.Execute-item: [{}]", whenFutureObj.getExecutorName())); - - // 当配置了ignoreError = false,出现interrupted或者!f.get()的情况,将抛出WhenExecuteException - if (!this.isIgnoreError()) { - if (interrupted[0]) { - throw new WhenExecuteException(StrUtil - .format("requestId [{}] when execute interrupted. errorResume [false].", slot.getRequestId())); - } - - // 循环判断CompletableFuture的返回值,如果异步执行失败,则抛出相应的业务异常 - for (WhenFutureObj whenFutureObj : allCompletableWhenFutureObjList) { - if (!whenFutureObj.isSuccess()) { - LOG.info(StrUtil.format("when-executor[{}] execute failed. errorResume [false].", whenFutureObj.getExecutorName())); - throw whenFutureObj.getEx(); - } - } - } else if (interrupted[0]) { - // 这里由于配置了ignoreError,所以只打印warn日志 - LOG.warn("executing when condition timeout , but ignore with errorResume."); - } + // 获取并发执行策略 + ParallelStrategyExecutor parallelStrategyExecutor = ParallelStrategyHelper.loadInstance().buildParallelExecutor(this.getParallelStrategy()); + // 执行逻辑 + parallelStrategyExecutor.execute(this, slotIndex); } public boolean isIgnoreError() { @@ -216,12 +84,20 @@ public class WhenCondition extends Condition { this.group = group; } - public boolean isAny() { - return any; + public ParallelStrategyEnum getParallelStrategy() { + return parallelStrategy; } - public void setAny(boolean any) { - this.any = any; + public void setParallelStrategy(ParallelStrategyEnum parallelStrategy) { + this.parallelStrategy = parallelStrategy; + } + + public String getSpecifyId() { + return specifyId; + } + + public void setSpecifyId(String specifyId) { + this.specifyId = specifyId; } public String getThreadExecutorClass() { diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AllOfParallelExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AllOfParallelExecutor.java new file mode 100644 index 000000000..c3d6de2a4 --- /dev/null +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AllOfParallelExecutor.java @@ -0,0 +1,31 @@ +package com.yomahub.liteflow.flow.parallel.strategy; + +import com.yomahub.liteflow.flow.element.condition.WhenCondition; +import com.yomahub.liteflow.flow.parallel.WhenFutureObj; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * 完成全部任务 + * + * @author luo yi + * @since 2.11.0 + */ +public class AllOfParallelExecutor extends ParallelStrategyExecutor { + + @Override + public void execute(WhenCondition whenCondition, Integer slotIndex) throws Exception { + + // 获取所有 CompletableFuture + List> completableFutureList = this.getCompletableFutureList(whenCondition, slotIndex); + + // 把这些 CompletableFuture 通过 anyOf 合成一个 CompletableFuture + CompletableFuture resultCompletableFuture = CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[] {})); + + // 结果处理 + this.handleResult(whenCondition, slotIndex, completableFutureList, resultCompletableFuture); + + } + +} diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AnyOfParallelExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AnyOfParallelExecutor.java new file mode 100644 index 000000000..2115db84e --- /dev/null +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AnyOfParallelExecutor.java @@ -0,0 +1,31 @@ +package com.yomahub.liteflow.flow.parallel.strategy; + +import com.yomahub.liteflow.flow.element.condition.WhenCondition; +import com.yomahub.liteflow.flow.parallel.WhenFutureObj; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * 完成任一任务 + * + * @author luo yi + * @since 2.11.0 + */ +public class AnyOfParallelExecutor extends ParallelStrategyExecutor { + + @Override + public void execute(WhenCondition whenCondition, Integer slotIndex) throws Exception { + + // 获取所有 CompletableFuture + List> completableFutureList = this.getCompletableFutureList(whenCondition, slotIndex); + + // 把这些 CompletableFuture 通过 anyOf 合成一个 CompletableFuture + CompletableFuture resultCompletableFuture = CompletableFuture.anyOf(completableFutureList.toArray(new CompletableFuture[] {})); + + // 结果处理 + this.handleResult(whenCondition, slotIndex, completableFutureList, resultCompletableFuture); + + } + +} diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java new file mode 100644 index 000000000..56f8c9ba9 --- /dev/null +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java @@ -0,0 +1,194 @@ +package com.yomahub.liteflow.flow.parallel.strategy; + +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; +import com.yomahub.liteflow.exception.WhenExecuteException; +import com.yomahub.liteflow.flow.element.Executable; +import com.yomahub.liteflow.flow.element.condition.FinallyCondition; +import com.yomahub.liteflow.flow.element.condition.PreCondition; +import com.yomahub.liteflow.flow.element.condition.WhenCondition; +import com.yomahub.liteflow.flow.parallel.CompletableFutureTimeout; +import com.yomahub.liteflow.flow.parallel.ParallelSupplier; +import com.yomahub.liteflow.flow.parallel.WhenFutureObj; +import com.yomahub.liteflow.log.LFLog; +import com.yomahub.liteflow.log.LFLoggerManager; +import com.yomahub.liteflow.property.LiteflowConfig; +import com.yomahub.liteflow.property.LiteflowConfigGetter; +import com.yomahub.liteflow.slot.DataBus; +import com.yomahub.liteflow.slot.Slot; +import com.yomahub.liteflow.thread.ExecutorHelper; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * 并发策略执行器抽象类 + * + * @author luo yi + * @since 2.11.0 + */ +public abstract class ParallelStrategyExecutor { + + protected final LFLog LOG = LFLoggerManager.getLogger(this.getClass()); + + /** + * 封装 CompletableFuture 对象 + * @param executable + * @param parallelExecutor + * @param whenCondition + * @param currChainName + * @param slotIndex + * @return + */ + protected CompletableFuture wrappedFutureObj(Executable executable, ExecutorService parallelExecutor, + WhenCondition whenCondition, String currChainName, Integer slotIndex) { + return CompletableFutureTimeout.completeOnTimeout( + WhenFutureObj.timeOut(executable.getId()), + CompletableFuture.supplyAsync(new ParallelSupplier(executable, currChainName, slotIndex), parallelExecutor), + whenCondition.getMaxWaitTime(), + whenCondition.getMaxWaitTimeUnit()); + } + + /** + * 设置 WhenCondition 参数 + * @param whenCondition + */ + protected void setWhenConditionParams(WhenCondition whenCondition) { + // 获得liteflow的参数 + LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); + if (ObjectUtil.isNull(whenCondition.getMaxWaitTime())) { + if (ObjectUtil.isNotNull(liteflowConfig.getWhenMaxWaitSeconds())) { + // 获取全局异步线程最长等待秒数 + whenCondition.setMaxWaitTime(liteflowConfig.getWhenMaxWaitSeconds()); + whenCondition.setMaxWaitTimeUnit(TimeUnit.SECONDS); + } else { + // 获取全局异步线程最⻓的等待时间 + whenCondition.setMaxWaitTime(liteflowConfig.getWhenMaxWaitTime()); + } + } + + if (ObjectUtil.isNull(whenCondition.getMaxWaitTimeUnit())) { + // 获取全局异步线程最⻓的等待时间单位 + whenCondition.setMaxWaitTimeUnit(liteflowConfig.getWhenMaxWaitTimeUnit()); + } + } + + /** + * 获取所有任务 + * @param whenCondition + * @param slotIndex + * @return + */ + protected List> getCompletableFutureList(WhenCondition whenCondition, Integer slotIndex) { + String currChainName = whenCondition.getCurrChainId(); + + // 此方法其实只会初始化一次Executor,不会每次都会初始化。Executor是唯一的 + ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutor(whenCondition.getThreadExecutorClass()); + + // 设置参数 + setWhenConditionParams(whenCondition); + + // 这里主要是做了封装CompletableFuture对象,用lumbda表达式做了很多事情,这句代码要仔细理清 + // 1.先进行过滤,前置和后置组件过滤掉,因为在EL Chain处理的时候已经提出来了 + // 2.过滤isAccess为false的情况,因为不过滤这个的话,如果加上了any,那么isAccess为false那就是最快的了 + // 3.根据condition.getNodeList()的集合进行流处理,用map进行把executable对象转换成List> + // 4.在转的过程中,套入CompletableFutureTimeout方法进行超时判断,如果超时则用WhenFutureObj.timeOut返回超时的对象 + // 5.第2个参数是主要的本体CompletableFuture,传入了ParallelSupplier和线程池对象 + List> completableFutureList = whenCondition.getExecutableList() + .stream() + .filter(executable -> !(executable instanceof PreCondition) && !(executable instanceof FinallyCondition)) + .filter(executable -> { + try { + return executable.isAccess(slotIndex); + } catch (Exception e) { + LOG.error("there was an error when executing the when component isAccess", e); + return false; + } + }) + .map(executable -> wrappedFutureObj(executable, parallelExecutor, whenCondition, currChainName, slotIndex)) + .collect(Collectors.toList()); + + return completableFutureList; + } + + /** + * 结果处理 + * @param whenCondition + * @param slotIndex + * @param completableFutureList + * @param resultCompletableFuture + * @throws Exception + */ + protected void handleResult(WhenCondition whenCondition, Integer slotIndex, List> completableFutureList, CompletableFuture resultCompletableFuture) throws Exception { + Slot slot = DataBus.getSlot(slotIndex); + + // 定义是否中断参数 + // 这里为什么要定义成数组呢,因为后面lambda要用到,根据final不能修改引用的原则,这里用了数组对象 + final boolean[] interrupted = { false }; + + try { + // 进行执行,这句执行完后,就意味着所有的任务要么执行完毕,要么超时返回 + resultCompletableFuture.get(); + } catch (InterruptedException | ExecutionException e) { + LOG.error("there was an error when executing the CompletableFuture", e); + interrupted[0] = true; + } + + // 拿到已经完成的CompletableFuture + // 如果any为false,那么所有任务都已经完成 + // 如果any为true,那么这里拿到的是第一个完成的任务 + // 这里过滤和转换一起用lumbda做了 + List allCompletableWhenFutureObjList = completableFutureList.stream().filter(f -> { + // 过滤出已经完成的,没完成的就直接终止 + if (f.isDone()) { + return true; + } else { + f.cancel(true); + return false; + } + }).map(f -> { + try { + return f.get(); + } catch (InterruptedException | ExecutionException e) { + interrupted[0] = true; + return null; + } + }).collect(Collectors.toList()); + + // 判断超时,上面已经拿到了所有已经完成的CompletableFuture + // 那我们只要过滤出超时的CompletableFuture + List timeOutWhenFutureObjList = allCompletableWhenFutureObjList.stream() + .filter(WhenFutureObj::isTimeout) + .collect(Collectors.toList()); + + // 输出超时信息 + timeOutWhenFutureObjList.forEach(whenFutureObj -> LOG.warn( + "executing thread has reached max-wait-seconds, thread canceled.Execute-item: [{}]", whenFutureObj.getExecutorName())); + + // 当配置了ignoreError = false,出现interrupted或者!f.get()的情况,将抛出WhenExecuteException + if (!whenCondition.isIgnoreError()) { + if (interrupted[0]) { + throw new WhenExecuteException(StrUtil + .format("requestId [{}] when execute interrupted. errorResume [false].", slot.getRequestId())); + } + + // 循环判断CompletableFuture的返回值,如果异步执行失败,则抛出相应的业务异常 + for (WhenFutureObj whenFutureObj : allCompletableWhenFutureObjList) { + if (!whenFutureObj.isSuccess()) { + LOG.info(StrUtil.format("when-executor[{}] execute failed. errorResume [false].", whenFutureObj.getExecutorName())); + throw whenFutureObj.getEx(); + } + } + } else if (interrupted[0]) { + // 这里由于配置了ignoreError,所以只打印warn日志 + LOG.warn("executing when condition timeout , but ignore with errorResume."); + } + } + + public abstract void execute(WhenCondition whenCondition, Integer slotIndex) throws Exception; + +} diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyHelper.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyHelper.java new file mode 100644 index 000000000..250fccfb5 --- /dev/null +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyHelper.java @@ -0,0 +1,79 @@ +package com.yomahub.liteflow.flow.parallel.strategy; + +import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.ObjUtil; +import com.yomahub.liteflow.enums.ParallelStrategyEnum; +import com.yomahub.liteflow.exception.ParallelExecutorCreateException; +import com.yomahub.liteflow.log.LFLog; +import com.yomahub.liteflow.log.LFLoggerManager; +import com.yomahub.liteflow.spi.holder.ContextAwareHolder; + +import java.util.Map; + +/** + * WHEN 并发策略辅助 + * + * @author luo yi + * @since 2.11.0 + */ +public class ParallelStrategyHelper { + + private final LFLog LOG = LFLoggerManager.getLogger(ParallelStrategyHelper.class); + + /** + * 此处使用Map缓存线程池信息 key - 线程池构建者的Class全类名 value - 线程池对象 + */ + private final Map strategyExecutorMap; + + private ParallelStrategyHelper() { + strategyExecutorMap = MapUtil.newConcurrentHashMap(); + } + + /** + * 使用静态内部类实现单例模式 + */ + private static class Holder { + + static final ParallelStrategyHelper INSTANCE = new ParallelStrategyHelper(); + + } + + public static ParallelStrategyHelper loadInstance() { + return ParallelStrategyHelper.Holder.INSTANCE; + } + + private ParallelStrategyExecutor getParallelStrategyExecutor(ParallelStrategyEnum parallelStrategyEnum) { + try { + ParallelStrategyExecutor strategyExecutor = strategyExecutorMap.get(parallelStrategyEnum); + if (ObjUtil.isNotNull(strategyExecutor)) return strategyExecutor; + + Class executorClass = (Class) Class.forName(parallelStrategyEnum.getClazz().getName()); + strategyExecutor = ContextAwareHolder.loadContextAware().registerBean(executorClass); + strategyExecutorMap.put(parallelStrategyEnum, strategyExecutor); + return strategyExecutor; + } catch (Exception e) { + LOG.error(e.getMessage()); + throw new ParallelExecutorCreateException(e.getMessage()); + } + } + + public ParallelStrategyExecutor buildParallelExecutor(ParallelStrategyEnum parallelStrategyEnum) { + if (ObjUtil.isNull(parallelStrategyEnum)) return buildParallelExecutor(); + return getParallelStrategyExecutor(parallelStrategyEnum); + } + + /** + * 默认需完成所有任务 + * @return + */ + public ParallelStrategyExecutor buildParallelExecutor() { + return buildParallelExecutor(ParallelStrategyEnum.ALL); + } + + public void clearStrategyExecutorMap() { + if (MapUtil.isNotEmpty(strategyExecutorMap)) { + strategyExecutorMap.clear(); + } + } + +} diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java new file mode 100644 index 000000000..f763eb7d6 --- /dev/null +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java @@ -0,0 +1,78 @@ +package com.yomahub.liteflow.flow.parallel.strategy; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.ObjUtil; +import com.yomahub.liteflow.flow.element.Executable; +import com.yomahub.liteflow.flow.element.condition.FinallyCondition; +import com.yomahub.liteflow.flow.element.condition.PreCondition; +import com.yomahub.liteflow.flow.element.condition.WhenCondition; +import com.yomahub.liteflow.flow.parallel.WhenFutureObj; +import com.yomahub.liteflow.thread.ExecutorHelper; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; + +/** + * 完成指定任务执行器,使用 ID 进行比较 + * + * @author luo yi + * @since 2.11.0 + */ +public class SpecifyParallelExecutor extends ParallelStrategyExecutor { + + @Override + public void execute(WhenCondition whenCondition, Integer slotIndex) throws Exception { + + String currChainName = whenCondition.getCurrChainId(); + + this.setWhenConditionParams(whenCondition); + + // 此方法其实只会初始化一次Executor,不会每次都会初始化。Executor是唯一的 + ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutor(whenCondition.getThreadExecutorClass()); + + // 过滤指定 ID 的任务,且该任务只会有一个或者没有 + Map> specifyExecutableMap = whenCondition.getExecutableList() + .stream() + .filter(executable -> !(executable instanceof PreCondition) && !(executable instanceof FinallyCondition)) + .filter(executable -> { + try { + return executable.isAccess(slotIndex); + } catch (Exception e) { + LOG.error("there was an error when executing the when component isAccess", e); + return false; + } + }) + .collect(Collectors.partitioningBy(executable -> whenCondition.getSpecifyId().equals(executable.getId()))); + + CompletableFuture resultCompletableFuture = null; + + // 处理非指定 task + List> completableFutureList = specifyExecutableMap.get(Boolean.FALSE) + .stream() + .map(executable -> wrappedFutureObj(executable, parallelExecutor, whenCondition, currChainName, slotIndex)) + .collect(Collectors.toList()); + + if (specifyExecutableMap.containsKey(Boolean.TRUE) && CollUtil.isNotEmpty(specifyExecutableMap.get(Boolean.TRUE))) { + // 存在 must 指定的 task + CompletableFuture specifyCompletableFuture = wrappedFutureObj(specifyExecutableMap.get(Boolean.TRUE).get(0), parallelExecutor, whenCondition, currChainName, slotIndex); + // 组合所有任务 + completableFutureList.add(specifyCompletableFuture); + // 设置结果 future + resultCompletableFuture = specifyCompletableFuture; + } + + if (ObjUtil.isNull(resultCompletableFuture)) { + LOG.warn("The specified task[{}] was not found, waiting for all tasks to complete by default.", whenCondition.getSpecifyId()); + // 不存在指定任务,则所有任务都执行 + resultCompletableFuture = CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[] {})); + } + + // 结果处理 + this.handleResult(whenCondition, slotIndex, completableFutureList, resultCompletableFuture); + + } + +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/AsyncNodeELSpringbootTest.java b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/AsyncNodeELSpringbootTest.java index 30703e603..99f9e1d97 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/AsyncNodeELSpringbootTest.java +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/AsyncNodeELSpringbootTest.java @@ -139,4 +139,40 @@ public class AsyncNodeELSpringbootTest extends BaseTest { Assertions.assertTrue(context.getData("check").toString().startsWith("habc")); } + // 测试 must 关键字 + @Test + public void testAsyncFlow9() throws Exception { + LiteflowResponse response = flowExecutor.execute2Resp("chain9", "it's a base request"); + DefaultContext context = response.getFirstContextBean(); + Assertions.assertTrue(response.isSuccess()); + Assertions.assertTrue(context.getData("check").toString().startsWith("habc")); + } + + // 测试 must 与 ignoreError 关键字,不忽略异常 + @Test + public void testAsyncFlow10() throws Exception { + LiteflowResponse response = flowExecutor.execute2Resp("chain10", "it's a base request"); + DefaultContext context = response.getFirstContextBean(); + Assertions.assertTrue(context.getData("check").toString().startsWith("kg")); + Assertions.assertFalse(response.isSuccess()); + } + + // 测试 must 与 ignoreError 关键字,忽略异常 + @Test + public void testAsyncFlow11() throws Exception { + LiteflowResponse response = flowExecutor.execute2Resp("chain11", "it's a base request"); + DefaultContext context = response.getFirstContextBean(); + Assertions.assertTrue(response.isSuccess()); + Assertions.assertTrue(context.getData("check").toString().startsWith("kgdabc")); + } + + // 测试 must 、 ignoreError 、 id 关键字 + @Test + public void testAsyncFlow12() throws Exception { + LiteflowResponse response = flowExecutor.execute2Resp("chain12", "it's a base request"); + DefaultContext context = response.getFirstContextBean(); + Assertions.assertTrue(response.isSuccess()); + Assertions.assertTrue(context.getData("check").toString().startsWith("akbc")); + } + } diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/KCmp.java b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/KCmp.java new file mode 100644 index 000000000..f4b27d714 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/KCmp.java @@ -0,0 +1,27 @@ +package com.yomahub.liteflow.test.asyncNode.cmp; + +import com.yomahub.liteflow.core.NodeComponent; +import com.yomahub.liteflow.slot.DefaultContext; +import org.springframework.stereotype.Component; + +@Component("k") +public class KCmp extends NodeComponent { + + @Override + public void process() throws Exception { + Thread.sleep(200); + DefaultContext context = this.getFirstContextBean(); + synchronized (NodeComponent.class) { + if (context.hasData("check")) { + String str = context.getData("check"); + str += this.getNodeId(); + context.setData("check", str); + } + else { + context.setData("check", this.getNodeId()); + } + } + System.out.println("Kcomp executed!"); + } + +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/LCmp.java b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/LCmp.java new file mode 100644 index 000000000..87e08565c --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/LCmp.java @@ -0,0 +1,15 @@ +package com.yomahub.liteflow.test.asyncNode.cmp; + +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +@Component("l") +public class LCmp extends NodeComponent { + + @Override + public void process() throws Exception { + System.out.println("Lcomp executed! Throw exception"); + int i = 1/0; + } + +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/asyncNode/flow.el.xml b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/asyncNode/flow.el.xml index 06373bb2f..e48bf988e 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/asyncNode/flow.el.xml +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/asyncNode/flow.el.xml @@ -46,4 +46,20 @@ THEN(WHEN(d, g, h).any(true), THEN(a, b, c)); + + THEN(WHEN(d, g, h).must("h"), THEN(a, b, c)); + + + + THEN(WHEN(d, g, k, l).must("g").ignoreError(false), THEN(a, b, c)); + + + + THEN(WHEN(d, g, k, l).ignoreError(true).must("d"), THEN(a, b, c)); + + + + THEN(WHEN(d, g, l, a, THEN(k, b).id("z")).ignoreError(true).must("z"), c); + + \ No newline at end of file From 6eee9ce492d273cd0d8bcfcaef44d47b8bda78b8 Mon Sep 17 00:00:00 2001 From: luoyi <972849752@qq.com> Date: Thu, 14 Sep 2023 21:53:07 +0800 Subject: [PATCH 2/4] =?UTF-8?q?enhancement=20#I7XAIB=20=E8=B0=83=E6=95=B4?= =?UTF-8?q?=E5=8F=82=E6=95=B0=E5=91=BD=E5=90=8D=EF=BC=8C=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E6=B3=A8=E9=87=8A=E8=AF=B4=E6=98=8E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../liteflow/enums/ParallelStrategyEnum.java | 2 +- .../flow/element/condition/WhenCondition.java | 8 ++- .../strategy/AllOfParallelExecutor.java | 10 +-- .../strategy/AnyOfParallelExecutor.java | 10 +-- .../strategy/ParallelStrategyExecutor.java | 63 ++++++++++--------- .../strategy/SpecifyParallelExecutor.java | 25 ++++---- 6 files changed, 64 insertions(+), 54 deletions(-) diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/enums/ParallelStrategyEnum.java b/liteflow-core/src/main/java/com/yomahub/liteflow/enums/ParallelStrategyEnum.java index fc69b634e..09c9af437 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/enums/ParallelStrategyEnum.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/enums/ParallelStrategyEnum.java @@ -17,7 +17,7 @@ public enum ParallelStrategyEnum { ALL("allOf", "完成全部任务", AllOfParallelExecutor.class), - SPECIFY("must", "完成指定任务", SpecifyParallelExecutor.class); + SPECIFY("must", "完成指定 ID 任务", SpecifyParallelExecutor.class); private String strategyType; diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhenCondition.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhenCondition.java index f4d3df662..7422a495e 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhenCondition.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhenCondition.java @@ -15,6 +15,7 @@ import com.yomahub.liteflow.flow.parallel.strategy.ParallelStrategyExecutor; import com.yomahub.liteflow.flow.parallel.strategy.ParallelStrategyHelper; import com.yomahub.liteflow.log.LFLog; import com.yomahub.liteflow.log.LFLoggerManager; +import com.yomahub.liteflow.thread.ExecutorHelper; import java.util.concurrent.TimeUnit; @@ -59,13 +60,16 @@ public class WhenCondition extends Condition { return ConditionTypeEnum.TYPE_WHEN; } - // 使用线程池执行when并发流程 + // 使用线程池执行 when 并发流程 // 这块涉及到挺多的多线程逻辑,所以注释比较详细,看到这里的童鞋可以仔细阅读 private void executeAsyncCondition(Integer slotIndex) throws Exception { + // 获取并发执行策略 ParallelStrategyExecutor parallelStrategyExecutor = ParallelStrategyHelper.loadInstance().buildParallelExecutor(this.getParallelStrategy()); - // 执行逻辑 + + // 执行并发逻辑 parallelStrategyExecutor.execute(this, slotIndex); + } public boolean isIgnoreError() { diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AllOfParallelExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AllOfParallelExecutor.java index c3d6de2a4..aeb20bca8 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AllOfParallelExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AllOfParallelExecutor.java @@ -17,14 +17,14 @@ public class AllOfParallelExecutor extends ParallelStrategyExecutor { @Override public void execute(WhenCondition whenCondition, Integer slotIndex) throws Exception { - // 获取所有 CompletableFuture - List> completableFutureList = this.getCompletableFutureList(whenCondition, slotIndex); + // 获取所有 CompletableFuture 任务 + List> allTaskList = this.getAllTaskList(whenCondition, slotIndex); - // 把这些 CompletableFuture 通过 anyOf 合成一个 CompletableFuture - CompletableFuture resultCompletableFuture = CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[] {})); + // 把这些 CompletableFuture 通过 allOf 合成一个 CompletableFuture,表明完成所有任务 + CompletableFuture specifyTask = CompletableFuture.allOf(allTaskList.toArray(new CompletableFuture[] {})); // 结果处理 - this.handleResult(whenCondition, slotIndex, completableFutureList, resultCompletableFuture); + this.handleTaskResult(whenCondition, slotIndex, allTaskList, specifyTask); } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AnyOfParallelExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AnyOfParallelExecutor.java index 2115db84e..dce311fda 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AnyOfParallelExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AnyOfParallelExecutor.java @@ -17,14 +17,14 @@ public class AnyOfParallelExecutor extends ParallelStrategyExecutor { @Override public void execute(WhenCondition whenCondition, Integer slotIndex) throws Exception { - // 获取所有 CompletableFuture - List> completableFutureList = this.getCompletableFutureList(whenCondition, slotIndex); + // 获取所有 CompletableFuture 任务 + List> allTaskList = this.getAllTaskList(whenCondition, slotIndex); - // 把这些 CompletableFuture 通过 anyOf 合成一个 CompletableFuture - CompletableFuture resultCompletableFuture = CompletableFuture.anyOf(completableFutureList.toArray(new CompletableFuture[] {})); + // 把这些 CompletableFuture 通过 anyOf 合成一个 CompletableFuture,表明完成任一任务 + CompletableFuture specifyTask = CompletableFuture.anyOf(allTaskList.toArray(new CompletableFuture[] {})); // 结果处理 - this.handleResult(whenCondition, slotIndex, completableFutureList, resultCompletableFuture); + this.handleTaskResult(whenCondition, slotIndex, allTaskList, specifyTask); } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java index 56f8c9ba9..85524f28c 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java @@ -2,6 +2,7 @@ package com.yomahub.liteflow.flow.parallel.strategy; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; +import com.yomahub.liteflow.enums.ParallelStrategyEnum; import com.yomahub.liteflow.exception.WhenExecuteException; import com.yomahub.liteflow.flow.element.Executable; import com.yomahub.liteflow.flow.element.condition.FinallyCondition; @@ -46,6 +47,8 @@ public abstract class ParallelStrategyExecutor { */ protected CompletableFuture wrappedFutureObj(Executable executable, ExecutorService parallelExecutor, WhenCondition whenCondition, String currChainName, Integer slotIndex) { + // 套入 CompletableFutureTimeout 方法进行超时判断,如果超时则用 WhenFutureObj.timeOut 返回超时的对象 + // 第 2 个参数是主要的本体 CompletableFuture,传入了 ParallelSupplier 和线程池对象 return CompletableFutureTimeout.completeOnTimeout( WhenFutureObj.timeOut(executable.getId()), CompletableFuture.supplyAsync(new ParallelSupplier(executable, currChainName, slotIndex), parallelExecutor), @@ -58,7 +61,7 @@ public abstract class ParallelStrategyExecutor { * @param whenCondition */ protected void setWhenConditionParams(WhenCondition whenCondition) { - // 获得liteflow的参数 + // 获得 liteflow 的参数 LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); if (ObjectUtil.isNull(whenCondition.getMaxWaitTime())) { if (ObjectUtil.isNotNull(liteflowConfig.getWhenMaxWaitSeconds())) { @@ -78,26 +81,25 @@ public abstract class ParallelStrategyExecutor { } /** - * 获取所有任务 + * 获取所有任务 CompletableFuture 集合 * @param whenCondition * @param slotIndex * @return */ - protected List> getCompletableFutureList(WhenCondition whenCondition, Integer slotIndex) { + protected List> getAllTaskList(WhenCondition whenCondition, Integer slotIndex) { + String currChainName = whenCondition.getCurrChainId(); - // 此方法其实只会初始化一次Executor,不会每次都会初始化。Executor是唯一的 + // 此方法其实只会初始化一次 Executor,不会每次都会初始化。Executor是唯一的 ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutor(whenCondition.getThreadExecutorClass()); - // 设置参数 + // 设置 whenCondition 参数 setWhenConditionParams(whenCondition); - // 这里主要是做了封装CompletableFuture对象,用lumbda表达式做了很多事情,这句代码要仔细理清 - // 1.先进行过滤,前置和后置组件过滤掉,因为在EL Chain处理的时候已经提出来了 - // 2.过滤isAccess为false的情况,因为不过滤这个的话,如果加上了any,那么isAccess为false那就是最快的了 - // 3.根据condition.getNodeList()的集合进行流处理,用map进行把executable对象转换成List> - // 4.在转的过程中,套入CompletableFutureTimeout方法进行超时判断,如果超时则用WhenFutureObj.timeOut返回超时的对象 - // 5.第2个参数是主要的本体CompletableFuture,传入了ParallelSupplier和线程池对象 + // 这里主要是做了封装 CompletableFuture 对象,用 lumbda 表达式做了很多事情,这句代码要仔细理清 + // 1.先进行过滤,前置和后置组件过滤掉,因为在 EL Chain 处理的时候已经提出来了 + // 2.过滤 isAccess 为 false 的情况,因为不过滤这个的话,如果加上了 any,那么 isAccess 为 false 那就是最快的了 + // 3.根据 condition.getNodeList() 的集合进行流处理,用 map 进行把 executable 对象转换成 List> List> completableFutureList = whenCondition.getExecutableList() .stream() .filter(executable -> !(executable instanceof PreCondition) && !(executable instanceof FinallyCondition)) @@ -116,33 +118,36 @@ public abstract class ParallelStrategyExecutor { } /** - * 结果处理 - * @param whenCondition - * @param slotIndex - * @param completableFutureList - * @param resultCompletableFuture + * 任务结果处理 + * @param whenCondition 并行组件对象 + * @param slotIndex 当前 slot 的 index + * @param allTaskList 并行组件中所有任务列表 + * @param specifyTask 指定预先完成的任务,详见 {@link ParallelStrategyEnum} * @throws Exception */ - protected void handleResult(WhenCondition whenCondition, Integer slotIndex, List> completableFutureList, CompletableFuture resultCompletableFuture) throws Exception { + protected void handleTaskResult(WhenCondition whenCondition, Integer slotIndex, List> allTaskList, + CompletableFuture specifyTask) throws Exception { + Slot slot = DataBus.getSlot(slotIndex); // 定义是否中断参数 - // 这里为什么要定义成数组呢,因为后面lambda要用到,根据final不能修改引用的原则,这里用了数组对象 + // 这里为什么要定义成数组呢,因为后面 lambda 要用到,根据 final 不能修改引用的原则,这里用了数组对象 final boolean[] interrupted = { false }; try { - // 进行执行,这句执行完后,就意味着所有的任务要么执行完毕,要么超时返回 - resultCompletableFuture.get(); + // 进行执行,这句执行完后有三种可能,所有任务执行完成、任一任务执行完成、指定的任务执行完成 + specifyTask.get(); } catch (InterruptedException | ExecutionException e) { LOG.error("there was an error when executing the CompletableFuture", e); interrupted[0] = true; } - // 拿到已经完成的CompletableFuture - // 如果any为false,那么所有任务都已经完成 - // 如果any为true,那么这里拿到的是第一个完成的任务 - // 这里过滤和转换一起用lumbda做了 - List allCompletableWhenFutureObjList = completableFutureList.stream().filter(f -> { + // 拿到已经完成的 CompletableFuture 对象 + // 如果 any 为 false,那么所有任务都已经完成 + // 如果 any 为 true,那么这里拿到的是第一个完成的任务 + // 如果为 must,那么这里获取到的就是指定的任务 + // 这里过滤和转换一起用 lambda 做了 + List allCompletableWhenFutureObjList = allTaskList.stream().filter(f -> { // 过滤出已经完成的,没完成的就直接终止 if (f.isDone()) { return true; @@ -159,8 +164,8 @@ public abstract class ParallelStrategyExecutor { } }).collect(Collectors.toList()); - // 判断超时,上面已经拿到了所有已经完成的CompletableFuture - // 那我们只要过滤出超时的CompletableFuture + // 判断超时,上面已经拿到了所有已经完成的 CompletableFuture + // 那我们只要过滤出超时的 CompletableFuture List timeOutWhenFutureObjList = allCompletableWhenFutureObjList.stream() .filter(WhenFutureObj::isTimeout) .collect(Collectors.toList()); @@ -169,7 +174,7 @@ public abstract class ParallelStrategyExecutor { timeOutWhenFutureObjList.forEach(whenFutureObj -> LOG.warn( "executing thread has reached max-wait-seconds, thread canceled.Execute-item: [{}]", whenFutureObj.getExecutorName())); - // 当配置了ignoreError = false,出现interrupted或者!f.get()的情况,将抛出WhenExecuteException + // 当配置了 ignoreError = false,出现 interrupted 或者 !f.get() 的情况,将抛出 WhenExecuteException if (!whenCondition.isIgnoreError()) { if (interrupted[0]) { throw new WhenExecuteException(StrUtil @@ -184,7 +189,7 @@ public abstract class ParallelStrategyExecutor { } } } else if (interrupted[0]) { - // 这里由于配置了ignoreError,所以只打印warn日志 + // 这里由于配置了 ignoreError,所以只打印 warn 日志 LOG.warn("executing when condition timeout , but ignore with errorResume."); } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java index f763eb7d6..4ade6aa52 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java @@ -28,9 +28,10 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor { String currChainName = whenCondition.getCurrChainId(); + // 设置 whenCondition 参数 this.setWhenConditionParams(whenCondition); - // 此方法其实只会初始化一次Executor,不会每次都会初始化。Executor是唯一的 + // 此方法其实只会初始化一次Executor,不会每次都会初始化。Executor 是唯一的 ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutor(whenCondition.getThreadExecutorClass()); // 过滤指定 ID 的任务,且该任务只会有一个或者没有 @@ -47,31 +48,31 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor { }) .collect(Collectors.partitioningBy(executable -> whenCondition.getSpecifyId().equals(executable.getId()))); - CompletableFuture resultCompletableFuture = null; + CompletableFuture specifyTask = null; - // 处理非指定 task - List> completableFutureList = specifyExecutableMap.get(Boolean.FALSE) + // 处理非指定 task,封装成 CompletableFuture 对象,最终仍是会组合所有任务到集合中 + List> allTaskList = specifyExecutableMap.get(Boolean.FALSE) .stream() .map(executable -> wrappedFutureObj(executable, parallelExecutor, whenCondition, currChainName, slotIndex)) .collect(Collectors.toList()); - if (specifyExecutableMap.containsKey(Boolean.TRUE) && CollUtil.isNotEmpty(specifyExecutableMap.get(Boolean.TRUE))) { + if (CollUtil.isNotEmpty(specifyExecutableMap.get(Boolean.TRUE))) { // 存在 must 指定的 task CompletableFuture specifyCompletableFuture = wrappedFutureObj(specifyExecutableMap.get(Boolean.TRUE).get(0), parallelExecutor, whenCondition, currChainName, slotIndex); // 组合所有任务 - completableFutureList.add(specifyCompletableFuture); - // 设置结果 future - resultCompletableFuture = specifyCompletableFuture; + allTaskList.add(specifyCompletableFuture); + // 设置指定任务 future 对象 + specifyTask = specifyCompletableFuture; } - if (ObjUtil.isNull(resultCompletableFuture)) { + if (ObjUtil.isNull(specifyTask)) { LOG.warn("The specified task[{}] was not found, waiting for all tasks to complete by default.", whenCondition.getSpecifyId()); - // 不存在指定任务,则所有任务都执行 - resultCompletableFuture = CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[] {})); + // 不存在指定任务,则需要等待所有任务都执行完成 + specifyTask = CompletableFuture.allOf(allTaskList.toArray(new CompletableFuture[] {})); } // 结果处理 - this.handleResult(whenCondition, slotIndex, completableFutureList, resultCompletableFuture); + this.handleTaskResult(whenCondition, slotIndex, allTaskList, specifyTask); } From 9fbb615e92a471d876eea2df9ddbd5a0d6fbdc7b Mon Sep 17 00:00:00 2001 From: luoyi <972849752@qq.com> Date: Thu, 14 Sep 2023 22:35:26 +0800 Subject: [PATCH 3/4] =?UTF-8?q?enhancement=20#I7XAIB=20=E7=AE=80=E5=8C=96?= =?UTF-8?q?=20SpecifyParallelExecutor=20=E6=89=A7=E8=A1=8C=E9=80=BB?= =?UTF-8?q?=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../strategy/SpecifyParallelExecutor.java | 50 +++++++++---------- 1 file changed, 23 insertions(+), 27 deletions(-) diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java index 4ade6aa52..9474232d4 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java @@ -1,19 +1,16 @@ package com.yomahub.liteflow.flow.parallel.strategy; -import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.ObjUtil; -import com.yomahub.liteflow.flow.element.Executable; import com.yomahub.liteflow.flow.element.condition.FinallyCondition; import com.yomahub.liteflow.flow.element.condition.PreCondition; import com.yomahub.liteflow.flow.element.condition.WhenCondition; import com.yomahub.liteflow.flow.parallel.WhenFutureObj; import com.yomahub.liteflow.thread.ExecutorHelper; +import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -import java.util.stream.Collectors; /** * 完成指定任务执行器,使用 ID 进行比较 @@ -34,8 +31,14 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor { // 此方法其实只会初始化一次Executor,不会每次都会初始化。Executor 是唯一的 ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutor(whenCondition.getThreadExecutorClass()); - // 过滤指定 ID 的任务,且该任务只会有一个或者没有 - Map> specifyExecutableMap = whenCondition.getExecutableList() + // 指定任务 + final CompletableFuture[] specifyTask = { null }; + + // 所有任务集合 + List> allTaskList = new ArrayList<>(); + + // 遍历 when 所有 node,进行筛选及处理 + whenCondition.getExecutableList() .stream() .filter(executable -> !(executable instanceof PreCondition) && !(executable instanceof FinallyCondition)) .filter(executable -> { @@ -46,33 +49,26 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor { return false; } }) - .collect(Collectors.partitioningBy(executable -> whenCondition.getSpecifyId().equals(executable.getId()))); + .forEach(executable -> { + // 处理 task,封装成 CompletableFuture 对象 + CompletableFuture completableFutureTask = wrappedFutureObj(executable, parallelExecutor, whenCondition, currChainName, slotIndex); + // 存在 must 指定 ID 的 task,且该任务只会有一个或者没有 + if (whenCondition.getSpecifyId().equals(executable.getId())) { + // 设置指定任务 future 对象 + specifyTask[0] = completableFutureTask; + } + // 组合所有任务 + allTaskList.add(completableFutureTask); + }); - CompletableFuture specifyTask = null; - - // 处理非指定 task,封装成 CompletableFuture 对象,最终仍是会组合所有任务到集合中 - List> allTaskList = specifyExecutableMap.get(Boolean.FALSE) - .stream() - .map(executable -> wrappedFutureObj(executable, parallelExecutor, whenCondition, currChainName, slotIndex)) - .collect(Collectors.toList()); - - if (CollUtil.isNotEmpty(specifyExecutableMap.get(Boolean.TRUE))) { - // 存在 must 指定的 task - CompletableFuture specifyCompletableFuture = wrappedFutureObj(specifyExecutableMap.get(Boolean.TRUE).get(0), parallelExecutor, whenCondition, currChainName, slotIndex); - // 组合所有任务 - allTaskList.add(specifyCompletableFuture); - // 设置指定任务 future 对象 - specifyTask = specifyCompletableFuture; - } - - if (ObjUtil.isNull(specifyTask)) { + if (ObjUtil.isNull(specifyTask[0])) { LOG.warn("The specified task[{}] was not found, waiting for all tasks to complete by default.", whenCondition.getSpecifyId()); // 不存在指定任务,则需要等待所有任务都执行完成 - specifyTask = CompletableFuture.allOf(allTaskList.toArray(new CompletableFuture[] {})); + specifyTask[0] = CompletableFuture.allOf(allTaskList.toArray(new CompletableFuture[] {})); } // 结果处理 - this.handleTaskResult(whenCondition, slotIndex, allTaskList, specifyTask); + this.handleTaskResult(whenCondition, slotIndex, allTaskList, specifyTask[0]); } From d7c6383b09d5aecc8b22c33385b247a8c7d56138 Mon Sep 17 00:00:00 2001 From: luoyi <972849752@qq.com> Date: Mon, 18 Sep 2023 20:58:25 +0800 Subject: [PATCH 4/4] =?UTF-8?q?enhancement=20#I7XAIB=20=E8=B0=83=E6=95=B4?= =?UTF-8?q?=20When=20=E4=BB=BB=E5=8A=A1=E5=91=BD=E5=90=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../flow/parallel/strategy/AllOfParallelExecutor.java | 6 +++--- .../flow/parallel/strategy/AnyOfParallelExecutor.java | 6 +++--- .../flow/parallel/strategy/ParallelStrategyExecutor.java | 8 ++++---- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AllOfParallelExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AllOfParallelExecutor.java index aeb20bca8..313e02d4d 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AllOfParallelExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AllOfParallelExecutor.java @@ -18,13 +18,13 @@ public class AllOfParallelExecutor extends ParallelStrategyExecutor { public void execute(WhenCondition whenCondition, Integer slotIndex) throws Exception { // 获取所有 CompletableFuture 任务 - List> allTaskList = this.getAllTaskList(whenCondition, slotIndex); + List> whenAllTaskList = this.getWhenAllTaskList(whenCondition, slotIndex); // 把这些 CompletableFuture 通过 allOf 合成一个 CompletableFuture,表明完成所有任务 - CompletableFuture specifyTask = CompletableFuture.allOf(allTaskList.toArray(new CompletableFuture[] {})); + CompletableFuture specifyTask = CompletableFuture.allOf(whenAllTaskList.toArray(new CompletableFuture[] {})); // 结果处理 - this.handleTaskResult(whenCondition, slotIndex, allTaskList, specifyTask); + this.handleTaskResult(whenCondition, slotIndex, whenAllTaskList, specifyTask); } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AnyOfParallelExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AnyOfParallelExecutor.java index dce311fda..02fcc4646 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AnyOfParallelExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AnyOfParallelExecutor.java @@ -18,13 +18,13 @@ public class AnyOfParallelExecutor extends ParallelStrategyExecutor { public void execute(WhenCondition whenCondition, Integer slotIndex) throws Exception { // 获取所有 CompletableFuture 任务 - List> allTaskList = this.getAllTaskList(whenCondition, slotIndex); + List> whenAllTaskList = this.getWhenAllTaskList(whenCondition, slotIndex); // 把这些 CompletableFuture 通过 anyOf 合成一个 CompletableFuture,表明完成任一任务 - CompletableFuture specifyTask = CompletableFuture.anyOf(allTaskList.toArray(new CompletableFuture[] {})); + CompletableFuture specifyTask = CompletableFuture.anyOf(whenAllTaskList.toArray(new CompletableFuture[] {})); // 结果处理 - this.handleTaskResult(whenCondition, slotIndex, allTaskList, specifyTask); + this.handleTaskResult(whenCondition, slotIndex, whenAllTaskList, specifyTask); } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java index 85524f28c..95adb04ee 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java @@ -86,7 +86,7 @@ public abstract class ParallelStrategyExecutor { * @param slotIndex * @return */ - protected List> getAllTaskList(WhenCondition whenCondition, Integer slotIndex) { + protected List> getWhenAllTaskList(WhenCondition whenCondition, Integer slotIndex) { String currChainName = whenCondition.getCurrChainId(); @@ -121,11 +121,11 @@ public abstract class ParallelStrategyExecutor { * 任务结果处理 * @param whenCondition 并行组件对象 * @param slotIndex 当前 slot 的 index - * @param allTaskList 并行组件中所有任务列表 + * @param whenAllTaskList 并行组件中所有任务列表 * @param specifyTask 指定预先完成的任务,详见 {@link ParallelStrategyEnum} * @throws Exception */ - protected void handleTaskResult(WhenCondition whenCondition, Integer slotIndex, List> allTaskList, + protected void handleTaskResult(WhenCondition whenCondition, Integer slotIndex, List> whenAllTaskList, CompletableFuture specifyTask) throws Exception { Slot slot = DataBus.getSlot(slotIndex); @@ -147,7 +147,7 @@ public abstract class ParallelStrategyExecutor { // 如果 any 为 true,那么这里拿到的是第一个完成的任务 // 如果为 must,那么这里获取到的就是指定的任务 // 这里过滤和转换一起用 lambda 做了 - List allCompletableWhenFutureObjList = allTaskList.stream().filter(f -> { + List allCompletableWhenFutureObjList = whenAllTaskList.stream().filter(f -> { // 过滤出已经完成的,没完成的就直接终止 if (f.isDone()) { return true;