feature #I7YYLE 支持并发环境下的组件降级

This commit is contained in:
Dale Lee
2023-10-04 16:28:18 +08:00
parent bb9b355f8b
commit 63b1e2744a
10 changed files with 417 additions and 377 deletions

View File

@@ -10,19 +10,19 @@ import com.yomahub.liteflow.flow.element.condition.IteratorCondition;
public class IteratorOperator extends BaseOperator<IteratorCondition> {
@Override
public IteratorCondition build(Object[] objects) throws Exception {
OperatorHelper.checkObjectSizeEq(objects, 1);
@Override
public IteratorCondition build(Object[] objects) throws Exception {
OperatorHelper.checkObjectSizeEq(objects, 1);
Node node = OperatorHelper.convert(objects[0], Node.class);
if (!ListUtil.toList(NodeTypeEnum.ITERATOR, NodeTypeEnum.FALLBACK).contains(node.getType())) {
throw new QLException("The parameter must be iterator-node item");
}
Node node = OperatorHelper.convert(objects[0], Node.class);
if (!ListUtil.toList(NodeTypeEnum.ITERATOR, NodeTypeEnum.FALLBACK).contains(node.getType())) {
throw new QLException("The parameter must be iterator-node item");
}
IteratorCondition iteratorCondition = new IteratorCondition();
iteratorCondition.setIteratorNode(node);
IteratorCondition iteratorCondition = new IteratorCondition();
iteratorCondition.setIteratorNode(node);
return iteratorCondition;
}
return iteratorCondition;
}
}

View File

@@ -16,19 +16,19 @@ import com.yomahub.liteflow.flow.element.condition.SwitchCondition;
*/
public class SwitchOperator extends BaseOperator<SwitchCondition> {
@Override
public SwitchCondition build(Object[] objects) throws Exception {
OperatorHelper.checkObjectSizeEqOne(objects);
@Override
public SwitchCondition build(Object[] objects) throws Exception {
OperatorHelper.checkObjectSizeEqOne(objects);
Node switchNode = OperatorHelper.convert(objects[0], Node.class);
if (!ListUtil.toList(NodeTypeEnum.SWITCH, NodeTypeEnum.SWITCH_SCRIPT, NodeTypeEnum.FALLBACK)
.contains(switchNode.getType())) {
throw new QLException("The caller must be Switch item");
}
Node switchNode = OperatorHelper.convert(objects[0], Node.class);
if (!ListUtil.toList(NodeTypeEnum.SWITCH, NodeTypeEnum.SWITCH_SCRIPT, NodeTypeEnum.FALLBACK).contains(switchNode.getType())) {
throw new QLException("The caller must be Switch item");
}
SwitchCondition switchCondition = new SwitchCondition();
switchCondition.setSwitchNode(switchNode);
SwitchCondition switchCondition = new SwitchCondition();
switchCondition.setSwitchNode(switchNode);
return switchCondition;
}
return switchCondition;
}
}

View File

