!237 [ISSUE #I7YYLF] 增加组件降级特性

Merge pull request !237 from DaleLee/issue/#I7YYLF
This commit is contained in:
铂赛东
2023-10-08 07:38:14 +00:00
committed by Gitee
119 changed files with 3344 additions and 430 deletions

View File

@@ -0,0 +1,23 @@
package com.yomahub.liteflow.annotation;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* 降级组件
*
* @author DaleLee
* @since 2.11.1
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface FallbackCmp {
}

View File

@@ -27,7 +27,7 @@ public class ForOperator extends BaseOperator<ForCondition> {
Node node;
if (objects[0] instanceof Node) {
node = OperatorHelper.convert(objects[0], Node.class);
if (!ListUtil.toList(NodeTypeEnum.FOR, NodeTypeEnum.FOR_SCRIPT).contains(node.getType())) {
if (!ListUtil.toList(NodeTypeEnum.FOR, NodeTypeEnum.FOR_SCRIPT, NodeTypeEnum.FALLBACK).contains(node.getType())) {
throw new QLException("The parameter must be for-node item");
}
}

View File

@@ -15,7 +15,7 @@ public class IteratorOperator extends BaseOperator<IteratorCondition> {
OperatorHelper.checkObjectSizeEq(objects, 1);
Node node = OperatorHelper.convert(objects[0], Node.class);
if (!ListUtil.toList(NodeTypeEnum.ITERATOR).contains(node.getType())) {
if (!ListUtil.toList(NodeTypeEnum.ITERATOR, NodeTypeEnum.FALLBACK).contains(node.getType())) {
throw new QLException("The parameter must be iterator-node item");
}

View File

@@ -1,11 +1,10 @@
package com.yomahub.liteflow.builder.el.operator;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.ql.util.express.exception.QLException;
import com.yomahub.liteflow.builder.el.operator.base.BaseOperator;
import com.yomahub.liteflow.builder.el.operator.base.OperatorHelper;
import com.yomahub.liteflow.exception.ELParseException;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.flow.element.FallbackNodeProxy;
import com.yomahub.liteflow.flow.element.Node;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
@@ -14,45 +13,29 @@ import com.yomahub.liteflow.property.LiteflowConfigGetter;
* EL规则中的node的操作符
*
* @author Bryan.Zhang
* @author DaleLee
* @since 2.8.3
*/
public class NodeOperator extends BaseOperator<Node> {
@Override
public Node build(Object[] objects) throws Exception {
OperatorHelper.checkObjectSizeEqOne(objects);
@Override
public Node build(Object[] objects) throws Exception {
OperatorHelper.checkObjectSizeEqOne(objects);
String nodeId = OperatorHelper.convert(objects[0], String.class);
String nodeId = OperatorHelper.convert(objects[0], String.class);
if (FlowBus.containNode(nodeId)) {
return FlowBus.getNode(nodeId);
}
else {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
if (StrUtil.isNotBlank(liteflowConfig.getSubstituteCmpClass())) {
Node substituteNode = FlowBus.getNodeMap()
.values()
.stream()
.filter(node -> node.getInstance()
.getClass()
.getName()
.equals(liteflowConfig.getSubstituteCmpClass()))
.findFirst()
.orElse(null);
if (ObjectUtil.isNotNull(substituteNode)) {
return substituteNode;
}
else {
String error = StrUtil.format("This node[{}] cannot be found", nodeId);
throw new QLException(error);
}
}
else {
String error = StrUtil.format("This node[{}] cannot be found, or you can configure an substitute node",
nodeId);
throw new QLException(error);
}
}
}
if (FlowBus.containNode(nodeId)) {
// 找到对应节点
return FlowBus.getNode(nodeId);
} else {
// 检查是否开启了组件降级功能
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
Boolean enable = liteflowConfig.getFallbackCmpEnable();
if (!enable) {
throw new ELParseException("The fallback component is disabled");
}
// 生成代理节点
return new FallbackNodeProxy(nodeId);
}
}
}

View File

@@ -21,7 +21,7 @@ public class SwitchOperator extends BaseOperator<SwitchCondition> {
OperatorHelper.checkObjectSizeEqOne(objects);
Node switchNode = OperatorHelper.convert(objects[0], Node.class);
if (!ListUtil.toList(NodeTypeEnum.SWITCH, NodeTypeEnum.SWITCH_SCRIPT).contains(switchNode.getType())) {
if (!ListUtil.toList(NodeTypeEnum.SWITCH, NodeTypeEnum.SWITCH_SCRIPT, NodeTypeEnum.FALLBACK).contains(switchNode.getType())) {
throw new QLException("The caller must be Switch item");
}

View File

@@ -145,7 +145,8 @@ public class OperatorHelper {
if (!(object instanceof Node && ListUtil.toList(
NodeTypeEnum.IF, NodeTypeEnum.IF_SCRIPT,
NodeTypeEnum.WHILE, NodeTypeEnum.WHILE_SCRIPT,
NodeTypeEnum.BREAK, NodeTypeEnum.BREAK_SCRIPT).contains(((Node) object).getType())
NodeTypeEnum.BREAK, NodeTypeEnum.BREAK_SCRIPT, NodeTypeEnum.FALLBACK)
.contains(((Node) object).getType())
|| object instanceof AndOrCondition || object instanceof NotCondition)) {
throw new QLException("The first parameter must be boolean type Node or boolean type condition");
}

View File

@@ -34,6 +34,8 @@ public enum NodeTypeEnum {
BREAK("break", "循环跳出", false, NodeBreakComponent.class),
ITERATOR("iterator", "循环迭代", false, NodeIteratorComponent.class),
FALLBACK("fallback", "降级", false, null),
SCRIPT("script", "脚本", true, ScriptCommonComponent.class),

View File

@@ -0,0 +1,31 @@
package com.yomahub.liteflow.exception;
/**
* 没有找到降级组件异常
*
* @author DaleLee
* @since 2.11.1
*/
public class FallbackCmpNotFoundException extends RuntimeException {
private static final long serialVersionUID = 1L;
/**
* 异常信息
*/
private String message;
public FallbackCmpNotFoundException(String message) {
this.message = message;
}
@Override
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}

View File

@@ -11,7 +11,11 @@ package com.yomahub.liteflow.flow;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.core.*;
import com.yomahub.liteflow.annotation.FallbackCmp;
import com.yomahub.liteflow.annotation.util.AnnoUtil;
import com.yomahub.liteflow.core.ComponentInitializer;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.core.ScriptComponent;
import com.yomahub.liteflow.enums.FlowParserTypeEnum;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import com.yomahub.liteflow.exception.ComponentCannotRegisterException;
@@ -31,6 +35,7 @@ import com.yomahub.liteflow.spi.holder.ContextAwareHolder;
import com.yomahub.liteflow.spi.local.LocalContextAware;
import com.yomahub.liteflow.util.CopyOnWriteHashMap;
import com.yomahub.liteflow.util.LiteFlowProxyUtil;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -41,6 +46,7 @@ import java.util.stream.Collectors;
* 流程元数据类
*
* @author Bryan.Zhang
* @author DaleLee
*/
public class FlowBus {
@@ -50,6 +56,8 @@ public class FlowBus {
private static final Map<String, Node> nodeMap = new CopyOnWriteHashMap<>();
private static final Map<NodeTypeEnum, Node> fallbackNodeMap = new CopyOnWriteHashMap<>();
private FlowBus() {
}
@@ -92,8 +100,10 @@ public class FlowBus {
throw new NullNodeTypeException(StrUtil.format("node type is null for node[{}]", nodeId));
}
nodeMap.put(nodeId,
new Node(ComponentInitializer.loadInstance().initComponent(nodeComponent, type, nodeComponent.getName(), nodeId)));
Node node = new Node(ComponentInitializer.loadInstance()
.initComponent(nodeComponent, type, nodeComponent.getName(), nodeId));
nodeMap.put(nodeId, node);
addFallbackNode(node);
}
/**
@@ -203,6 +213,7 @@ public class FlowBus {
String activeNodeId = StrUtil.isEmpty(cmpInstance.getNodeId()) ? nodeId : cmpInstance.getNodeId();
nodeMap.put(activeNodeId, node);
addFallbackNode(node);
}
}
@@ -226,9 +237,14 @@ public class FlowBus {
return chainMap;
}
public static Node getFallBackNode(NodeTypeEnum nodeType) {
return fallbackNodeMap.get(nodeType);
}
public static void cleanCache() {
chainMap.clear();
nodeMap.clear();
fallbackNodeMap.clear();
cleanScriptCache();
}
@@ -269,4 +285,16 @@ public class FlowBus {
Arrays.stream(chainIds).forEach(FlowBus::removeChain);
}
// 判断是否是降级组件,如果是则添加到 fallbackNodeMap
private static void addFallbackNode(Node node) {
NodeComponent nodeComponent = node.getInstance();
FallbackCmp fallbackCmp = AnnoUtil.getAnnotation(nodeComponent.getClass(), FallbackCmp.class);
if (fallbackCmp == null) {
return;
}
NodeTypeEnum nodeType = node.getType();
fallbackNodeMap.put(nodeType, node);
}
}

View File

@@ -10,10 +10,9 @@ package com.yomahub.liteflow.flow.element;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.enums.ConditionTypeEnum;
import com.yomahub.liteflow.enums.ExecuteTypeEnum;
import com.yomahub.liteflow.exception.ChainEndException;
import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.enums.ConditionTypeEnum;
import com.yomahub.liteflow.flow.element.condition.ConditionKey;
import com.yomahub.liteflow.slot.DataBus;
import com.yomahub.liteflow.slot.Slot;
@@ -27,6 +26,7 @@ import java.util.Map;
* Condition的抽象类
*
* @author Bryan.Zhang
* @author DaleLee
*/
public abstract class Condition implements Executable{
@@ -46,7 +46,10 @@ public abstract class Condition implements Executable{
@Override
public void execute(Integer slotIndex) throws Exception {
Slot slot = DataBus.getSlot(slotIndex);
try {
// 当前 Condition 入栈
slot.pushCondition(this);
executeCondition(slotIndex);
}
catch (ChainEndException e) {
@@ -55,7 +58,6 @@ public abstract class Condition implements Executable{
throw e;
}
catch (Exception e) {
Slot slot = DataBus.getSlot(slotIndex);
String chainId = this.getCurrChainId();
// 这里事先取到exception set到slot里为了方便finally取到exception
if (slot.isSubChain(chainId)) {
@@ -65,6 +67,9 @@ public abstract class Condition implements Executable{
slot.setException(e);
}
throw e;
} finally {
// 当前 Condition 出栈
slot.popCondition();
}
}

View File

@@ -0,0 +1,188 @@
package com.yomahub.liteflow.flow.element;
import cn.hutool.core.text.StrFormatter;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.enums.ConditionTypeEnum;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import com.yomahub.liteflow.exception.FallbackCmpNotFoundException;
import com.yomahub.liteflow.exception.FlowSystemException;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.flow.element.condition.ConditionKey;
import com.yomahub.liteflow.flow.element.condition.ForCondition;
import com.yomahub.liteflow.flow.element.condition.IfCondition;
import com.yomahub.liteflow.flow.element.condition.IteratorCondition;
import com.yomahub.liteflow.flow.element.condition.LoopCondition;
import com.yomahub.liteflow.flow.element.condition.SwitchCondition;
import com.yomahub.liteflow.flow.element.condition.WhileCondition;
import com.yomahub.liteflow.slot.DataBus;
import com.yomahub.liteflow.slot.Slot;
/**
* 降级组件代理
*
* @author DaleLee
* @since 2.11.1
*/
public class FallbackNodeProxy extends Node {
// 原节点 id
private String expectedNodeId;
// 降级节点
private Node fallbackNode;
public FallbackNodeProxy() {
this.setType(NodeTypeEnum.FALLBACK);
}
public FallbackNodeProxy(String expectedNodeId) {
this();
this.expectedNodeId = expectedNodeId;
}
@Override
public void execute(Integer slotIndex) throws Exception {
loadFallBackNode(slotIndex);
this.fallbackNode.setCurrChainId(this.getCurrChainId());
this.fallbackNode.execute(slotIndex);
}
private void loadFallBackNode(Integer slotIndex) throws Exception {
if (ObjectUtil.isNotNull(this.fallbackNode)) {
// 已经加载过了
return;
}
Slot slot = DataBus.getSlot(slotIndex);
Condition curCondition = slot.getCurrentCondition();
if (ObjectUtil.isNull(curCondition)) {
throw new FlowSystemException("The current executing condition could not be found.");
}
Node node = findFallbackNode(curCondition);
if (ObjectUtil.isNull(node)) {
throw new FallbackCmpNotFoundException(
StrFormatter.format("No fallback component found for [{}] in chain[{}].", this.expectedNodeId,
this.getCurrChainId()));
}
// 使用 node 的副本
this.fallbackNode = node.copy();
}
private Node findFallbackNode(Condition condition) {
ConditionTypeEnum conditionType = condition.getConditionType();
switch (conditionType) {
case TYPE_THEN:
case TYPE_WHEN:
case TYPE_PRE:
case TYPE_FINALLY:
case TYPE_CATCH:
return FlowBus.getFallBackNode(NodeTypeEnum.COMMON);
case TYPE_IF:
return findNodeInIf((IfCondition) condition);
case TYPE_SWITCH:
return findNodeInSwitch((SwitchCondition) condition);
case TYPE_FOR:
return findNodeInFor((ForCondition) condition);
case TYPE_WHILE:
return findNodeInWhile((WhileCondition) condition);
case TYPE_ITERATOR:
return findNodeInIterator((IteratorCondition) condition);
case TYPE_NOT_OPT:
case TYPE_AND_OR_OPT:
return FlowBus.getFallBackNode(NodeTypeEnum.IF);
default:
return null;
}
}
private Node findNodeInIf(IfCondition ifCondition) {
Executable ifItem = ifCondition.getIfItem();
if (ifItem == this) {
// 需要条件组件
return FlowBus.getFallBackNode(NodeTypeEnum.IF);
}
// 需要普通组件
return FlowBus.getFallBackNode(NodeTypeEnum.COMMON);
}
private Node findNodeInSwitch(SwitchCondition switchCondition) {
Node switchNode = switchCondition.getSwitchNode();
if (switchNode == this) {
return FlowBus.getFallBackNode(NodeTypeEnum.SWITCH);
}
return FlowBus.getFallBackNode(NodeTypeEnum.COMMON);
}
private Node findNodeInFor(ForCondition forCondition) {
Node forNode = forCondition.getForNode();
if (forNode == this) {
return FlowBus.getFallBackNode(NodeTypeEnum.FOR);
}
return findNodeInLoop(forCondition);
}
private Node findNodeInWhile(WhileCondition whileCondition) {
Executable whileItem = whileCondition.getWhileItem();
if (whileItem == this) {
return FlowBus.getFallBackNode(NodeTypeEnum.WHILE);
}
return findNodeInLoop(whileCondition);
}
private Node findNodeInIterator(IteratorCondition iteratorCondition) {
Node iteratorNode = iteratorCondition.getIteratorNode();
if (iteratorNode == this) {
return FlowBus.getFallBackNode(NodeTypeEnum.ITERATOR);
}
return findNodeInLoop(iteratorCondition);
}
private Node findNodeInLoop(LoopCondition loopCondition) {
Executable breakItem = loopCondition.getExecutableOne(ConditionKey.BREAK_KEY);
if (breakItem == this) {
return FlowBus.getFallBackNode(NodeTypeEnum.BREAK);
}
return FlowBus.getFallBackNode(NodeTypeEnum.COMMON);
}
@Override
public <T> T getItemResultMetaValue(Integer slotIndex) {
return this.fallbackNode.getItemResultMetaValue(slotIndex);
}
@Override
public boolean isAccess(Integer slotIndex) throws Exception {
// 可能会先访问这个方法,所以在这里就要加载降级节点
loadFallBackNode(slotIndex);
return this.fallbackNode.isAccess(slotIndex);
}
@Override
public String getId() {
return this.fallbackNode == null ? null : this.fallbackNode.getId();
}
@Override
public Node copy() {
// 代理节点不复制
return this;
}
@Override
public NodeTypeEnum getType() {
return NodeTypeEnum.FALLBACK;
}
public String getExpectedNodeId() {
return expectedNodeId;
}
public void setExpectedNodeId(String expectedNodeId) {
this.expectedNodeId = expectedNodeId;
}
}

View File

@@ -97,9 +97,6 @@ public class LiteflowConfig {
// 是否打印执行中的日志
private Boolean printExecutionLog;
// 替补组件class路径
private String substituteCmpClass;
// 规则文件/脚本文件变更监听
private Boolean enableMonitorFile = Boolean.FALSE;
@@ -111,6 +108,9 @@ public class LiteflowConfig {
//使用默认并行循环线程池时,最大队列数
private Integer parallelQueueLimit;
// 是否启用组件降级
private Boolean fallbackCmpEnable;
public Boolean getEnableMonitorFile() {
return enableMonitorFile;
@@ -373,14 +373,6 @@ public class LiteflowConfig {
this.printExecutionLog = printExecutionLog;
}
public String getSubstituteCmpClass() {
return substituteCmpClass;
}
public void setSubstituteCmpClass(String substituteCmpClass) {
this.substituteCmpClass = substituteCmpClass;
}
public String getRuleSourceExtData() {
return ruleSourceExtData;
}
@@ -454,4 +446,16 @@ public class LiteflowConfig {
public void setParallelLoopExecutorClass(String parallelLoopExecutorClass) {
this.parallelLoopExecutorClass = parallelLoopExecutorClass;
}
public Boolean getFallbackCmpEnable() {
if (ObjectUtil.isNull(this.fallbackCmpEnable)) {
return false;
} else {
return fallbackCmpEnable;
}
}
public void setFallbackCmpEnable(Boolean fallbackCmpEnable) {
this.fallbackCmpEnable = fallbackCmpEnable;
}
}

View File

@@ -13,26 +13,29 @@ import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.exception.NoSuchContextBeanException;
import com.yomahub.liteflow.exception.NullParamException;
import com.yomahub.liteflow.flow.element.Condition;
import com.yomahub.liteflow.flow.entity.CmpStep;
import com.yomahub.liteflow.flow.id.IdGeneratorHolder;
import com.yomahub.liteflow.log.LFLog;
import com.yomahub.liteflow.log.LFLoggerManager;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
/**
* Slot的抽象类实现
*
* @author Bryan.Zhang
* @author LeoLee
* @author DaleLee
*/
@SuppressWarnings("unchecked")
public class Slot {
@@ -88,6 +91,8 @@ public class Slot {
protected ConcurrentHashMap<String, Object> metaDataMap = new ConcurrentHashMap<>();
private List<Object> contextBeanList;
private static final ThreadLocal<Deque<Condition>> conditionStack = ThreadLocal.withInitial(LinkedList::new);
public Slot() {
}
@@ -287,6 +292,18 @@ public class Slot {
public Iterator<?> getIteratorResult(String key) {
return getThreadMetaData(ITERATOR_PREFIX + key);
}
public Condition getCurrentCondition() {
return conditionStack.get().peek();
}
public void pushCondition(Condition condition) {
conditionStack.get().push(condition);
}
public void popCondition() {
conditionStack.get().pop();
}
/**
* @deprecated 请使用 {@link #setChainId(String)}