mirror of
https://gitee.com/dromara/liteFlow.git
synced 2026-05-14 20:22:07 +08:00
修复合并master后的一些问题
This commit is contained in:
@@ -33,6 +33,12 @@ public class Chain implements Executable {
|
||||
|
||||
private List<Condition> conditionList = new ArrayList<>();
|
||||
|
||||
//前置处理Condition,用来区别主体的Condition
|
||||
private List<Condition> preConditionList = new ArrayList<>();
|
||||
|
||||
//后置处理Condition,用来区别主体的Condition
|
||||
private List<Condition> finallyConditionList = new ArrayList<>();
|
||||
|
||||
public Chain(String chainName){
|
||||
this.chainName = chainName;
|
||||
}
|
||||
@@ -68,6 +74,9 @@ public class Chain implements Executable {
|
||||
}
|
||||
Slot<?> slot = DataBus.getSlot(slotIndex);
|
||||
try {
|
||||
//在子流程或者隐式流程里,slot需要取到的chainName是当前流程,所以这不再是set,而是push
|
||||
//其底层结构是一个stack
|
||||
slot.pushChainName(chainName);
|
||||
//执行前置
|
||||
this.executePre(slotIndex);
|
||||
//执行主体Condition
|
||||
@@ -92,30 +101,17 @@ public class Chain implements Executable {
|
||||
}
|
||||
|
||||
// 执行pre节点
|
||||
public void executePre(Integer slotIndex) throws Exception {
|
||||
doExecuteForPointConditionType(slotIndex, ConditionTypeEnum.TYPE_PRE);
|
||||
}
|
||||
|
||||
public void executeFinally(Integer slotIndex) throws Exception {
|
||||
doExecuteForPointConditionType(slotIndex, ConditionTypeEnum.TYPE_FINALLY);
|
||||
}
|
||||
|
||||
// 执行指定的conditionType的节点
|
||||
private void doExecuteForPointConditionType(Integer slotIndex, ConditionTypeEnum conditionTypeEnum) throws Exception {
|
||||
//先把指定condition类型的节点过滤出来
|
||||
List<Condition> conditions =filterCondition(conditionTypeEnum);
|
||||
for (Condition condition : conditions){
|
||||
for(Executable executableItem : condition.getNodeList()){
|
||||
executableItem.execute(slotIndex);
|
||||
}
|
||||
private void executePre(Integer slotIndex) throws Exception {
|
||||
for (Condition condition : this.preConditionList){
|
||||
condition.execute(slotIndex);
|
||||
}
|
||||
}
|
||||
|
||||
// 根据节点condition类型过去出节点列表
|
||||
private List<Condition> filterCondition(ConditionTypeEnum conditionTypeEnum) {
|
||||
assert conditionTypeEnum != null;
|
||||
return conditionList.stream().filter(condition ->
|
||||
condition.getConditionType().equals(conditionTypeEnum)).collect(Collectors.toList());
|
||||
//执行后置
|
||||
private void executeFinally(Integer slotIndex) throws Exception {
|
||||
for (Condition condition : this.finallyConditionList){
|
||||
condition.execute(slotIndex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -127,107 +123,19 @@ public class Chain implements Executable {
|
||||
public String getExecuteName() {
|
||||
return chainName;
|
||||
}
|
||||
public List<Condition> getPreConditionList() {
|
||||
return preConditionList;
|
||||
}
|
||||
|
||||
//使用线程池执行when并发流程
|
||||
//这块涉及到挺多的多线程逻辑,所以注释比较详细,看到这里的童鞋可以仔细阅读
|
||||
private void executeAsyncCondition(WhenCondition condition, Integer slotIndex) throws Exception{
|
||||
Slot slot = DataBus.getSlot(slotIndex);
|
||||
public void setPreConditionList(List<Condition> preConditionList) {
|
||||
this.preConditionList = preConditionList;
|
||||
}
|
||||
|
||||
//此方法其实只会初始化一次Executor,不会每次都会初始化。Executor是唯一的
|
||||
ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutor(condition.getThreadExecutorClass());
|
||||
public List<Condition> getFinallyConditionList() {
|
||||
return finallyConditionList;
|
||||
}
|
||||
|
||||
//获得liteflow的参数
|
||||
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
|
||||
|
||||
//定义是否中断参数
|
||||
//这里为什么要定义成数组呢,因为后面lambda要用到,根据final不能修改引用的原则,这里用了数组对象
|
||||
final boolean[] interrupted = {false};
|
||||
|
||||
//这里主要是做了封装CompletableFuture对象,用lumbda表达式做了很多事情,这句代码要仔细理清
|
||||
//1.根据condition.getNodeList()的集合进行流处理,用map进行把executable对象转换成List<CompletableFuture<WhenFutureObj>>
|
||||
//2.在转的过程中,套入CompletableFutureTimeout方法进行超时判断,如果超时则用WhenFutureObj.timeOut返回超时的对象
|
||||
//3.第2个参数是主要的本体CompletableFuture,传入了ParallelSupplier和线程池对象
|
||||
List<CompletableFuture<WhenFutureObj>> completableFutureList = condition.getNodeList().stream().filter(executable -> {
|
||||
try {
|
||||
return executable.isAccess(slotIndex);
|
||||
}catch (Exception e){
|
||||
LOG.error("there was an error when executing the when component isAccess",e);
|
||||
return false;
|
||||
}
|
||||
}).map(executable -> CompletableFutureTimeout.completeOnTimeout(
|
||||
WhenFutureObj.timeOut(executable.getExecuteName()),
|
||||
CompletableFuture.supplyAsync(new ParallelSupplier(executable, slotIndex), parallelExecutor),
|
||||
liteflowConfig.getWhenMaxWaitSeconds(),
|
||||
TimeUnit.SECONDS
|
||||
)).collect(Collectors.toList());
|
||||
|
||||
|
||||
CompletableFuture<?> resultCompletableFuture;
|
||||
|
||||
//这里判断执行方式
|
||||
//如果any为false,说明这些异步任务全部执行好或者超时,才返回
|
||||
//如果any为true,说明这些异步任务只要任意一个执行完成,就返回
|
||||
if(condition.isAny()){
|
||||
//把这些CompletableFuture通过anyOf合成一个CompletableFuture
|
||||
resultCompletableFuture = CompletableFuture.anyOf(completableFutureList.toArray(new CompletableFuture[]{}));
|
||||
}else{
|
||||
//把这些CompletableFuture通过allOf合成一个CompletableFuture
|
||||
resultCompletableFuture = CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[]{}));
|
||||
}
|
||||
|
||||
try {
|
||||
//进行执行,这句执行完后,就意味着所有的任务要么执行完毕,要么超时返回
|
||||
resultCompletableFuture.get();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
LOG.error("there was an error when executing the CompletableFuture",e);
|
||||
interrupted[0] = true;
|
||||
}
|
||||
|
||||
//拿到已经完成的CompletableFuture
|
||||
//如果any为false,那么所有任务都已经完成
|
||||
//如果any为true,那么这里拿到的是第一个完成的任务
|
||||
//这里过滤和转换一起用lumbda做了
|
||||
List<WhenFutureObj> allCompletableWhenFutureObjList = completableFutureList.stream().filter(f -> {
|
||||
//过滤出已经完成的,没完成的就直接终止
|
||||
if (f.isDone()){
|
||||
return true;
|
||||
}else{
|
||||
f.cancel(true);
|
||||
return false;
|
||||
}
|
||||
}).map(f -> {
|
||||
try {
|
||||
return f.get();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
interrupted[0] = true;
|
||||
return null;
|
||||
}
|
||||
}).collect(Collectors.toList());
|
||||
|
||||
//判断超时,上面已经拿到了所有已经完成的CompletableFuture
|
||||
//那我们只要过滤出超时的CompletableFuture
|
||||
List<WhenFutureObj> timeOutWhenFutureObjList = allCompletableWhenFutureObjList.stream().filter(WhenFutureObj::isTimeout).collect(Collectors.toList());
|
||||
|
||||
//输出超时信息
|
||||
timeOutWhenFutureObjList.forEach(whenFutureObj ->
|
||||
LOG.warn("requestId [{}] executing thread has reached max-wait-seconds, thread canceled.Execute-item: [{}]", slot.getRequestId(), whenFutureObj.getExecutorName()));
|
||||
|
||||
//当配置了errorResume = false,出现interrupted或者!f.get()的情况,将抛出WhenExecuteException
|
||||
if (!condition.isErrorResume()) {
|
||||
if (interrupted[0]) {
|
||||
throw new WhenExecuteException(StrUtil.format("requestId [{}] when execute interrupted. errorResume [false].", slot.getRequestId()));
|
||||
}
|
||||
|
||||
//循环判断CompletableFuture的返回值,如果异步执行失败,则抛出相应的业务异常
|
||||
for(WhenFutureObj whenFutureObj : allCompletableWhenFutureObjList){
|
||||
if (!whenFutureObj.isSuccess()){
|
||||
LOG.info(StrUtil.format("requestId [{}] when-executor[{}] execute failed. errorResume [false].", whenFutureObj.getExecutorName(), slot.getRequestId()));
|
||||
throw whenFutureObj.getEx();
|
||||
}
|
||||
}
|
||||
} else if (interrupted[0]) {
|
||||
// 这里由于配置了errorResume,所以只打印warn日志
|
||||
LOG.warn("requestId [{}] executing when condition timeout , but ignore with errorResume.", slot.getRequestId());
|
||||
}
|
||||
public void setFinallyConditionList(List<Condition> finallyConditionList) {
|
||||
this.finallyConditionList = finallyConditionList;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user