@@ -6,7 +6,6 @@
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.flow;
import cn.hutool.core.collection.ListUtil;
@@ -14,7 +13,9 @@ import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.annotation.FallbackCmp;
import com.yomahub.liteflow.annotation.util.AnnoUtil;
import com.yomahub.liteflow.core.*;
import com.yomahub.liteflow.core.ComponentInitializer;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.core.ScriptComponent;
import com.yomahub.liteflow.enums.FlowParserTypeEnum;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import com.yomahub.liteflow.exception.ComponentCannotRegisterException;
@@ -49,246 +50,254 @@ import java.util.stream.Collectors;
*/
public class FlowBus {
private static final LFLog LOG = LFLoggerManager.getLogger(FlowBus.class);
private static final LFLog LOG = LFLoggerManager.getLogger(FlowBus.class);
private static final Map<String, Chain> chainMap = new CopyOnWriteHashMap<>();
private static final Map<String, Chain> chainMap = new CopyOnWriteHashMap<>();
private static final Map<String, Node> nodeMap = new CopyOnWriteHashMap<>();
private static final Map<String, Node> nodeMap = new CopyOnWriteHashMap<>();
private static final Map<NodeTypeEnum, Node> fallbackNodeMap = new CopyOnWriteHashMap<>();
private static final Map<NodeTypeEnum, Node> fallbackNodeMap = new CopyOnWriteHashMap<>();
private FlowBus() {
}
private FlowBus() {
}
public static Chain getChain(String id) {
return chainMap.get(id);
}
public static Chain getChain(String id) {
return chainMap.get(id);
}
// 这一方法主要用于第一阶段chain的预装载
public static void addChain(String chainName) {
if (!chainMap.containsKey(chainName)) {
chainMap.put(chainName, new Chain(chainName));
}
}
// 这一方法主要用于第一阶段chain的预装载
public static void addChain(String chainName) {
if (!chainMap.containsKey(chainName)) {
chainMap.put(chainName, new Chain(chainName));
}
}
// 这个方法主要用于第二阶段的替换chain
public static void addChain(Chain chain) {
chainMap.put(chain.getChainId(), chain);
}
// 这个方法主要用于第二阶段的替换chain
public static void addChain(Chain chain) {
chainMap.put(chain.getChainId(), chain);
}
public static boolean containChain(String chainId) {
return chainMap.containsKey(chainId);
}
public static boolean containChain(String chainId) {
return chainMap.containsKey(chainId);
}
public static boolean needInit() {
return MapUtil.isEmpty(chainMap);
}
public static boolean needInit() {
return MapUtil.isEmpty(chainMap);
}
public static boolean containNode(String nodeId) {
return nodeMap.containsKey(nodeId);
}
public static boolean containNode(String nodeId) {
return nodeMap.containsKey(nodeId);
}
/**
* 添加已托管的节点Spring、Solon 管理的节点)
*/
public static void addManagedNode(String nodeId, NodeComponent nodeComponent) {
// 根据class来猜测类型
NodeTypeEnum type = NodeTypeEnum.guessType(nodeComponent.getClass());
/**
* 添加已托管的节点Spring、Solon 管理的节点)
* */
public static void addManagedNode(String nodeId, NodeComponent nodeComponent) {
// 根据class来猜测类型
NodeTypeEnum type = NodeTypeEnum.guessType(nodeComponent.getClass());
if (type == null) {
throw new NullNodeTypeException(StrUtil.format("node type is null for node[{}]", nodeId));
}
if (type == null) {
throw new NullNodeTypeException(StrUtil.format("node type is null for node[{}]", nodeId));
}
Node node = new Node(ComponentInitializer.loadInstance()
.initComponent(nodeComponent, type, nodeComponent.getName(), nodeId));
nodeMap.put(nodeId, node);
addFallbackNode(node);
}
Node node = new Node(ComponentInitializer.loadInstance()
.initComponent(nodeComponent, type, nodeComponent.getName(), nodeId));
nodeMap.put(nodeId, node);
addFallbackNode(node);
}
/**
* 添加 node
*
* @param nodeId 节点id
* @param name 节点名称
* @param type 节点类型
* @param cmpClazz 节点组件类
*/
public static void addNode(String nodeId, String name, NodeTypeEnum type, Class<?> cmpClazz) {
addNode(nodeId, name, type, cmpClazz, null, null);
}
/**
* 添加 node
* @param nodeId 节点id
* @param name 节点名称
* @param type 节点类型
* @param cmpClazz 节点组件类
*/
public static void addNode(String nodeId, String name, NodeTypeEnum type, Class<?> cmpClazz) {
addNode(nodeId, name, type, cmpClazz, null, null);
}
/**
* 添加 node
*
* @param nodeId 节点id
* @param name 节点名称
* @param nodeType 节点类型
* @param cmpClazzStr 节点组件类路径
*/
public static void addNode(String nodeId, String name, NodeTypeEnum nodeType, String cmpClazzStr) {
Class<?> cmpClazz;
try {
cmpClazz = Class.forName(cmpClazzStr);
} catch (Exception e) {
throw new ComponentCannotRegisterException(e.getMessage());
}
addNode(nodeId, name, nodeType, cmpClazz, null, null);
}
/**
* 添加 node
* @param nodeId 节点id
* @param name 节点名称
* @param nodeType 节点类型
* @param cmpClazzStr 节点组件类路径
*/
public static void addNode(String nodeId, String name, NodeTypeEnum nodeType, String cmpClazzStr) {
Class<?> cmpClazz;
try {
cmpClazz = Class.forName(cmpClazzStr);
}
catch (Exception e) {
throw new ComponentCannotRegisterException(e.getMessage());
}
addNode(nodeId, name, nodeType, cmpClazz, null, null);
}
/**
* 添加脚本 node
*
* @param nodeId 节点id
* @param name 节点名称
* @param nodeType 节点类型
* @param script 脚本
*/
public static void addScriptNode(String nodeId, String name, NodeTypeEnum nodeType, String script,
String language) {
addNode(nodeId, name, nodeType, ScriptComponent.ScriptComponentClassMap.get(nodeType), script, language);
}
/**
* 添加脚本 node
* @param nodeId 节点id
* @param name 节点名称
* @param nodeType 节点类型
* @param script 脚本
*/
public static void addScriptNode(String nodeId, String name, NodeTypeEnum nodeType, String script,
String language) {
addNode(nodeId, name, nodeType, ScriptComponent.ScriptComponentClassMap.get(nodeType), script, language);
}
private static void addNode(String nodeId, String name, NodeTypeEnum type, Class<?> cmpClazz, String script,
String language) {
try {
// 判断此类是否是声明式的组件,如果是声明式的组件,就用动态代理生成实例
// 如果不是声明式的,就用传统的方式进行判断
List<NodeComponent> cmpInstances = new ArrayList<>();
if (LiteFlowProxyUtil.isDeclareCmp(cmpClazz)) {
// 这里的逻辑要仔细看下
// 如果是spring体系把原始的类往spring上下文中进行注册那么会走到ComponentScanner中
// 由于ComponentScanner中已经对原始类进行了动态代理出来的对象已经变成了动态代理类所以这时候的bean已经是NodeComponent的子类了
// 所以spring体系下无需再对这个bean做二次代理
// 但是在非spring体系下这个bean依旧是原来那个bean所以需要对这个bean做一次代理
// 这里用ContextAware的spi机制来判断是否spring体系
ContextAware contextAware = ContextAwareHolder.loadContextAware();
Object bean = ContextAwareHolder.loadContextAware().registerBean(nodeId, cmpClazz);
if (LocalContextAware.class.isAssignableFrom(contextAware.getClass())) {
cmpInstances = LiteFlowProxyUtil.proxy2NodeComponent(bean, nodeId);
} else {
cmpInstances = ListUtil.toList((NodeComponent) bean);
}
} else {
// 以node方式配置本质上是为了适配无spring的环境如果有spring环境其实不用这么配置
// 这里的逻辑是判断是否能从spring上下文中取到如果没有spring则就是new instance了
// 如果是script类型的节点因为class只有一个所以也不能注册进spring上下文注册的时候需要new Instance
if (!type.isScript()) {
cmpInstances = ListUtil.toList(
(NodeComponent) ContextAwareHolder.loadContextAware().registerOrGet(nodeId, cmpClazz));
}
// 去除null元素
cmpInstances.remove(null);
// 如果为空
if (cmpInstances.isEmpty()) {
NodeComponent cmpInstance = (NodeComponent) cmpClazz.newInstance();
cmpInstances.add(cmpInstance);
}
}
// 进行初始化component
cmpInstances = cmpInstances.stream().map(cmpInstance -> ComponentInitializer.loadInstance()
.initComponent(cmpInstance, type, name,
cmpInstance.getNodeId() == null ? nodeId : cmpInstance.getNodeId()))
.collect(Collectors.toList());
private static void addNode(String nodeId, String name, NodeTypeEnum type, Class<?> cmpClazz, String script,
String language) {
try {
// 判断此类是否是声明式的组件,如果是声明式的组件,就用动态代理生成实例
// 如果不是声明式的,就用传统的方式进行判断
List<NodeComponent> cmpInstances = new ArrayList<>();
if (LiteFlowProxyUtil.isDeclareCmp(cmpClazz)) {
// 这里的逻辑要仔细看下
// 如果是spring体系把原始的类往spring上下文中进行注册那么会走到ComponentScanner中
// 由于ComponentScanner中已经对原始类进行了动态代理出来的对象已经变成了动态代理类所以这时候的bean已经是NodeComponent的子类了
// 所以spring体系下无需再对这个bean做二次代理
// 但是在非spring体系下这个bean依旧是原来那个bean所以需要对这个bean做一次代理
// 这里用ContextAware的spi机制来判断是否spring体系
ContextAware contextAware = ContextAwareHolder.loadContextAware();
Object bean = ContextAwareHolder.loadContextAware().registerBean(nodeId, cmpClazz);
if (LocalContextAware.class.isAssignableFrom(contextAware.getClass())) {
cmpInstances = LiteFlowProxyUtil.proxy2NodeComponent(bean, nodeId);
}
else {
cmpInstances = ListUtil.toList((NodeComponent) bean);
}
}
else {
// 以node方式配置本质上是为了适配无spring的环境如果有spring环境其实不用这么配置
// 这里的逻辑是判断是否能从spring上下文中取到如果没有spring则就是new instance了
// 如果是script类型的节点因为class只有一个所以也不能注册进spring上下文注册的时候需要new Instance
if (!type.isScript()) {
cmpInstances = ListUtil
.toList((NodeComponent) ContextAwareHolder.loadContextAware().registerOrGet(nodeId, cmpClazz));
}
// 去除null元素
cmpInstances.remove(null);
// 如果为空
if (cmpInstances.isEmpty()) {
NodeComponent cmpInstance = (NodeComponent) cmpClazz.newInstance();
cmpInstances.add(cmpInstance);
}
}
// 进行初始化component
cmpInstances = cmpInstances.stream()
.map(cmpInstance -> ComponentInitializer.loadInstance()
.initComponent(cmpInstance, type, name,
cmpInstance.getNodeId() == null ? nodeId : cmpInstance.getNodeId()))
.collect(Collectors.toList());
// 初始化Node把component放到Node里去
List<Node> nodes = cmpInstances.stream().map(Node::new).collect(Collectors.toList());
// 初始化Node把component放到Node里去
List<Node> nodes = cmpInstances.stream().map(Node::new).collect(Collectors.toList());
for (int i = 0; i < nodes.size(); i++) {
Node node = nodes.get(i);
NodeComponent cmpInstance = cmpInstances.get(i);
// 如果是脚本节点则还要加载script脚本
if (type.isScript()) {
if (StrUtil.isNotBlank(script)) {
node.setScript(script);
node.setLanguage(language);
((ScriptComponent) cmpInstance).loadScript(script, language);
} else {
String errorMsg = StrUtil.format("script for node[{}] is empty", nodeId);
throw new ScriptLoadException(errorMsg);
}
}
for (int i = 0; i < nodes.size(); i++) {
Node node = nodes.get(i);
NodeComponent cmpInstance = cmpInstances.get(i);
// 如果是脚本节点则还要加载script脚本
if (type.isScript()) {
if (StrUtil.isNotBlank(script)) {
node.setScript(script);
node.setLanguage(language);
((ScriptComponent) cmpInstance).loadScript(script, language);
}
else {
String errorMsg = StrUtil.format("script for node[{}] is empty", nodeId);
throw new ScriptLoadException(errorMsg);
}
}
String activeNodeId = StrUtil.isEmpty(cmpInstance.getNodeId()) ? nodeId : cmpInstance.getNodeId();
nodeMap.put(activeNodeId, node);
addFallbackNode(node);
}
String activeNodeId = StrUtil.isEmpty(cmpInstance.getNodeId()) ? nodeId : cmpInstance.getNodeId();
nodeMap.put(activeNodeId, node);
addFallbackNode(node);
}
} catch (Exception e) {
String error = StrUtil.format("component[{}] register error",
StrUtil.isEmpty(name) ? nodeId : StrUtil.format("{}({})", nodeId, name));
LOG.error(e.getMessage());
throw new ComponentCannotRegisterException(StrUtil.format("{} {}", error, e.getMessage()));
}
}
}
catch (Exception e) {
String error = StrUtil.format("component[{}] register error",
StrUtil.isEmpty(name) ? nodeId : StrUtil.format("{}({})", nodeId, name));
LOG.error(e.getMessage());
throw new ComponentCannotRegisterException(StrUtil.format("{} {}", error, e.getMessage()));
}
}
public static Node getNode(String nodeId) {
return nodeMap.get(nodeId);
}
public static Node getNode(String nodeId) {
return nodeMap.get(nodeId);
}
public static Map<String, Node> getNodeMap() {
return nodeMap;
}
public static Map<String, Node> getNodeMap() {
return nodeMap;
}
public static Map<String, Chain> getChainMap() {
return chainMap;
}
public static Map<String, Chain> getChainMap() {
return chainMap;
}
public static Node getFallBackNode(NodeTypeEnum nodeType) {
return fallbackNodeMap.get(nodeType);
}
public static Node getFallBackNode(NodeTypeEnum nodeType) {
return fallbackNodeMap.get(nodeType);
}
public static void cleanCache() {
chainMap.clear();
nodeMap.clear();
fallbackNodeMap.clear();
cleanScriptCache();
}
public static void cleanCache() {
chainMap.clear();
nodeMap.clear();
fallbackNodeMap.clear();
cleanScriptCache();
}
public static void cleanScriptCache() {
// 如果引入了脚本组件SPI则还需要清理脚本的缓存
try {
ScriptExecutorFactory.loadInstance().cleanScriptCache();
} catch (ScriptSpiException ignored) {
}
}
public static void cleanScriptCache() {
// 如果引入了脚本组件SPI则还需要清理脚本的缓存
try {
ScriptExecutorFactory.loadInstance().cleanScriptCache();
}
catch (ScriptSpiException ignored) {
}
}
public static void refreshFlowMetaData(FlowParserTypeEnum type, String content) throws Exception {
if (type.equals(FlowParserTypeEnum.TYPE_EL_XML)) {
new LocalXmlFlowELParser().parse(content);
} else if (type.equals(FlowParserTypeEnum.TYPE_EL_JSON)) {
new LocalJsonFlowELParser().parse(content);
} else if (type.equals(FlowParserTypeEnum.TYPE_EL_YML)) {
new LocalYmlFlowELParser().parse(content);
}
}
public static void refreshFlowMetaData(FlowParserTypeEnum type, String content) throws Exception {
if (type.equals(FlowParserTypeEnum.TYPE_EL_XML)) {
new LocalXmlFlowELParser().parse(content);
}
else if (type.equals(FlowParserTypeEnum.TYPE_EL_JSON)) {
new LocalJsonFlowELParser().parse(content);
}
else if (type.equals(FlowParserTypeEnum.TYPE_EL_YML)) {
new LocalYmlFlowELParser().parse(content);
}
}
public static boolean removeChain(String chainId) {
if (containChain(chainId)) {
chainMap.remove(chainId);
return true;
} else {
String errMsg = StrUtil.format("cannot find the chain[{}]", chainId);
LOG.error(errMsg);
return false;
}
}
public static boolean removeChain(String chainId) {
if (containChain(chainId)) {
chainMap.remove(chainId);
return true;
}
else {
String errMsg = StrUtil.format("cannot find the chain[{}]", chainId);
LOG.error(errMsg);
return false;
}
}
public static void removeChain(String... chainIds) {
Arrays.stream(chainIds).forEach(FlowBus::removeChain);
}
public static void removeChain(String... chainIds) {
Arrays.stream(chainIds).forEach(FlowBus::removeChain);
}
private static void addFallbackNode(Node node) {
NodeComponent nodeComponent = node.getInstance();
FallbackCmp fallbackCmp = AnnoUtil.getAnnotation(nodeComponent.getClass(), FallbackCmp.class);
if (fallbackCmp == null) {
return;
}
// 判断是否是降级组件,如果是则添加到 fallbackNodeMap
private static void addFallbackNode(Node node) {
NodeComponent nodeComponent = node.getInstance();
FallbackCmp fallbackCmp = AnnoUtil.getAnnotation(nodeComponent.getClass(), FallbackCmp.class);
if (fallbackCmp == null) {
return;
}
NodeTypeEnum nodeType = node.getType();
if (nodeType == null) {
nodeType = fallbackCmp.type();
}
fallbackNodeMap.put(nodeType, node);
}
NodeTypeEnum nodeType = node.getType();
if (nodeType == null) {
nodeType = fallbackCmp.type();
}
fallbackNodeMap.put(nodeType, node);
}
}

