mirror of
https://gitee.com/dromara/liteFlow.git
synced 2026-05-15 04:22:09 +08:00
enhancement #I4HECS 重试模块的代码重构
This commit is contained in:
@@ -78,32 +78,7 @@ public class Chain implements Executable {
|
||||
for (Condition condition : conditionList) {
|
||||
if (condition instanceof ThenCondition) {
|
||||
for (Executable executableItem : condition.getNodeList()) {
|
||||
if (executableItem.getExecuteType().equals(ExecuteTypeEnum.CHAIN)) {
|
||||
executableItem.execute(slotIndex);
|
||||
} else {
|
||||
int retryCount = ((Node)executableItem).getInstance().getRetryCount();
|
||||
List<Class<? extends Exception>> forExceptions = Arrays.asList(((Node)executableItem).getInstance().getRetryForExceptions());
|
||||
for (int i = 0; i <= retryCount; i++) {
|
||||
try {
|
||||
if (i > 0) {
|
||||
LOG.info("[{}]:component[{}] performs {} retry", slot.getRequestId(), executableItem.getExecuteName(), i + 1);
|
||||
}
|
||||
executableItem.execute(slotIndex);
|
||||
break;
|
||||
} catch (ChainEndException e) {
|
||||
//如果是ChainEndException,则无需重试
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
//判断抛出的异常是不是指定异常的子类
|
||||
boolean flag = forExceptions.stream().anyMatch(clazz -> clazz.isAssignableFrom(e.getClass()));
|
||||
|
||||
//两种情况不重试,1)抛出异常不在指定异常范围内 2)已经重试次数大于等于配置次数
|
||||
if (!flag || i >= retryCount){
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
executableItem.execute(slotIndex);
|
||||
}
|
||||
} else if (condition instanceof WhenCondition) {
|
||||
executeAsyncCondition((WhenCondition) condition, slotIndex, slot.getRequestId());
|
||||
@@ -134,7 +109,7 @@ public class Chain implements Executable {
|
||||
|
||||
condition.getNodeList().forEach(executable -> {
|
||||
Future<Boolean> future = parallelExecutor.submit(
|
||||
Objects.requireNonNull(TtlCallable.get(new ParallelCallable(executable, slotIndex, requestId, latch, liteflowConfig.getRetryCount())))
|
||||
Objects.requireNonNull(TtlCallable.get(new ParallelCallable(executable, slotIndex, requestId, latch)))
|
||||
);
|
||||
futureMap.put(executable.getExecuteName(), future);
|
||||
});
|
||||
|
||||
@@ -8,7 +8,9 @@
|
||||
package com.yomahub.liteflow.entity.flow;
|
||||
|
||||
import java.text.MessageFormat;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
@@ -114,8 +116,31 @@ public class Node implements Executable,Cloneable{
|
||||
instance.setTag(tag);
|
||||
instance.setCondNodeMap(condNodeMap);
|
||||
|
||||
//执行业务逻辑的主要入口
|
||||
instance.execute();
|
||||
//这里开始进行重试的逻辑和主逻辑的运行
|
||||
int retryCount = instance.getRetryCount();
|
||||
List<Class<? extends Exception>> forExceptions = Arrays.asList(instance.getRetryForExceptions());
|
||||
for (int i = 0; i <= retryCount; i++) {
|
||||
try {
|
||||
if (i > 0) {
|
||||
LOG.info("[{}]:component[{}] performs {} retry", slot.getRequestId(), id, i + 1);
|
||||
}
|
||||
//执行业务逻辑的主要入口
|
||||
instance.execute();
|
||||
break;
|
||||
} catch (ChainEndException e) {
|
||||
//如果是ChainEndException,则无需重试
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
//判断抛出的异常是不是指定异常的子类
|
||||
boolean flag = forExceptions.stream().anyMatch(clazz -> clazz.isAssignableFrom(e.getClass()));
|
||||
|
||||
//两种情况不重试,1)抛出异常不在指定异常范围内 2)已经重试次数大于等于配置次数
|
||||
if (!flag || i >= retryCount){
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
//如果组件覆盖了isEnd方法,或者在在逻辑中主要调用了setEnd(true)的话,流程就会立马结束
|
||||
if (instance.isEnd()) {
|
||||
|
||||
@@ -22,37 +22,20 @@ public class ParallelCallable implements Callable<Boolean> {
|
||||
|
||||
private final CountDownLatch latch;
|
||||
|
||||
private final int retryCount;
|
||||
|
||||
public ParallelCallable(Executable executableItem, Integer slotIndex, String requestId, CountDownLatch latch, int retryCount) {
|
||||
public ParallelCallable(Executable executableItem, Integer slotIndex, String requestId, CountDownLatch latch) {
|
||||
this.executableItem = executableItem;
|
||||
this.slotIndex = slotIndex;
|
||||
this.requestId = requestId;
|
||||
this.latch = latch;
|
||||
this.retryCount = retryCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Boolean call() throws Exception {
|
||||
try {
|
||||
boolean flag = true;
|
||||
for (int i = 0; i <= retryCount; i++) {
|
||||
try{
|
||||
if (i > 0){
|
||||
LOG.info("[{}]:component[{}] performs {} retry", requestId, executableItem.getExecuteName(), i+1);
|
||||
}
|
||||
executableItem.execute(slotIndex);
|
||||
flag = true;
|
||||
break;
|
||||
}catch (Exception e){
|
||||
if (i >= retryCount){
|
||||
LOG.error("requestId [{}], item [{}] execute error", requestId, executableItem.getExecuteName());
|
||||
flag = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return flag;
|
||||
executableItem.execute(slotIndex);
|
||||
return true;
|
||||
} catch (Exception e){
|
||||
return false;
|
||||
} finally {
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user