!232 WHEN 的 must 语法可指定多个任务

Merge pull request !232 from luoyi/issues/I84XNE
This commit is contained in:
铂赛东
2023-09-28 09:15:35 +00:00
committed by Gitee
5 changed files with 57 additions and 19 deletions

View File

@@ -1,10 +1,15 @@
package com.yomahub.liteflow.builder.el.operator;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.builder.el.operator.base.BaseOperator;
import com.yomahub.liteflow.builder.el.operator.base.OperatorHelper;
import com.yomahub.liteflow.enums.ParallelStrategyEnum;
import com.yomahub.liteflow.flow.element.condition.WhenCondition;
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;
/**
* EL 规则中的 must 的操作符
*
@@ -19,8 +24,12 @@ public class MustOperator extends BaseOperator<WhenCondition> {
WhenCondition whenCondition = OperatorHelper.convert(objects[0], WhenCondition.class);
String specifyId = OperatorHelper.convert(objects[1], String.class);
whenCondition.setSpecifyId(specifyId);
String specifyIds = OperatorHelper.convert(objects[1], String.class);
// 解析指定完成的任务 ID 集合
Set<String> specifyIdSet = Arrays.stream(specifyIds.replace(StrUtil.SPACE, StrUtil.EMPTY).split(",")).collect(Collectors.toSet());
whenCondition.setSpecifyIdSet(specifyIdSet);
whenCondition.setParallelStrategy(ParallelStrategyEnum.SPECIFY);
return whenCondition;
}

View File

@@ -17,6 +17,7 @@ import com.yomahub.liteflow.log.LFLog;
import com.yomahub.liteflow.log.LFLoggerManager;
import com.yomahub.liteflow.thread.ExecutorHelper;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
@@ -38,8 +39,8 @@ public class WhenCondition extends Condition {
// 当前 When 对应并行策略,默认为 ALL
private ParallelStrategyEnum parallelStrategy;
// 只有 must 条件下,才会赋值 specifyId
private String specifyId;
// 只有 must 条件下,才会赋值 specifyIdSet
private Set<String> specifyIdSet;
// when单独的线程池名称
private String threadExecutorClass;
@@ -96,12 +97,12 @@ public class WhenCondition extends Condition {
this.parallelStrategy = parallelStrategy;
}
public String getSpecifyId() {
return specifyId;
public Set<String> getSpecifyIdSet() {
return specifyIdSet;
}
public void setSpecifyId(String specifyId) {
this.specifyId = specifyId;
public void setSpecifyIdSet(Set<String> specifyIdSet) {
this.specifyIdSet = specifyIdSet;
}
public String getThreadExecutorClass() {

View File

@@ -1,14 +1,13 @@
package com.yomahub.liteflow.flow.parallel.strategy;
import cn.hutool.core.util.ObjUtil;
import cn.hutool.core.collection.CollUtil;
import com.yomahub.liteflow.flow.element.condition.FinallyCondition;
import com.yomahub.liteflow.flow.element.condition.PreCondition;
import com.yomahub.liteflow.flow.element.condition.WhenCondition;
import com.yomahub.liteflow.flow.parallel.WhenFutureObj;
import com.yomahub.liteflow.thread.ExecutorHelper;
import java.util.ArrayList;
import java.util.List;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -31,8 +30,14 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor {
// 此方法其实只会初始化一次Executor不会每次都会初始化。Executor 是唯一的
ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutor(whenCondition.getThreadExecutorClass());
// 指定任务
final CompletableFuture<?>[] specifyTask = { null };
// 指定完成的任务
CompletableFuture<?> specifyTask;
// 已存在的任务 ID 集合
Set<String> exitingTaskIdSet = new HashSet<>();
// 指定任务列表,可以为 0 或者多个
List<CompletableFuture<?>> specifyTaskList = new ArrayList<>();
// 所有任务集合
List<CompletableFuture<WhenFutureObj>> allTaskList = new ArrayList<>();
@@ -53,22 +58,32 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor {
// 处理 task封装成 CompletableFuture 对象
CompletableFuture<WhenFutureObj> completableFutureTask = wrappedFutureObj(executable, parallelExecutor, whenCondition, currChainName, slotIndex);
// 存在 must 指定 ID 的 task且该任务只会有一个或者没有
if (whenCondition.getSpecifyId().equals(executable.getId())) {
if (whenCondition.getSpecifyIdSet().contains(executable.getId())) {
// 设置指定任务 future 对象
specifyTask[0] = completableFutureTask;
specifyTaskList.add(completableFutureTask);
// 记录已存在的任务 ID
exitingTaskIdSet.add(executable.getId());
}
// 组合所有任务
allTaskList.add(completableFutureTask);
});
if (ObjUtil.isNull(specifyTask[0])) {
LOG.warn("The specified task[{}] was not found, waiting for all tasks to complete by default.", whenCondition.getSpecifyId());
if (CollUtil.isEmpty(specifyTaskList)) {
LOG.warn("The specified task{} was not found, waiting for all tasks to complete by default.", whenCondition.getSpecifyIdSet());
// 不存在指定任务,则需要等待所有任务都执行完成
specifyTask[0] = CompletableFuture.allOf(allTaskList.toArray(new CompletableFuture[] {}));
specifyTask = CompletableFuture.allOf(allTaskList.toArray(new CompletableFuture[] {}));
} else {
// 判断 specifyIdSet 中有哪些任务是不存在的,给出提示
Collection<String> absentTaskIdSet = CollUtil.subtract(whenCondition.getSpecifyIdSet(), exitingTaskIdSet);
if (CollUtil.isNotEmpty(absentTaskIdSet)) {
LOG.warn("The specified task{} was not found, you need to define and register it.", absentTaskIdSet);
}
// 将指定要完成的任务通过 allOf 合成一个 CompletableFuture表示需要等待 must 方法里面所有任务完成
specifyTask = CompletableFuture.allOf(specifyTaskList.toArray(new CompletableFuture[]{}));
}
// 结果处理
this.handleTaskResult(whenCondition, slotIndex, allTaskList, specifyTask[0]);
this.handleTaskResult(whenCondition, slotIndex, allTaskList, specifyTask);
}

View File

@@ -175,4 +175,13 @@ public class AsyncNodeELSpringbootTest extends BaseTest {
Assertions.assertTrue(context.getData("check").toString().startsWith("akbc"));
}
// 测试 must 指定多个任务, ignoreError 以及 id 关键字
@Test
public void testAsyncFlow13() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain13", "it's a base request");
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
Assertions.assertTrue(context.getData("check").toString().startsWith("akbgc"));
}
}

View File

@@ -62,4 +62,8 @@
THEN(WHEN(d, g, l, a, THEN(k, b).id("z")).ignoreError(true).must("z"), c);
</chain>
<chain name="chain13">
THEN(WHEN(d, g, l, a, THEN(k, b).id("z")).ignoreError(true).must("z, g, task1, task2"), c);
</chain>
</flow>