View File

@@ -1,20 +1,18 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
*
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.flow.element;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.enums.ConditionTypeEnum;
import com.yomahub.liteflow.enums.ExecuteTypeEnum;
import com.yomahub.liteflow.exception.ChainEndException;
import com.yomahub.liteflow.enums.ConditionTypeEnum;
import com.yomahub.liteflow.flow.element.condition.ConditionKey;
import com.yomahub.liteflow.slot.DataBus;
import com.yomahub.liteflow.slot.Slot;
@@ -30,136 +28,141 @@ import java.util.Map;
* @author Bryan.Zhang
* @author DaleLee
*/
public abstract class Condition implements Executable {
public abstract class Condition implements Executable{
private String id;
private String id;
private String tag;
private String tag;
/**
* 可执行元素的集合
*/
private final Map<String, List<Executable>> executableGroup = new HashMap<>();
/**
* 可执行元素的集合
*/
private final Map<String, List<Executable>> executableGroup = new HashMap<>();
/**
* 当前所在的ChainName 如果对于子流程来说那这个就是子流程所在的Chain
*/
private String currChainId;
/**
* 当前所在的ChainName 如果对于子流程来说那这个就是子流程所在的Chain
*/
private String currChainId;
@Override
public void execute(Integer slotIndex) throws Exception {
// 当前 Condition 入栈
Slot slot = DataBus.getSlot(slotIndex);
try {
slot.pushCondition(this);
executeCondition(slotIndex);
} catch (ChainEndException e) {
// 这里单独catch ChainEndException是因为ChainEndException是用户自己setIsEnd抛出的异常
// 是属于正常逻辑所以会在FlowExecutor中判断。这里不作为异常处理
throw e;
} catch (Exception e) {
String chainId = this.getCurrChainId();
// 这里事先取到exception set到slot里为了方便finally取到exception
if (slot.isSubChain(chainId)) {
slot.setSubException(chainId, e);
} else {
slot.setException(e);
}
throw e;
} finally {
// 当前 Condition 出栈
slot.popCondition();
}
}
@Override
public void execute(Integer slotIndex) throws Exception {
Slot slot = DataBus.getSlot(slotIndex);
try {
// 当前 Condition 入栈
slot.pushCondition(this);
executeCondition(slotIndex);
}
catch (ChainEndException e) {
// 这里单独catch ChainEndException是因为ChainEndException是用户自己setIsEnd抛出的异常
// 是属于正常逻辑所以会在FlowExecutor中判断。这里不作为异常处理
throw e;
}
catch (Exception e) {
String chainId = this.getCurrChainId();
// 这里事先取到exception set到slot里为了方便finally取到exception
if (slot.isSubChain(chainId)) {
slot.setSubException(chainId, e);
}
else {
slot.setException(e);
}
throw e;
} finally {
// 当前 Condition 出栈
slot.popCondition();
}
}
public abstract void executeCondition(Integer slotIndex) throws Exception;
public abstract void executeCondition(Integer slotIndex) throws Exception;
@Override
public ExecuteTypeEnum getExecuteType() {
return ExecuteTypeEnum.CONDITION;
}
@Override
public ExecuteTypeEnum getExecuteType() {
return ExecuteTypeEnum.CONDITION;
}
public List<Executable> getExecutableList() {
return getExecutableList(ConditionKey.DEFAULT_KEY);
}
public List<Executable> getExecutableList() {
return getExecutableList(ConditionKey.DEFAULT_KEY);
}
public List<Executable> getExecutableList(String groupKey) {
List<Executable> executableList = this.executableGroup.get(groupKey);
if (CollUtil.isEmpty(executableList)) {
executableList = new ArrayList<>();
}
return executableList;
}
public List<Executable> getExecutableList(String groupKey) {
List<Executable> executableList = this.executableGroup.get(groupKey);
if (CollUtil.isEmpty(executableList)) {
executableList = new ArrayList<>();
}
return executableList;
}
public Executable getExecutableOne(String groupKey) {
List<Executable> list = getExecutableList(groupKey);
if (CollUtil.isEmpty(list)) {
return null;
} else {
return list.get(0);
}
}
public Executable getExecutableOne(String groupKey) {
List<Executable> list = getExecutableList(groupKey);
if (CollUtil.isEmpty(list)) {
return null;
}
else {
return list.get(0);
}
}
public void setExecutableList(List<Executable> executableList) {
this.executableGroup.put(ConditionKey.DEFAULT_KEY, executableList);
}
public void setExecutableList(List<Executable> executableList) {
this.executableGroup.put(ConditionKey.DEFAULT_KEY, executableList);
}
public void addExecutable(Executable executable) {
addExecutable(ConditionKey.DEFAULT_KEY, executable);
}
public void addExecutable(Executable executable) {
addExecutable(ConditionKey.DEFAULT_KEY, executable);
}
public void addExecutable(String groupKey, Executable executable) {
if (ObjectUtil.isNull(executable)) {
return;
}
List<Executable> executableList = this.executableGroup.get(groupKey);
if (CollUtil.isEmpty(executableList)) {
this.executableGroup.put(groupKey, ListUtil.toList(executable));
} else {
this.executableGroup.get(groupKey).add(executable);
}
}
public void addExecutable(String groupKey, Executable executable) {
if (ObjectUtil.isNull(executable)) {
return;
}
List<Executable> executableList = this.executableGroup.get(groupKey);
if (CollUtil.isEmpty(executableList)) {
this.executableGroup.put(groupKey, ListUtil.toList(executable));
}
else {
this.executableGroup.get(groupKey).add(executable);
}
}
public abstract ConditionTypeEnum getConditionType();
public abstract ConditionTypeEnum getConditionType();
@Override
public String getId() {
return id;
}
@Override
public String getId() {
return id;
}
@Override
public void setId(String id) {
this.id = id;
}
@Override
public void setId(String id) {
this.id = id;
}
@Override
public String getTag() {
return tag;
}
@Override
public String getTag() {
return tag;
}
@Override
public void setTag(String tag) {
this.tag = tag;
}
@Override
public void setTag(String tag) {
this.tag = tag;
}
/**
* 请使用 {@link #setCurrChainId(String)}
*/
@Deprecated
public String getCurrChainName() {
return currChainId;
}
/**
* 请使用 {@link #setCurrChainId(String)}
*/
@Deprecated
public String getCurrChainName() {
return currChainId;
}
public String getCurrChainId() {
return currChainId;
}
public String getCurrChainId() {
return currChainId;
}
@Override
public void setCurrChainId(String currChainId) {
this.currChainId = currChainId;
}
@Override
public void setCurrChainId(String currChainId) {
this.currChainId = currChainId;
}
public Map<String, List<Executable>> getExecutableGroup() {
return executableGroup;
}
public Map<String, List<Executable>> getExecutableGroup() {
return executableGroup;
}
}

View File

@@ -19,8 +19,10 @@ import com.yomahub.liteflow.flow.id.IdGeneratorHolder;
import com.yomahub.liteflow.log.LFLog;
import com.yomahub.liteflow.log.LFLoggerManager;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
@@ -90,7 +92,7 @@ public class Slot {
private List<Object> contextBeanList;
private final Deque<Condition> conditionStack = new ConcurrentLinkedDeque<>();
private static final ThreadLocal<Deque<Condition>> conditionStack = ThreadLocal.withInitial(LinkedList::new);
public Slot() {
}
@@ -292,15 +294,15 @@ public class Slot {
}
public Condition getCurrentCondition() {
return this.conditionStack.peek();
return conditionStack.get().peek();
}
public void pushCondition(Condition condition) {
this.conditionStack.push(condition);
conditionStack.get().push(condition);
}
public void popCondition() {
this.conditionStack.pop();
conditionStack.get().pop();
}
/**

View File

@@ -10,6 +10,8 @@ import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.TestPropertySource;
import javax.annotation.Resource;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
* SpringBoot 降级组件测试
@@ -186,4 +188,36 @@ public class FallbackSpringbootTest {
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("for1==>b==>c==>b==>c==>b==>c", response.getExecuteStepStrWithoutTime());
}
@Test
public void testConcurrent1() {
LiteflowResponse response = flowExecutor.execute2Resp("concurrent1", "arg");
Assertions.assertTrue(response.isSuccess());
String stepStr = response.getExecuteStepStrWithoutTime();
Assertions.assertTrue("c==>ifn2".equals(stepStr) || "ifn2==>c".equals(stepStr));
}
@Test
public void testConcurrent2() {
LiteflowResponse response = flowExecutor.execute2Resp("concurrent2", "arg");
Assertions.assertTrue(response.isSuccess());
String stepStr = response.getExecuteStepStrWithoutTime();
Assertions.assertTrue("c==>ifn2".equals(stepStr) || "ifn2==>c".equals(stepStr));
}
@Test
public void testConcurrent3() throws ExecutionException, InterruptedException {
// 执行多条 chain
Future<LiteflowResponse> future1 = flowExecutor.execute2Future("concurrent1", "arg", new Object());
Future<LiteflowResponse> future2 = flowExecutor.execute2Future("concurrent2", "arg", new Object());
Thread.sleep(1000);
LiteflowResponse response1 = future1.get();
LiteflowResponse response2 = future2.get();
Assertions.assertTrue(response1.isSuccess());
String stepStr1 = response1.getExecuteStepStrWithoutTime();
Assertions.assertTrue("c==>ifn2".equals(stepStr1) || "ifn2==>c".equals(stepStr1));
Assertions.assertTrue(response2.isSuccess());
String stepStr2 = response2.getExecuteStepStrWithoutTime();
Assertions.assertTrue("c==>ifn2".equals(stepStr2) || "ifn2==>c".equals(stepStr2));
}
}

View File

@@ -1,11 +1,3 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
*
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.fallback.cmp;
import com.yomahub.liteflow.annotation.LiteflowComponent;

View File

@@ -1,11 +1,3 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
*
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.fallback.cmp;
import com.yomahub.liteflow.annotation.LiteflowComponent;

View File

@@ -1,11 +1,3 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
*
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.fallback.cmp;
import com.yomahub.liteflow.annotation.FallbackCmp;

View File

@@ -8,6 +8,7 @@
<chain name="then2">
THEN(PRE(node("x1")), node("x2"), FINALLY(node("x3")));
</chain>
<!-- WHEN 普通组件降级 -->
<chain name="when1">
WHEN(b, node("x"));
@@ -33,12 +34,12 @@
FOR(3).DO(node("x"));
</chain>
<!-- WHILE 条件组件降级 -->
<!-- WHILE 条件循环组件降级 -->
<chain name="while1">
WHILE(node("x")).DO(a)
</chain>
<!-- WHILE 条件普通降级 -->
<!-- WHILE 普通组件降级 -->
<chain name="while2">
WHILE(wn1).DO(node("x"))
</chain>
@@ -117,4 +118,19 @@
THEN(b, node("x2"))
);
</chain>
<!-- 并发降级测试 -->
<chain name="concurrent1">
WHEN(
THEN(node("x1")),
IF(node("x2"), b)
).maxWaitSeconds(10000);
</chain>
<chain name="concurrent2">
WHEN(
node("x1"),
IF(node("x2"), b)
).maxWaitSeconds(10000);
</chain>
</flow>