mirror of
https://gitee.com/dromara/liteFlow.git
synced 2026-05-14 20:22:07 +08:00
enhancement #I7XAIB 调整参数命名,增加注释说明
This commit is contained in:
@@ -17,7 +17,7 @@ public enum ParallelStrategyEnum {
|
||||
|
||||
ALL("allOf", "完成全部任务", AllOfParallelExecutor.class),
|
||||
|
||||
SPECIFY("must", "完成指定任务", SpecifyParallelExecutor.class);
|
||||
SPECIFY("must", "完成指定 ID 任务", SpecifyParallelExecutor.class);
|
||||
|
||||
private String strategyType;
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@ import com.yomahub.liteflow.flow.parallel.strategy.ParallelStrategyExecutor;
|
||||
import com.yomahub.liteflow.flow.parallel.strategy.ParallelStrategyHelper;
|
||||
import com.yomahub.liteflow.log.LFLog;
|
||||
import com.yomahub.liteflow.log.LFLoggerManager;
|
||||
import com.yomahub.liteflow.thread.ExecutorHelper;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@@ -59,13 +60,16 @@ public class WhenCondition extends Condition {
|
||||
return ConditionTypeEnum.TYPE_WHEN;
|
||||
}
|
||||
|
||||
// 使用线程池执行when并发流程
|
||||
// 使用线程池执行 when 并发流程
|
||||
// 这块涉及到挺多的多线程逻辑,所以注释比较详细,看到这里的童鞋可以仔细阅读
|
||||
private void executeAsyncCondition(Integer slotIndex) throws Exception {
|
||||
|
||||
// 获取并发执行策略
|
||||
ParallelStrategyExecutor parallelStrategyExecutor = ParallelStrategyHelper.loadInstance().buildParallelExecutor(this.getParallelStrategy());
|
||||
// 执行逻辑
|
||||
|
||||
// 执行并发逻辑
|
||||
parallelStrategyExecutor.execute(this, slotIndex);
|
||||
|
||||
}
|
||||
|
||||
public boolean isIgnoreError() {
|
||||
|
||||
@@ -17,14 +17,14 @@ public class AllOfParallelExecutor extends ParallelStrategyExecutor {
|
||||
@Override
|
||||
public void execute(WhenCondition whenCondition, Integer slotIndex) throws Exception {
|
||||
|
||||
// 获取所有 CompletableFuture
|
||||
List<CompletableFuture<WhenFutureObj>> completableFutureList = this.getCompletableFutureList(whenCondition, slotIndex);
|
||||
// 获取所有 CompletableFuture 任务
|
||||
List<CompletableFuture<WhenFutureObj>> allTaskList = this.getAllTaskList(whenCondition, slotIndex);
|
||||
|
||||
// 把这些 CompletableFuture 通过 anyOf 合成一个 CompletableFuture
|
||||
CompletableFuture<?> resultCompletableFuture = CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[] {}));
|
||||
// 把这些 CompletableFuture 通过 allOf 合成一个 CompletableFuture,表明完成所有任务
|
||||
CompletableFuture<?> specifyTask = CompletableFuture.allOf(allTaskList.toArray(new CompletableFuture[] {}));
|
||||
|
||||
// 结果处理
|
||||
this.handleResult(whenCondition, slotIndex, completableFutureList, resultCompletableFuture);
|
||||
this.handleTaskResult(whenCondition, slotIndex, allTaskList, specifyTask);
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -17,14 +17,14 @@ public class AnyOfParallelExecutor extends ParallelStrategyExecutor {
|
||||
@Override
|
||||
public void execute(WhenCondition whenCondition, Integer slotIndex) throws Exception {
|
||||
|
||||
// 获取所有 CompletableFuture
|
||||
List<CompletableFuture<WhenFutureObj>> completableFutureList = this.getCompletableFutureList(whenCondition, slotIndex);
|
||||
// 获取所有 CompletableFuture 任务
|
||||
List<CompletableFuture<WhenFutureObj>> allTaskList = this.getAllTaskList(whenCondition, slotIndex);
|
||||
|
||||
// 把这些 CompletableFuture 通过 anyOf 合成一个 CompletableFuture
|
||||
CompletableFuture<?> resultCompletableFuture = CompletableFuture.anyOf(completableFutureList.toArray(new CompletableFuture[] {}));
|
||||
// 把这些 CompletableFuture 通过 anyOf 合成一个 CompletableFuture,表明完成任一任务
|
||||
CompletableFuture<?> specifyTask = CompletableFuture.anyOf(allTaskList.toArray(new CompletableFuture[] {}));
|
||||
|
||||
// 结果处理
|
||||
this.handleResult(whenCondition, slotIndex, completableFutureList, resultCompletableFuture);
|
||||
this.handleTaskResult(whenCondition, slotIndex, allTaskList, specifyTask);
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ package com.yomahub.liteflow.flow.parallel.strategy;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.yomahub.liteflow.enums.ParallelStrategyEnum;
|
||||
import com.yomahub.liteflow.exception.WhenExecuteException;
|
||||
import com.yomahub.liteflow.flow.element.Executable;
|
||||
import com.yomahub.liteflow.flow.element.condition.FinallyCondition;
|
||||
@@ -46,6 +47,8 @@ public abstract class ParallelStrategyExecutor {
|
||||
*/
|
||||
protected CompletableFuture<WhenFutureObj> wrappedFutureObj(Executable executable, ExecutorService parallelExecutor,
|
||||
WhenCondition whenCondition, String currChainName, Integer slotIndex) {
|
||||
// 套入 CompletableFutureTimeout 方法进行超时判断,如果超时则用 WhenFutureObj.timeOut 返回超时的对象
|
||||
// 第 2 个参数是主要的本体 CompletableFuture,传入了 ParallelSupplier 和线程池对象
|
||||
return CompletableFutureTimeout.completeOnTimeout(
|
||||
WhenFutureObj.timeOut(executable.getId()),
|
||||
CompletableFuture.supplyAsync(new ParallelSupplier(executable, currChainName, slotIndex), parallelExecutor),
|
||||
@@ -58,7 +61,7 @@ public abstract class ParallelStrategyExecutor {
|
||||
* @param whenCondition
|
||||
*/
|
||||
protected void setWhenConditionParams(WhenCondition whenCondition) {
|
||||
// 获得liteflow的参数
|
||||
// 获得 liteflow 的参数
|
||||
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
|
||||
if (ObjectUtil.isNull(whenCondition.getMaxWaitTime())) {
|
||||
if (ObjectUtil.isNotNull(liteflowConfig.getWhenMaxWaitSeconds())) {
|
||||
@@ -78,26 +81,25 @@ public abstract class ParallelStrategyExecutor {
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取所有任务
|
||||
* 获取所有任务 CompletableFuture 集合
|
||||
* @param whenCondition
|
||||
* @param slotIndex
|
||||
* @return
|
||||
*/
|
||||
protected List<CompletableFuture<WhenFutureObj>> getCompletableFutureList(WhenCondition whenCondition, Integer slotIndex) {
|
||||
protected List<CompletableFuture<WhenFutureObj>> getAllTaskList(WhenCondition whenCondition, Integer slotIndex) {
|
||||
|
||||
String currChainName = whenCondition.getCurrChainId();
|
||||
|
||||
// 此方法其实只会初始化一次Executor,不会每次都会初始化。Executor是唯一的
|
||||
// 此方法其实只会初始化一次 Executor,不会每次都会初始化。Executor是唯一的
|
||||
ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutor(whenCondition.getThreadExecutorClass());
|
||||
|
||||
// 设置参数
|
||||
// 设置 whenCondition 参数
|
||||
setWhenConditionParams(whenCondition);
|
||||
|
||||
// 这里主要是做了封装CompletableFuture对象,用lumbda表达式做了很多事情,这句代码要仔细理清
|
||||
// 1.先进行过滤,前置和后置组件过滤掉,因为在EL Chain处理的时候已经提出来了
|
||||
// 2.过滤isAccess为false的情况,因为不过滤这个的话,如果加上了any,那么isAccess为false那就是最快的了
|
||||
// 3.根据condition.getNodeList()的集合进行流处理,用map进行把executable对象转换成List<CompletableFuture<WhenFutureObj>>
|
||||
// 4.在转的过程中,套入CompletableFutureTimeout方法进行超时判断,如果超时则用WhenFutureObj.timeOut返回超时的对象
|
||||
// 5.第2个参数是主要的本体CompletableFuture,传入了ParallelSupplier和线程池对象
|
||||
// 这里主要是做了封装 CompletableFuture 对象,用 lumbda 表达式做了很多事情,这句代码要仔细理清
|
||||
// 1.先进行过滤,前置和后置组件过滤掉,因为在 EL Chain 处理的时候已经提出来了
|
||||
// 2.过滤 isAccess 为 false 的情况,因为不过滤这个的话,如果加上了 any,那么 isAccess 为 false 那就是最快的了
|
||||
// 3.根据 condition.getNodeList() 的集合进行流处理,用 map 进行把 executable 对象转换成 List<CompletableFuture<WhenFutureObj>>
|
||||
List<CompletableFuture<WhenFutureObj>> completableFutureList = whenCondition.getExecutableList()
|
||||
.stream()
|
||||
.filter(executable -> !(executable instanceof PreCondition) && !(executable instanceof FinallyCondition))
|
||||
@@ -116,33 +118,36 @@ public abstract class ParallelStrategyExecutor {
|
||||
}
|
||||
|
||||
/**
|
||||
* 结果处理
|
||||
* @param whenCondition
|
||||
* @param slotIndex
|
||||
* @param completableFutureList
|
||||
* @param resultCompletableFuture
|
||||
* 任务结果处理
|
||||
* @param whenCondition 并行组件对象
|
||||
* @param slotIndex 当前 slot 的 index
|
||||
* @param allTaskList 并行组件中所有任务列表
|
||||
* @param specifyTask 指定预先完成的任务,详见 {@link ParallelStrategyEnum}
|
||||
* @throws Exception
|
||||
*/
|
||||
protected void handleResult(WhenCondition whenCondition, Integer slotIndex, List<CompletableFuture<WhenFutureObj>> completableFutureList, CompletableFuture<?> resultCompletableFuture) throws Exception {
|
||||
protected void handleTaskResult(WhenCondition whenCondition, Integer slotIndex, List<CompletableFuture<WhenFutureObj>> allTaskList,
|
||||
CompletableFuture<?> specifyTask) throws Exception {
|
||||
|
||||
Slot slot = DataBus.getSlot(slotIndex);
|
||||
|
||||
// 定义是否中断参数
|
||||
// 这里为什么要定义成数组呢,因为后面lambda要用到,根据final不能修改引用的原则,这里用了数组对象
|
||||
// 这里为什么要定义成数组呢,因为后面 lambda 要用到,根据 final 不能修改引用的原则,这里用了数组对象
|
||||
final boolean[] interrupted = { false };
|
||||
|
||||
try {
|
||||
// 进行执行,这句执行完后,就意味着所有的任务要么执行完毕,要么超时返回
|
||||
resultCompletableFuture.get();
|
||||
// 进行执行,这句执行完后有三种可能,所有任务执行完成、任一任务执行完成、指定的任务执行完成
|
||||
specifyTask.get();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
LOG.error("there was an error when executing the CompletableFuture", e);
|
||||
interrupted[0] = true;
|
||||
}
|
||||
|
||||
// 拿到已经完成的CompletableFuture
|
||||
// 如果any为false,那么所有任务都已经完成
|
||||
// 如果any为true,那么这里拿到的是第一个完成的任务
|
||||
// 这里过滤和转换一起用lumbda做了
|
||||
List<WhenFutureObj> allCompletableWhenFutureObjList = completableFutureList.stream().filter(f -> {
|
||||
// 拿到已经完成的 CompletableFuture 对象
|
||||
// 如果 any 为 false,那么所有任务都已经完成
|
||||
// 如果 any 为 true,那么这里拿到的是第一个完成的任务
|
||||
// 如果为 must,那么这里获取到的就是指定的任务
|
||||
// 这里过滤和转换一起用 lambda 做了
|
||||
List<WhenFutureObj> allCompletableWhenFutureObjList = allTaskList.stream().filter(f -> {
|
||||
// 过滤出已经完成的,没完成的就直接终止
|
||||
if (f.isDone()) {
|
||||
return true;
|
||||
@@ -159,8 +164,8 @@ public abstract class ParallelStrategyExecutor {
|
||||
}
|
||||
}).collect(Collectors.toList());
|
||||
|
||||
// 判断超时,上面已经拿到了所有已经完成的CompletableFuture
|
||||
// 那我们只要过滤出超时的CompletableFuture
|
||||
// 判断超时,上面已经拿到了所有已经完成的 CompletableFuture
|
||||
// 那我们只要过滤出超时的 CompletableFuture
|
||||
List<WhenFutureObj> timeOutWhenFutureObjList = allCompletableWhenFutureObjList.stream()
|
||||
.filter(WhenFutureObj::isTimeout)
|
||||
.collect(Collectors.toList());
|
||||
@@ -169,7 +174,7 @@ public abstract class ParallelStrategyExecutor {
|
||||
timeOutWhenFutureObjList.forEach(whenFutureObj -> LOG.warn(
|
||||
"executing thread has reached max-wait-seconds, thread canceled.Execute-item: [{}]", whenFutureObj.getExecutorName()));
|
||||
|
||||
// 当配置了ignoreError = false,出现interrupted或者!f.get()的情况,将抛出WhenExecuteException
|
||||
// 当配置了 ignoreError = false,出现 interrupted 或者 !f.get() 的情况,将抛出 WhenExecuteException
|
||||
if (!whenCondition.isIgnoreError()) {
|
||||
if (interrupted[0]) {
|
||||
throw new WhenExecuteException(StrUtil
|
||||
@@ -184,7 +189,7 @@ public abstract class ParallelStrategyExecutor {
|
||||
}
|
||||
}
|
||||
} else if (interrupted[0]) {
|
||||
// 这里由于配置了ignoreError,所以只打印warn日志
|
||||
// 这里由于配置了 ignoreError,所以只打印 warn 日志
|
||||
LOG.warn("executing when condition timeout , but ignore with errorResume.");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,9 +28,10 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor {
|
||||
|
||||
String currChainName = whenCondition.getCurrChainId();
|
||||
|
||||
// 设置 whenCondition 参数
|
||||
this.setWhenConditionParams(whenCondition);
|
||||
|
||||
// 此方法其实只会初始化一次Executor,不会每次都会初始化。Executor是唯一的
|
||||
// 此方法其实只会初始化一次Executor,不会每次都会初始化。Executor 是唯一的
|
||||
ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutor(whenCondition.getThreadExecutorClass());
|
||||
|
||||
// 过滤指定 ID 的任务,且该任务只会有一个或者没有
|
||||
@@ -47,31 +48,31 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor {
|
||||
})
|
||||
.collect(Collectors.partitioningBy(executable -> whenCondition.getSpecifyId().equals(executable.getId())));
|
||||
|
||||
CompletableFuture<?> resultCompletableFuture = null;
|
||||
CompletableFuture<?> specifyTask = null;
|
||||
|
||||
// 处理非指定 task
|
||||
List<CompletableFuture<WhenFutureObj>> completableFutureList = specifyExecutableMap.get(Boolean.FALSE)
|
||||
// 处理非指定 task,封装成 CompletableFuture 对象,最终仍是会组合所有任务到集合中
|
||||
List<CompletableFuture<WhenFutureObj>> allTaskList = specifyExecutableMap.get(Boolean.FALSE)
|
||||
.stream()
|
||||
.map(executable -> wrappedFutureObj(executable, parallelExecutor, whenCondition, currChainName, slotIndex))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
if (specifyExecutableMap.containsKey(Boolean.TRUE) && CollUtil.isNotEmpty(specifyExecutableMap.get(Boolean.TRUE))) {
|
||||
if (CollUtil.isNotEmpty(specifyExecutableMap.get(Boolean.TRUE))) {
|
||||
// 存在 must 指定的 task
|
||||
CompletableFuture<WhenFutureObj> specifyCompletableFuture = wrappedFutureObj(specifyExecutableMap.get(Boolean.TRUE).get(0), parallelExecutor, whenCondition, currChainName, slotIndex);
|
||||
// 组合所有任务
|
||||
completableFutureList.add(specifyCompletableFuture);
|
||||
// 设置结果 future
|
||||
resultCompletableFuture = specifyCompletableFuture;
|
||||
allTaskList.add(specifyCompletableFuture);
|
||||
// 设置指定任务 future 对象
|
||||
specifyTask = specifyCompletableFuture;
|
||||
}
|
||||
|
||||
if (ObjUtil.isNull(resultCompletableFuture)) {
|
||||
if (ObjUtil.isNull(specifyTask)) {
|
||||
LOG.warn("The specified task[{}] was not found, waiting for all tasks to complete by default.", whenCondition.getSpecifyId());
|
||||
// 不存在指定任务,则所有任务都执行
|
||||
resultCompletableFuture = CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[] {}));
|
||||
// 不存在指定任务,则需要等待所有任务都执行完成
|
||||
specifyTask = CompletableFuture.allOf(allTaskList.toArray(new CompletableFuture[] {}));
|
||||
}
|
||||
|
||||
// 结果处理
|
||||
this.handleResult(whenCondition, slotIndex, completableFutureList, resultCompletableFuture);
|
||||
this.handleTaskResult(whenCondition, slotIndex, allTaskList, specifyTask);
|
||||
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user