enhancement #I7XAIB 简化 SpecifyParallelExecutor 执行逻辑

This commit is contained in:
luoyi
2023-09-14 22:35:26 +08:00
parent 6eee9ce492
commit 9fbb615e92

View File

@@ -1,19 +1,16 @@
package com.yomahub.liteflow.flow.parallel.strategy;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ObjUtil;
import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.flow.element.condition.FinallyCondition;
import com.yomahub.liteflow.flow.element.condition.PreCondition;
import com.yomahub.liteflow.flow.element.condition.WhenCondition;
import com.yomahub.liteflow.flow.parallel.WhenFutureObj;
import com.yomahub.liteflow.thread.ExecutorHelper;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
/**
* 完成指定任务执行器,使用 ID 进行比较
@@ -34,8 +31,14 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor {
// 此方法其实只会初始化一次Executor不会每次都会初始化。Executor 是唯一的
ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutor(whenCondition.getThreadExecutorClass());
// 过滤指定 ID 的任务,且该任务只会有一个或者没有
Map<Boolean, List<Executable>> specifyExecutableMap = whenCondition.getExecutableList()
// 指定任务
final CompletableFuture<?>[] specifyTask = { null };
// 所有任务集合
List<CompletableFuture<WhenFutureObj>> allTaskList = new ArrayList<>();
// 遍历 when 所有 node进行筛选及处理
whenCondition.getExecutableList()
.stream()
.filter(executable -> !(executable instanceof PreCondition) && !(executable instanceof FinallyCondition))
.filter(executable -> {
@@ -46,33 +49,26 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor {
return false;
}
})
.collect(Collectors.partitioningBy(executable -> whenCondition.getSpecifyId().equals(executable.getId())));
.forEach(executable -> {
// 处理 task封装成 CompletableFuture 对象
CompletableFuture<WhenFutureObj> completableFutureTask = wrappedFutureObj(executable, parallelExecutor, whenCondition, currChainName, slotIndex);
// 存在 must 指定 ID 的 task且该任务只会有一个或者没有
if (whenCondition.getSpecifyId().equals(executable.getId())) {
// 设置指定任务 future 对象
specifyTask[0] = completableFutureTask;
}
// 组合所有任务
allTaskList.add(completableFutureTask);
});
CompletableFuture<?> specifyTask = null;
// 处理非指定 task封装成 CompletableFuture 对象,最终仍是会组合所有任务到集合中
List<CompletableFuture<WhenFutureObj>> allTaskList = specifyExecutableMap.get(Boolean.FALSE)
.stream()
.map(executable -> wrappedFutureObj(executable, parallelExecutor, whenCondition, currChainName, slotIndex))
.collect(Collectors.toList());
if (CollUtil.isNotEmpty(specifyExecutableMap.get(Boolean.TRUE))) {
// 存在 must 指定的 task
CompletableFuture<WhenFutureObj> specifyCompletableFuture = wrappedFutureObj(specifyExecutableMap.get(Boolean.TRUE).get(0), parallelExecutor, whenCondition, currChainName, slotIndex);
// 组合所有任务
allTaskList.add(specifyCompletableFuture);
// 设置指定任务 future 对象
specifyTask = specifyCompletableFuture;
}
if (ObjUtil.isNull(specifyTask)) {
if (ObjUtil.isNull(specifyTask[0])) {
LOG.warn("The specified task[{}] was not found, waiting for all tasks to complete by default.", whenCondition.getSpecifyId());
// 不存在指定任务,则需要等待所有任务都执行完成
specifyTask = CompletableFuture.allOf(allTaskList.toArray(new CompletableFuture[] {}));
specifyTask[0] = CompletableFuture.allOf(allTaskList.toArray(new CompletableFuture[] {}));
}
// 结果处理
this.handleTaskResult(whenCondition, slotIndex, allTaskList, specifyTask);
this.handleTaskResult(whenCondition, slotIndex, allTaskList, specifyTask[0]);
}