diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/operator/MustOperator.java b/liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/operator/MustOperator.java index cb018c529..e859cd8bc 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/operator/MustOperator.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/operator/MustOperator.java @@ -1,10 +1,15 @@ package com.yomahub.liteflow.builder.el.operator; +import cn.hutool.core.util.StrUtil; import com.yomahub.liteflow.builder.el.operator.base.BaseOperator; import com.yomahub.liteflow.builder.el.operator.base.OperatorHelper; import com.yomahub.liteflow.enums.ParallelStrategyEnum; import com.yomahub.liteflow.flow.element.condition.WhenCondition; +import java.util.Arrays; +import java.util.Set; +import java.util.stream.Collectors; + /** * EL 规则中的 must 的操作符 * @@ -19,8 +24,12 @@ public class MustOperator extends BaseOperator { WhenCondition whenCondition = OperatorHelper.convert(objects[0], WhenCondition.class); - String specifyId = OperatorHelper.convert(objects[1], String.class); - whenCondition.setSpecifyId(specifyId); + String specifyIds = OperatorHelper.convert(objects[1], String.class); + + // 解析指定完成的任务 ID 集合 + Set specifyIdSet = Arrays.stream(specifyIds.replace(StrUtil.SPACE, StrUtil.EMPTY).split(",")).collect(Collectors.toSet()); + + whenCondition.setSpecifyIdSet(specifyIdSet); whenCondition.setParallelStrategy(ParallelStrategyEnum.SPECIFY); return whenCondition; } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhenCondition.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhenCondition.java index 7422a495e..15eb31cbd 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhenCondition.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhenCondition.java @@ -17,6 +17,7 @@ import com.yomahub.liteflow.log.LFLog; import com.yomahub.liteflow.log.LFLoggerManager; import com.yomahub.liteflow.thread.ExecutorHelper; +import java.util.Set; import java.util.concurrent.TimeUnit; /** @@ -38,8 +39,8 @@ public class WhenCondition extends Condition { // 当前 When 对应并行策略,默认为 ALL private ParallelStrategyEnum parallelStrategy; - // 只有 must 条件下,才会赋值 specifyId - private String specifyId; + // 只有 must 条件下,才会赋值 specifyIdSet + private Set specifyIdSet; // when单独的线程池名称 private String threadExecutorClass; @@ -96,12 +97,12 @@ public class WhenCondition extends Condition { this.parallelStrategy = parallelStrategy; } - public String getSpecifyId() { - return specifyId; + public Set getSpecifyIdSet() { + return specifyIdSet; } - public void setSpecifyId(String specifyId) { - this.specifyId = specifyId; + public void setSpecifyIdSet(Set specifyIdSet) { + this.specifyIdSet = specifyIdSet; } public String getThreadExecutorClass() { diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java index 9474232d4..e397608ec 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java @@ -1,14 +1,13 @@ package com.yomahub.liteflow.flow.parallel.strategy; -import cn.hutool.core.util.ObjUtil; +import cn.hutool.core.collection.CollUtil; import com.yomahub.liteflow.flow.element.condition.FinallyCondition; import com.yomahub.liteflow.flow.element.condition.PreCondition; import com.yomahub.liteflow.flow.element.condition.WhenCondition; import com.yomahub.liteflow.flow.parallel.WhenFutureObj; import com.yomahub.liteflow.thread.ExecutorHelper; -import java.util.ArrayList; -import java.util.List; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -31,8 +30,14 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor { // 此方法其实只会初始化一次Executor,不会每次都会初始化。Executor 是唯一的 ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutor(whenCondition.getThreadExecutorClass()); - // 指定任务 - final CompletableFuture[] specifyTask = { null }; + // 指定完成的任务 + CompletableFuture specifyTask; + + // 已存在的任务 ID 集合 + Set exitingTaskIdSet = new HashSet<>(); + + // 指定任务列表,可以为 0 或者多个 + List> specifyTaskList = new ArrayList<>(); // 所有任务集合 List> allTaskList = new ArrayList<>(); @@ -53,22 +58,32 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor { // 处理 task,封装成 CompletableFuture 对象 CompletableFuture completableFutureTask = wrappedFutureObj(executable, parallelExecutor, whenCondition, currChainName, slotIndex); // 存在 must 指定 ID 的 task,且该任务只会有一个或者没有 - if (whenCondition.getSpecifyId().equals(executable.getId())) { + if (whenCondition.getSpecifyIdSet().contains(executable.getId())) { // 设置指定任务 future 对象 - specifyTask[0] = completableFutureTask; + specifyTaskList.add(completableFutureTask); + // 记录已存在的任务 ID + exitingTaskIdSet.add(executable.getId()); } // 组合所有任务 allTaskList.add(completableFutureTask); }); - if (ObjUtil.isNull(specifyTask[0])) { - LOG.warn("The specified task[{}] was not found, waiting for all tasks to complete by default.", whenCondition.getSpecifyId()); + if (CollUtil.isEmpty(specifyTaskList)) { + LOG.warn("The specified task{} was not found, waiting for all tasks to complete by default.", whenCondition.getSpecifyIdSet()); // 不存在指定任务,则需要等待所有任务都执行完成 - specifyTask[0] = CompletableFuture.allOf(allTaskList.toArray(new CompletableFuture[] {})); + specifyTask = CompletableFuture.allOf(allTaskList.toArray(new CompletableFuture[] {})); + } else { + // 判断 specifyIdSet 中有哪些任务是不存在的,给出提示 + Collection absentTaskIdSet = CollUtil.subtract(whenCondition.getSpecifyIdSet(), exitingTaskIdSet); + if (CollUtil.isNotEmpty(absentTaskIdSet)) { + LOG.warn("The specified task{} was not found, you need to define and register it.", absentTaskIdSet); + } + // 将指定要完成的任务通过 allOf 合成一个 CompletableFuture,表示需要等待 must 方法里面所有任务完成 + specifyTask = CompletableFuture.allOf(specifyTaskList.toArray(new CompletableFuture[]{})); } // 结果处理 - this.handleTaskResult(whenCondition, slotIndex, allTaskList, specifyTask[0]); + this.handleTaskResult(whenCondition, slotIndex, allTaskList, specifyTask); } diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/AsyncNodeELSpringbootTest.java b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/AsyncNodeELSpringbootTest.java index 99f9e1d97..541a2fe38 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/AsyncNodeELSpringbootTest.java +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/AsyncNodeELSpringbootTest.java @@ -175,4 +175,13 @@ public class AsyncNodeELSpringbootTest extends BaseTest { Assertions.assertTrue(context.getData("check").toString().startsWith("akbc")); } + // 测试 must 指定多个任务, ignoreError 以及 id 关键字 + @Test + public void testAsyncFlow13() throws Exception { + LiteflowResponse response = flowExecutor.execute2Resp("chain13", "it's a base request"); + DefaultContext context = response.getFirstContextBean(); + Assertions.assertTrue(response.isSuccess()); + Assertions.assertTrue(context.getData("check").toString().startsWith("akbgc")); + } + } diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/asyncNode/flow.el.xml b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/asyncNode/flow.el.xml index e48bf988e..79523e4e5 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/asyncNode/flow.el.xml +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/asyncNode/flow.el.xml @@ -62,4 +62,8 @@ THEN(WHEN(d, g, l, a, THEN(k, b).id("z")).ignoreError(true).must("z"), c); + + THEN(WHEN(d, g, l, a, THEN(k, b).id("z")).ignoreError(true).must("z, g, task1, task2"), c); + + \ No newline at end of file