mirror of
https://gitee.com/dromara/liteFlow.git
synced 2026-05-14 20:22:07 +08:00
Merge remote-tracking branch 'origin/dev' into dev
This commit is contained in:
@@ -3,6 +3,7 @@ package com.yomahub.liteflow.builder.el;
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.util.ArrayUtil;
|
||||
import cn.hutool.core.util.CharUtil;
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.fasterxml.jackson.databind.JsonMappingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
@@ -145,17 +146,24 @@ public class LiteFlowChainELBuilder {
|
||||
// 这里无论多复杂的,外面必定有一个最外层的Condition,所以这里只有一个,内部可以嵌套很多层,这点和以前的不太一样
|
||||
Condition condition = (Condition) EXPRESS_RUNNER.execute(elStr, context, errorList, true, true);
|
||||
|
||||
if (Objects.isNull(condition)){
|
||||
throw new QLException(StrUtil.format("parse el fail,el:[{}]", elStr));
|
||||
}
|
||||
|
||||
// 把主要的condition加入
|
||||
this.conditionList.add(condition);
|
||||
return this;
|
||||
} catch (QLException e) {
|
||||
// EL 底层会包装异常,这里是曲线处理
|
||||
if (Objects.equals(e.getCause().getMessage(), DataNotFoundException.MSG)) {
|
||||
if (ObjectUtil.isNotNull(e.getCause()) && Objects.equals(e.getCause().getMessage(), DataNotFoundException.MSG)) {
|
||||
// 构建错误信息
|
||||
String msg = buildDataNotFoundExceptionMsg(elStr);
|
||||
throw new ELParseException(msg);
|
||||
}else if (ObjectUtil.isNotNull(e.getCause())){
|
||||
throw new ELParseException(e.getCause().getMessage());
|
||||
}else{
|
||||
throw new ELParseException(e.getMessage());
|
||||
}
|
||||
throw new ELParseException(e.getCause().getMessage());
|
||||
} catch (Exception e) {
|
||||
String errMsg = StrUtil.format("parse el fail in this chain[{}];\r\n", chain.getChainId());
|
||||
throw new ELParseException(errMsg + e.getMessage());
|
||||
@@ -172,7 +180,7 @@ public class LiteFlowChainELBuilder {
|
||||
LiteFlowChainELBuilder.createChain().setEL(elStr);
|
||||
return Boolean.TRUE;
|
||||
} catch (Exception e) {
|
||||
LOG.error(e.getMessage());
|
||||
LOG.error("validate error",e);
|
||||
}
|
||||
return Boolean.FALSE;
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
*/
|
||||
package com.yomahub.liteflow.core;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.date.StopWatch;
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
@@ -32,7 +33,9 @@ import com.yomahub.liteflow.monitor.CompStatistics;
|
||||
import com.yomahub.liteflow.monitor.MonitorBus;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.Deque;
|
||||
import java.util.Map;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
/**
|
||||
* 普通组件抽象类
|
||||
@@ -157,9 +160,15 @@ public abstract class NodeComponent {
|
||||
public void doRollback() throws Exception {
|
||||
Slot slot = this.getSlot();
|
||||
|
||||
boolean alreadyRollback = slot.getRollbackSteps().stream().anyMatch(cmpStep -> cmpStep.getRefNode().equals(getRefNode()));
|
||||
if (alreadyRollback){
|
||||
return;
|
||||
}
|
||||
|
||||
CmpStep cmpStep = new CmpStep(nodeId, name, CmpStepTypeEnum.SINGLE);
|
||||
cmpStep.setTag(this.getTag());
|
||||
cmpStep.setInstance(this);
|
||||
cmpStep.setRefNode(this.getRefNode());
|
||||
slot.addRollbackStep(cmpStep);
|
||||
|
||||
StopWatch stopWatch = new StopWatch();
|
||||
@@ -178,11 +187,6 @@ public abstract class NodeComponent {
|
||||
|
||||
// 往CmpStep中放入时间消耗信息
|
||||
cmpStep.setRollbackTimeSpent(timeSpent);
|
||||
// 性能统计
|
||||
if (ObjectUtil.isNotNull(monitorBus)) {
|
||||
CompStatistics statistics = new CompStatistics(this.getClass().getSimpleName(), timeSpent);
|
||||
monitorBus.addStatistics(statistics);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -40,6 +40,7 @@ import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
@@ -58,6 +59,8 @@ public class FlowBus {
|
||||
|
||||
private static final Map<NodeTypeEnum, Node> fallbackNodeMap = new CopyOnWriteHashMap<>();
|
||||
|
||||
private static AtomicBoolean initStat = new AtomicBoolean(false);
|
||||
|
||||
private FlowBus() {
|
||||
}
|
||||
|
||||
@@ -82,7 +85,7 @@ public class FlowBus {
|
||||
}
|
||||
|
||||
public static boolean needInit() {
|
||||
return MapUtil.isEmpty(chainMap);
|
||||
return initStat.compareAndSet(false, true);
|
||||
}
|
||||
|
||||
public static boolean containNode(String nodeId) {
|
||||
@@ -297,4 +300,8 @@ public class FlowBus {
|
||||
fallbackNodeMap.put(nodeType, node);
|
||||
}
|
||||
|
||||
public static void clearStat(){
|
||||
initStat.set(false);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ package com.yomahub.liteflow.flow.element;
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.collection.ListUtil;
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.yomahub.liteflow.enums.ConditionTypeEnum;
|
||||
import com.yomahub.liteflow.enums.ExecuteTypeEnum;
|
||||
import com.yomahub.liteflow.exception.ChainEndException;
|
||||
@@ -127,7 +128,11 @@ public abstract class Condition implements Executable{
|
||||
|
||||
@Override
|
||||
public String getId() {
|
||||
return id;
|
||||
if (StrUtil.isBlank(this.id)){
|
||||
return StrUtil.format("condition-{}",this.getConditionType().getName());
|
||||
}else{
|
||||
return id;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -2,7 +2,6 @@ package com.yomahub.liteflow.flow.parallel;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.yomahub.liteflow.exception.WhenTimeoutException;
|
||||
import com.yomahub.liteflow.property.LiteflowConfigGetter;
|
||||
|
||||
/**
|
||||
* 并行异步CompletableFuture里的值对象
|
||||
@@ -16,34 +15,34 @@ public class WhenFutureObj {
|
||||
|
||||
private boolean timeout;
|
||||
|
||||
private String executorName;
|
||||
private String executorId;
|
||||
|
||||
private Exception ex;
|
||||
|
||||
public static WhenFutureObj success(String executorName) {
|
||||
public static WhenFutureObj success(String executorId) {
|
||||
WhenFutureObj result = new WhenFutureObj();
|
||||
result.setSuccess(true);
|
||||
result.setTimeout(false);
|
||||
result.setExecutorName(executorName);
|
||||
result.setExecutorId(executorId);
|
||||
return result;
|
||||
}
|
||||
|
||||
public static WhenFutureObj fail(String executorName, Exception ex) {
|
||||
public static WhenFutureObj fail(String executorId, Exception ex) {
|
||||
WhenFutureObj result = new WhenFutureObj();
|
||||
result.setSuccess(false);
|
||||
result.setTimeout(false);
|
||||
result.setExecutorName(executorName);
|
||||
result.setExecutorId(executorId);
|
||||
result.setEx(ex);
|
||||
return result;
|
||||
}
|
||||
|
||||
public static WhenFutureObj timeOut(String executorName) {
|
||||
public static WhenFutureObj timeOut(String executorId) {
|
||||
WhenFutureObj result = new WhenFutureObj();
|
||||
result.setSuccess(false);
|
||||
result.setTimeout(true);
|
||||
result.setExecutorName(executorName);
|
||||
result.setExecutorId(executorId);
|
||||
result.setEx(new WhenTimeoutException(
|
||||
StrUtil.format("Timed out when executing the component[{}]",executorName)));
|
||||
StrUtil.format("Timed out when executing the component[{}]",executorId)));
|
||||
return result;
|
||||
}
|
||||
|
||||
@@ -55,12 +54,12 @@ public class WhenFutureObj {
|
||||
this.success = success;
|
||||
}
|
||||
|
||||
public String getExecutorName() {
|
||||
return executorName;
|
||||
public String getExecutorId() {
|
||||
return executorId;
|
||||
}
|
||||
|
||||
public void setExecutorName(String executorName) {
|
||||
this.executorName = executorName;
|
||||
public void setExecutorId(String executorId) {
|
||||
this.executorId = executorId;
|
||||
}
|
||||
|
||||
public Exception getEx() {
|
||||
|
||||
@@ -207,7 +207,7 @@ public abstract class ParallelStrategyExecutor {
|
||||
|
||||
// 输出超时信息
|
||||
timeOutWhenFutureObjList.forEach(whenFutureObj -> LOG.warn(
|
||||
"executing thread has reached max-wait-seconds, thread canceled.Execute-item: [{}]", whenFutureObj.getExecutorName()));
|
||||
"executing thread has reached max-wait-seconds, thread canceled.Execute-item: [{}]", whenFutureObj.getExecutorId()));
|
||||
|
||||
// 当配置了 ignoreError = false,出现 interrupted 或者 !f.get() 的情况,将抛出 WhenExecuteException
|
||||
if (!whenCondition.isIgnoreError()) {
|
||||
@@ -219,7 +219,7 @@ public abstract class ParallelStrategyExecutor {
|
||||
// 循环判断CompletableFuture的返回值,如果异步执行失败,则抛出相应的业务异常
|
||||
for (WhenFutureObj whenFutureObj : allCompletableWhenFutureObjList) {
|
||||
if (!whenFutureObj.isSuccess()) {
|
||||
LOG.info(StrUtil.format("when-executor[{}] execute failed. errorResume [false].", whenFutureObj.getExecutorName()));
|
||||
LOG.info(StrUtil.format("when-executor[{}] execute failed. errorResume [false].", whenFutureObj.getExecutorId()));
|
||||
throw whenFutureObj.getEx();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user