diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/LiteFlowChainELBuilder.java b/liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/LiteFlowChainELBuilder.java index b777752db..7ce55ea09 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/LiteFlowChainELBuilder.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/LiteFlowChainELBuilder.java @@ -5,6 +5,7 @@ import cn.hutool.core.util.ArrayUtil; import cn.hutool.core.util.CharUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; +import cn.hutool.crypto.digest.MD5; import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.ql.util.express.DefaultContext; @@ -29,6 +30,7 @@ import com.yomahub.liteflow.log.LFLog; import com.yomahub.liteflow.log.LFLoggerManager; import com.yomahub.liteflow.property.LiteflowConfig; import com.yomahub.liteflow.property.LiteflowConfigGetter; +import com.yomahub.liteflow.util.ElRegexUtil; import java.util.ArrayList; import java.util.Arrays; @@ -190,6 +192,10 @@ public class LiteFlowChainELBuilder { } this.chain.setEl(elStr); + + String elMd5 = MD5.create().digestHex(ElRegexUtil.normalize(elStr)); + this.chain.setElMd5(elMd5); + LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); // 如果设置了不检查Node是否存在,那么这里是不解析的 if (liteflowConfig.getParseMode().equals(ParseModeEnum.PARSE_ONE_ON_FIRST_EXEC)){ diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java index d393d75a4..628d7f70b 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java @@ -13,7 +13,10 @@ import cn.hutool.core.collection.ListUtil; import cn.hutool.core.lang.Tuple; import cn.hutool.core.map.MapUtil; import cn.hutool.core.util.*; +import cn.hutool.crypto.digest.MD5; +import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder; import com.yomahub.liteflow.common.ChainConstant; +import com.yomahub.liteflow.common.entity.ValidationResp; import com.yomahub.liteflow.enums.ChainExecuteModeEnum; import com.yomahub.liteflow.enums.ParseModeEnum; import com.yomahub.liteflow.exception.*; @@ -39,6 +42,7 @@ import com.yomahub.liteflow.slot.Slot; import com.yomahub.liteflow.spi.holder.ContextCmpInitHolder; import com.yomahub.liteflow.spi.holder.PathContentParserHolder; import com.yomahub.liteflow.thread.ExecutorHelper; +import com.yomahub.liteflow.util.ElRegexUtil; import java.util.*; import java.util.concurrent.CompletableFuture; @@ -50,6 +54,7 @@ import java.util.stream.Collectors; * 流程规则主要执行器类 * * @author Bryan.Zhang + * @author luo yi */ public class FlowExecutor { @@ -256,6 +261,92 @@ public class FlowExecutor { return this.execute2Resp(chainId, param, null, contextBeanClazzArray, null); } + /** + * 直接执行 EL 表达式 + * + * @param elStr EL 表达式 + * @return LiteflowResponse + */ + public LiteflowResponse execute2RespWithEL(String elStr) { + return this.execute2RespWithEL(elStr, null, null, DefaultContext.class); + } + + /** + * 直接执行 EL 表达式 + * + * @param elStr EL 表达式 + * @param param 入参 + * @return LiteflowResponse + */ + public LiteflowResponse execute2RespWithEL(String elStr, Object param) { + return this.execute2RespWithEL(elStr, param, null, DefaultContext.class); + } + + /** + * 直接执行 EL 表达式 + * + * @param elStr EL 表达式 + * @param param 入参 + * @param requestId 请求 ID + * @param contextBeanClazzArray 上下文 Class + * @return LiteflowResponse + */ + public LiteflowResponse execute2RespWithEL(String elStr, Object param, String requestId, Class... contextBeanClazzArray) { + return this.execute2RespWithEL(elStr, param, requestId, contextBeanClazzArray, null); + } + + /** + * 直接执行 EL 表达式 + * + * @param elStr EL 表达式 + * @param param 入参 + * @param requestId 请求 ID + * @param contextBeanArray 上下文对象 + * @return LiteflowResponse + */ + public LiteflowResponse execute2RespWithEL(String elStr, Object param, String requestId, Object... contextBeanArray) { + return this.execute2RespWithEL(elStr, param, requestId, null, contextBeanArray); + } + + /** + * 直接执行 EL 表达式 + * + * @param elStr EL 表达式 + * @param param 入参 + * @param requestId 请求 ID + * @param contextBeanClazzArray 上下文 Class 数组 + * @param contextBeanArray 上下文对象数组 + * @return LiteflowResponse + */ + private LiteflowResponse execute2RespWithEL(String elStr, Object param, String requestId, Class[] contextBeanClazzArray, Object[] contextBeanArray) { + // 规范化 el 表达式 + String normalizedEl = ElRegexUtil.normalize(elStr); + + // 校验 EL 是否正常 + ValidationResp validationResp = LiteFlowChainELBuilder.validateWithEx(normalizedEl); + + if (!validationResp.isSuccess()) { + // 实际封装的是 ELParseException 类型 + return LiteflowResponse.newMainResponse(validationResp.getCause()); + } + + // 计算 EL MD5 值,并检查对应的 chain 是否已加载到内存中 + String elMd5 = MD5.create().digestHex(normalizedEl); + + String chainId; + + if (StrUtil.isEmpty(chainId = FlowBus.getChainIdByElMd5(elMd5))) { + // 调用表达式构造 chain,并且返回 UUID 作为 chainId + chainId = IdUtil.fastSimpleUUID(); + LiteFlowChainELBuilder.createChain() + .setChainId(chainId) + .setEL(normalizedEl) + .build(); + } + + return this.execute2Resp(chainId, param, requestId, contextBeanClazzArray, contextBeanArray); + } + public List executeRouteChain(Object param, Class... contextBeanClazzArray){ return this.executeWithRoute(null, param, null, contextBeanClazzArray, null); } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/FlowBus.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/FlowBus.java index b870ae257..3c2ff9761 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/FlowBus.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/FlowBus.java @@ -45,6 +45,7 @@ import com.yomahub.liteflow.spi.holder.DeclComponentParserHolder; import com.yomahub.liteflow.util.CopyOnWriteHashMap; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -66,6 +67,8 @@ public class FlowBus { private static final Map fallbackNodeMap; + private static final Map elMd5Map; + private static final AtomicBoolean initStat = new AtomicBoolean(false); static { @@ -74,10 +77,12 @@ public class FlowBus { chainMap = new HashMap<>(); nodeMap = new HashMap<>(); fallbackNodeMap = new HashMap<>(); - }else{ + elMd5Map = new HashMap<>(); + } else { chainMap = new CopyOnWriteHashMap<>(); nodeMap = new CopyOnWriteHashMap<>(); fallbackNodeMap = new CopyOnWriteHashMap<>(); + elMd5Map = new ConcurrentHashMap<>(); } } @@ -86,9 +91,9 @@ public class FlowBus { } // 这一方法主要用于第一阶段chain的预装载 - public static void addChain(String chainName) { - if (!chainMap.containsKey(chainName)) { - chainMap.put(chainName, new Chain(chainName)); + public static void addChain(String chainId) { + if (!chainMap.containsKey(chainId)) { + chainMap.put(chainId, new Chain(chainId)); } } @@ -103,6 +108,10 @@ public class FlowBus { chainMap.put(chain.getChainId(), chain); + if (StrUtil.isNotBlank(chain.getEl())){ + elMd5Map.put(chain.getElMd5(), chain.getChainId()); + } + //如果有生命周期则执行相应生命周期实现 if (CollUtil.isNotEmpty(LifeCycleHolder.getPostProcessChainBuildLifeCycleList())){ LifeCycleHolder.getPostProcessChainBuildLifeCycleList().forEach( @@ -201,7 +210,7 @@ public class FlowBus { } Node node = new Node(nodeId, name, nodeType, script, language); - nodeMap.put(nodeId, node); + put2NodeMap(nodeId, node); } else { addScriptNodeAndCompile(nodeId, name, nodeType, script, language); } @@ -215,11 +224,9 @@ public class FlowBus { * @param type type * @param script script content * @param language language - * @return NodeComponent instance */ - public static NodeComponent addScriptNodeAndCompile(String nodeId, String name, NodeTypeEnum type, String script, String language) { + public static void addScriptNodeAndCompile(String nodeId, String name, NodeTypeEnum type, String script, String language) { addNode(nodeId, name, type, ScriptComponent.ScriptComponentClassMap.get(type), script, language); - return nodeMap.get(nodeId).getInstance(); } private static List getNodeComponentList(String nodeId, String name, NodeTypeEnum type, Class cmpClazz) throws Exception { @@ -257,7 +264,7 @@ public class FlowBus { return cmpInstanceList; } - public static void compileNode(Node node) { + public static void compileScriptNode(Node node) { String nodeId = node.getId(), name = node.getName(), script = node.getScript(), language = node.getLanguage(); NodeTypeEnum type = node.getType(); try { @@ -292,8 +299,13 @@ public class FlowBus { addFallbackNode(node); } + // 如果是spring自动扫描的组件,在addManagedNode方法中就已经完成了组装了 + // 调用到这里,分两种情况,一是脚本组件,二是通过LiteFlowNodeBuilder代码进行组装的组件 private static void addNode(String nodeId, String name, NodeTypeEnum type, Class cmpClazz, String script, String language) { try { + // 获得初始化好的NodeComponent + // 按理说一个nodeId对应一个NodeComponent,这里得到的是List的原因是,声明式组件有可能会有多个nodeId。 + // 声明式组件又分类声明和方法声明,如果对于方法声明来说,这里的nodeId其实并不是最终真正的nodeId。 List cmpInstanceList = getNodeComponentList(nodeId, name, type, cmpClazz); // 初始化Node,把component放到Node里去 @@ -330,6 +342,7 @@ public class FlowBus { chainMap.clear(); nodeMap.clear(); fallbackNodeMap.clear(); + elMd5Map.clear(); cleanScriptCache(); } @@ -354,9 +367,15 @@ public class FlowBus { } } + public static String getChainIdByElMd5(String elMd5) { + return elMd5Map.get(elMd5); + } + public static boolean removeChain(String chainId) { if (containChain(chainId)) { - chainMap.remove(chainId); + Chain removedChain = chainMap.remove(chainId); + // 移除 elMd5 对应的 chainId + elMd5Map.remove(removedChain.getElMd5()); return true; } else { diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/LiteflowResponse.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/LiteflowResponse.java index 4c24df652..d5eec0e8e 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/LiteflowResponse.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/LiteflowResponse.java @@ -5,9 +5,10 @@ import com.yomahub.liteflow.exception.LiteFlowException; import com.yomahub.liteflow.flow.entity.CmpStep; import com.yomahub.liteflow.slot.Slot; -import java.io.Serializable; -import java.util.*; -import java.util.function.Consumer; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; /** * 执行结果封装类 @@ -35,6 +36,12 @@ public class LiteflowResponse { return newResponse(slot, slot.getException()); } + public static LiteflowResponse newMainResponse(Exception exception) { + LiteflowResponse response = new LiteflowResponse(); + response.setExceptionParams(exception); + return response; + } + public static LiteflowResponse newInnerResponse(String chainId, Slot slot) { return newResponse(slot, slot.getSubException(chainId)); } @@ -42,20 +49,22 @@ public class LiteflowResponse { private static LiteflowResponse newResponse(Slot slot, Exception e) { LiteflowResponse response = new LiteflowResponse(); response.setChainId(slot.getChainId()); - if (e != null) { - response.setSuccess(false); - response.setCause(e); - response.setMessage(response.getCause().getMessage()); - response.setCode(response.getCause() instanceof LiteFlowException - ? ((LiteFlowException) response.getCause()).getCode() : null); - } - else { - response.setSuccess(true); - } + response.setExceptionParams(e); response.setSlot(slot); return response; } + private void setExceptionParams(Exception exception) { + if (exception != null) { + this.setSuccess(false); + this.setCause(exception); + this.setMessage(exception.getMessage()); + this.setCode(exception instanceof LiteFlowException ? ((LiteFlowException) exception).getCode() : null); + } else { + this.setSuccess(true); + } + } + public boolean isSuccess() { return success; } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/Chain.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/Chain.java index 04ba347b4..aca03f3c6 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/Chain.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/Chain.java @@ -32,7 +32,7 @@ import java.util.List; * @author jason * @author luo yi */ -public class Chain implements Executable{ +public class Chain implements Executable { private static final LFLog LOG = LFLoggerManager.getLogger(Chain.class); @@ -48,6 +48,8 @@ public class Chain implements Executable{ private String namespace = ChainConstant.DEFAULT_NAMESPACE; + private String elMd5; + private String threadPoolExecutorClass; private final TransmittableThreadLocal runtimeIdTL = new TransmittableThreadLocal<>(); @@ -245,7 +247,15 @@ public class Chain implements Executable{ this.threadPoolExecutorClass = threadPoolExecutorClass; } - public Long getRuntimeId(){ - return runtimeIdTL.get(); - } + public Long getRuntimeId() { + return runtimeIdTL.get(); + } + + public String getElMd5() { + return elMd5; + } + + public void setElMd5(String elMd5) { + this.elMd5 = elMd5; + } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/Node.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/Node.java index 90f7800e4..11d082ecc 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/Node.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/Node.java @@ -173,7 +173,7 @@ public class Node implements Executable, Cloneable, Rollbackable{ if (!this.isCompiled()) { synchronized (this) { if (!this.isCompiled()) { - FlowBus.compileNode(this); + FlowBus.compileScriptNode(this); } } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/instanceId/BaseNodeInstanceIdManageSpi.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/instanceId/BaseNodeInstanceIdManageSpi.java index b88183623..b18ef6467 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/instanceId/BaseNodeInstanceIdManageSpi.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/instanceId/BaseNodeInstanceIdManageSpi.java @@ -1,7 +1,6 @@ package com.yomahub.liteflow.flow.instanceId; import cn.hutool.core.collection.CollUtil; -import cn.hutool.crypto.digest.MD5; import com.yomahub.liteflow.flow.FlowBus; import com.yomahub.liteflow.flow.element.Chain; import com.yomahub.liteflow.flow.element.Condition; @@ -9,11 +8,14 @@ import com.yomahub.liteflow.flow.element.Node; import com.yomahub.liteflow.flow.entity.InstanceInfoDto; import com.yomahub.liteflow.util.JsonUtil; import org.apache.commons.lang.StringUtils; + import java.util.*; + import static com.yomahub.liteflow.util.SerialsUtil.generateShortUUID; /** * @author lhh + * @author luo yi * @since 2.13.0 */ public abstract class BaseNodeInstanceIdManageSpi implements NodeInstanceIdManageSpi { @@ -156,7 +158,7 @@ public abstract class BaseNodeInstanceIdManageSpi implements NodeInstanceIdManag public void setNodesInstanceId(Condition condition, Chain chain) { NodeInstanceIdManageSpi nodeInstanceIdManageSpi = NodeInstanceIdManageSpiHolder.getInstance().getNodeInstanceIdManageSpi(); - String elMd5 = MD5.create().digestHex(chain.getEl()); + String elMd5 = chain.getElMd5(); String chainId = chain.getChainId(); List instanceIdFile = nodeInstanceIdManageSpi.readInstanceIdFile(chainId); diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/helper/ParserHelper.java b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/helper/ParserHelper.java index 7b13563c6..9d1b6832c 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/helper/ParserHelper.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/helper/ParserHelper.java @@ -183,8 +183,8 @@ public class ParserHelper { //首先需要对继承自抽象Chain的chain进行字符串替换 parseImplChain(abstratChainMap, implChainSet, chain); //如果一个chain不为抽象chain,则进行解析 - String chainName = Optional.ofNullable(chain.attributeValue(ID)).orElse(chain.attributeValue(NAME)); - if(!abstratChainMap.containsKey(chainName)){ + String chainId = Optional.ofNullable(chain.attributeValue(ID)).orElse(chain.attributeValue(NAME)); + if(!abstratChainMap.containsKey(chainId)){ parseOneChainConsumer.accept(chain); } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/util/ElRegexUtil.java b/liteflow-core/src/main/java/com/yomahub/liteflow/util/ElRegexUtil.java index 8598f0531..1cd752002 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/util/ElRegexUtil.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/util/ElRegexUtil.java @@ -1,7 +1,5 @@ package com.yomahub.liteflow.util; -import cn.hutool.core.text.CharSequenceUtil; -import cn.hutool.core.util.StrUtil; import com.yomahub.liteflow.exception.ParseException; import java.util.regex.Matcher; @@ -56,4 +54,16 @@ public class ElRegexUtil { public static boolean isAbstractChain(String elStr) { return Pattern.compile(REGEX_ABSTRACT_HOLDER).matcher(elStr).find(); } + + /** + * 规范化 EL + * + * @param elStr + * @return String + */ + public static String normalize(String elStr) { + // 剔除 EL 中多余空格,且将单引号变为双引号,并在末尾保留一个分号 + return elStr.replace("'", "\"").replaceAll("\\s", "").replaceFirst(";*$", ";"); + } + } diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/RedisXmlELParser.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/RedisXmlELParser.java index 48a3a9c88..92196a6ac 100644 --- a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/RedisXmlELParser.java +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/RedisXmlELParser.java @@ -24,6 +24,7 @@ import java.util.Objects; * Redis解析器实现,只支持EL形式的XML,不支持其他的形式 * * @author hxinyu + * @author jay li * @since 2.11.0 */ @@ -109,11 +110,17 @@ public class RedisXmlELParser extends ClassXmlFlowELParser { if (redisParserVO.getRedisMode().equals(RedisMode.SENTINEL) && CollectionUtil.isEmpty(redisParserVO.getSentinelAddress())) { throw new RedisException(StrFormatter.format(ERROR_MSG_PATTERN, "sentinel address list")); } - if (ObjectUtil.isNull(redisParserVO.getChainDataBase())) { + if (ObjectUtil.isNull(redisParserVO.getChainDataBase()) && !redisParserVO.getRedisMode().equals(RedisMode.CLUSTER)) { throw new RedisException(StrFormatter.format(ERROR_MSG_PATTERN, "chainDataBase")); } if (StrUtil.isBlank(redisParserVO.getChainKey())) { throw new RedisException(StrFormatter.format(ERROR_MSG_PATTERN, "chainKey")); } + if (redisParserVO.getRedisMode().equals(RedisMode.CLUSTER) && CollectionUtil.isEmpty(redisParserVO.getClusterNodeAddress())) { + throw new RedisException(StrFormatter.format(ERROR_MSG_PATTERN, "cluster address list")); + } + if (ObjectUtil.isNotNull(redisParserVO.getScriptKey()) && ObjectUtil.isNull(redisParserVO.getScriptDataBase()) && !redisParserVO.getRedisMode().equals(RedisMode.CLUSTER)) { + throw new RedisException(StrFormatter.format(ERROR_MSG_PATTERN, "scriptDataBase")); + } } } diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/RedisMode.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/RedisMode.java index ce5610428..d333da87d 100644 --- a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/RedisMode.java +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/RedisMode.java @@ -4,16 +4,19 @@ package com.yomahub.liteflow.parser.redis.mode; * 用于定义Redis模式的枚举类 * * single单点模式, sentinel哨兵模式 - * 不支持集群模式配置 + * cluster 集群模式配置 * * @author hxinyu + * @author jay li * @since 2.11.0 */ public enum RedisMode { SINGLE("single"), - SENTINEL("sentinel"); + SENTINEL("sentinel"), + + CLUSTER("cluster"); private String mode; diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/RedisParserHelper.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/RedisParserHelper.java index e6d80d51f..e588ea0d9 100644 --- a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/RedisParserHelper.java +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/RedisParserHelper.java @@ -3,6 +3,7 @@ package com.yomahub.liteflow.parser.redis.mode; import cn.hutool.core.lang.Pair; import cn.hutool.core.text.StrFormatter; import cn.hutool.core.util.BooleanUtil; +import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.yomahub.liteflow.builder.LiteFlowNodeBuilder; import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder; @@ -13,6 +14,7 @@ import com.yomahub.liteflow.log.LFLoggerManager; import com.yomahub.liteflow.parser.helper.NodeConvertHelper; import com.yomahub.liteflow.parser.redis.vo.RedisParserVO; import com.yomahub.liteflow.util.RuleParsePluginUtil; +import org.redisson.config.ClusterServersConfig; import org.redisson.config.Config; import org.redisson.config.SentinelServersConfig; import org.redisson.config.SingleServerConfig; @@ -22,6 +24,7 @@ import org.redisson.config.SingleServerConfig; * * @author hxinyu * @author Bryan.Zhang + * @author jay li * @since 2.11.0 */ @@ -33,6 +36,8 @@ public interface RedisParserHelper { String SENTINEL_REDIS_URL_PATTERN = "redis://{}"; + String CLUSTER_REDIS_URL_PATTERN = "redis://{}"; + String CHAIN_XML_PATTERN = "{}"; String NODE_XML_PATTERN = "{}"; @@ -55,6 +60,9 @@ public interface RedisParserHelper { * @return redisson config */ default Config getSingleRedissonConfig(RedisParserVO redisParserVO, Integer dataBase) { + if (ObjectUtil.isNull(dataBase)) { + return null; + } Config config = new Config(); String redisAddress = StrFormatter.format(SINGLE_REDIS_URL_PATTERN, redisParserVO.getHost(), redisParserVO.getPort()); @@ -81,6 +89,9 @@ public interface RedisParserHelper { * @return redisson Config */ default Config getSentinelRedissonConfig(RedisParserVO redisParserVO, Integer dataBase) { + if (ObjectUtil.isNull(dataBase)) { + return null; + } Config config = new Config(); SentinelServersConfig sentinelConfig = config.useSentinelServers() .setMasterName(redisParserVO.getMasterName()) @@ -109,6 +120,35 @@ public interface RedisParserHelper { return config; } + /** + * 获取Redisson客户端的Config配置通用方法(集群模式) + * @param redisParserVO redisParserVO + * @return redisson Config + */ + default Config getCluserRedissonConfig(RedisParserVO redisParserVO) { + Config config = new Config(); + ClusterServersConfig clusterConfig = config.useClusterServers() + .setMasterConnectionPoolSize(redisParserVO.getConnectionPoolSize()) + .setSlaveConnectionPoolSize(redisParserVO.getConnectionPoolSize()) + .setMasterConnectionMinimumIdleSize(redisParserVO.getConnectionMinimumIdleSize()) + .setSlaveConnectionMinimumIdleSize(redisParserVO.getConnectionMinimumIdleSize()); + + redisParserVO.getClusterNodeAddress().forEach(address -> { + clusterConfig.addNodeAddress(StrFormatter.format(CLUSTER_REDIS_URL_PATTERN, address)); + }); + //如果配置了用户名和密码 + if(StrUtil.isNotBlank(redisParserVO.getUsername()) && StrUtil.isNotBlank(redisParserVO.getPassword())) { + clusterConfig.setUsername(redisParserVO.getUsername()) + .setPassword(redisParserVO.getPassword()); + } + //如果配置了密码 + else if(StrUtil.isNotBlank(redisParserVO.getPassword())) { + clusterConfig.setPassword(redisParserVO.getPassword()); + } + + return config; + } + /** * script节点的修改/添加 * diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/RedisParserPollingMode.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/RedisParserPollingMode.java index cafaa67fb..4331a5f02 100644 --- a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/RedisParserPollingMode.java +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/RedisParserPollingMode.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; * Redis 轮询机制实现类 * * @author hxinyu + * @author jay li * @since 2.11.0 */ @@ -81,27 +82,29 @@ public class RedisParserPollingMode implements RedisParserHelper { } if (ObjectUtil.isNull(chainClient)) { RedisMode redisMode = redisParserVO.getRedisMode(); - Config config; - //Redis单点模式 - if (redisMode.equals(RedisMode.SINGLE)){ - config = getSingleRedissonConfig(redisParserVO, redisParserVO.getChainDataBase()); - this.chainClient = new RClient(Redisson.create(config)); - //如果有脚本数据 - if (ObjectUtil.isNotNull(redisParserVO.getScriptDataBase())) { - config = getSingleRedissonConfig(redisParserVO, redisParserVO.getScriptDataBase()); - this.scriptClient = new RClient(Redisson.create(config)); - } + Config chinaConfig, scriptConfig; + + switch (redisMode) { + case SINGLE: + chinaConfig = getSingleRedissonConfig(redisParserVO, redisParserVO.getChainDataBase()); + scriptConfig = getSingleRedissonConfig(redisParserVO, redisParserVO.getScriptDataBase()); + break; + case SENTINEL: + chinaConfig = getSentinelRedissonConfig(redisParserVO, redisParserVO.getChainDataBase()); + scriptConfig = getSentinelRedissonConfig(redisParserVO, redisParserVO.getScriptDataBase()); + break; + case CLUSTER: + chinaConfig = getCluserRedissonConfig(redisParserVO); + scriptConfig = chinaConfig; + break; + default: + throw new RedisException("RedisMode is not supported"); } - //Redis哨兵模式 - else if (redisMode.equals(RedisMode.SENTINEL)) { - config = getSentinelRedissonConfig(redisParserVO, redisParserVO.getChainDataBase()); - this.chainClient = new RClient(Redisson.create(config)); - //如果有脚本数据 - if (ObjectUtil.isNotNull(redisParserVO.getScriptDataBase())) { - config = getSentinelRedissonConfig(redisParserVO, redisParserVO.getScriptDataBase()); - this.scriptClient = new RClient(Redisson.create(config)); - } + this.chainClient = new RClient(Redisson.create(chinaConfig)); + //如果有脚本数据 + if (ObjectUtil.isNotNull(redisParserVO.getScriptKey())) { + this.scriptClient = new RClient(Redisson.create(scriptConfig)); } } //创建定时任务线程池 @@ -211,7 +214,7 @@ public class RedisParserPollingMode implements RedisParserHelper { redisParserVO.getPollingInterval().longValue(), TimeUnit.SECONDS); //如果有脚本 - if (ObjectUtil.isNotNull(scriptClient) && ObjectUtil.isNotNull(redisParserVO.getScriptDataBase()) + if (ObjectUtil.isNotNull(scriptClient) && (ObjectUtil.isNotNull(redisParserVO.getScriptDataBase()) || RedisMode.CLUSTER.equals(redisParserVO.getRedisMode())) && StrUtil.isNotBlank(redisParserVO.getScriptKey())) { //将lua脚本添加到scriptJedis脚本缓存 String keyLuaOfScript = scriptClient.scriptLoad(luaOfKey); diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/subscribe/RedisParserSubscribeMode.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/subscribe/RedisParserSubscribeMode.java index cb4e50c5d..4be097219 100644 --- a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/subscribe/RedisParserSubscribeMode.java +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/subscribe/RedisParserSubscribeMode.java @@ -28,6 +28,7 @@ import java.util.Map; * 使用 Redisson客户端 RMapCache存储结构 * * @author hxinyu + * @author jay li * @since 2.11.0 */ @@ -50,27 +51,29 @@ public class RedisParserSubscribeMode implements RedisParserHelper { } if (ObjectUtil.isNull(chainClient)) { RedisMode redisMode = redisParserVO.getRedisMode(); - Config config; - //Redis单点模式 - if (redisMode.equals(RedisMode.SINGLE)) { - config = getSingleRedissonConfig(redisParserVO, redisParserVO.getChainDataBase()); - this.chainClient = new RClient(Redisson.create(config)); - //如果有脚本数据 - if (ObjectUtil.isNotNull(redisParserVO.getScriptDataBase())) { - config = getSingleRedissonConfig(redisParserVO, redisParserVO.getScriptDataBase()); - this.scriptClient = new RClient(Redisson.create(config)); - } + Config chinaConfig, scriptConfig; + + switch (redisMode) { + case SINGLE: + chinaConfig = getSingleRedissonConfig(redisParserVO, redisParserVO.getChainDataBase()); + scriptConfig = getSingleRedissonConfig(redisParserVO, redisParserVO.getScriptDataBase()); + break; + case SENTINEL: + chinaConfig = getSentinelRedissonConfig(redisParserVO, redisParserVO.getChainDataBase()); + scriptConfig = getSentinelRedissonConfig(redisParserVO, redisParserVO.getScriptDataBase()); + break; + case CLUSTER: + chinaConfig = getCluserRedissonConfig(redisParserVO); + scriptConfig = chinaConfig; + break; + default: + throw new RedisException("RedisMode is not supported"); } - //Redis哨兵模式 - else if (redisMode.equals(RedisMode.SENTINEL)) { - config = getSentinelRedissonConfig(redisParserVO, redisParserVO.getChainDataBase()); - this.chainClient = new RClient(Redisson.create(config)); - //如果有脚本数据 - if (ObjectUtil.isNotNull(redisParserVO.getScriptDataBase())) { - config = getSentinelRedissonConfig(redisParserVO, redisParserVO.getScriptDataBase()); - this.scriptClient = new RClient(Redisson.create(config)); - } + this.chainClient = new RClient(Redisson.create(chinaConfig)); + //如果有脚本数据 + if (ObjectUtil.isNotNull(redisParserVO.getScriptKey())) { + this.scriptClient = new RClient(Redisson.create(scriptConfig)); } } } catch (Exception e) { @@ -172,7 +175,7 @@ public class RedisParserSubscribeMode implements RedisParserHelper { }); //监听 script - if (ObjectUtil.isNotNull(scriptClient) && ObjectUtil.isNotNull(redisParserVO.getScriptDataBase())) { + if (ObjectUtil.isNotNull(scriptClient) && (ObjectUtil.isNotNull(redisParserVO.getScriptDataBase()) || RedisMode.CLUSTER.equals(redisParserVO.getRedisMode()))) { String scriptKey = redisParserVO.getScriptKey(); //添加 script diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/vo/RedisParserVO.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/vo/RedisParserVO.java index a3cc9293f..c4c9f3862 100644 --- a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/vo/RedisParserVO.java +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/vo/RedisParserVO.java @@ -12,6 +12,7 @@ import java.util.List; * 用于解析RuleSourceExtData的vo类, 用于Redis模式中 * * @author hxinyu + * @author jay li * @since 2.11.0 */ @@ -63,6 +64,17 @@ public class RedisParserVO { /*脚本配置的键名 若没有脚本数据可不配置*/ private String scriptKey; + /*集群模式需配置 逗号分隔 集群地址 */ + private List clusterNodeAddress; + + public List getClusterNodeAddress() { + return clusterNodeAddress; + } + + public void setClusterNodeAddress(List clusterNodeAddress) { + this.clusterNodeAddress = clusterNodeAddress; + } + public void setRedisMode(String redisMode) { redisMode = redisMode.toUpperCase(); try{ @@ -120,6 +132,16 @@ public class RedisParserVO { } } + @JsonSetter("clusterAddress") + public void setClusterAddressFromString(String addresses) { + if (addresses != null && !addresses.trim().isEmpty()) { + // 按逗号分割,并去除每个地址前后的空格 + this.clusterNodeAddress = Arrays.asList(addresses.split("\\s*,\\s*")); + } else { + this.clusterNodeAddress = Collections.emptyList(); + } + } + public String getUsername() { return username; } @@ -232,6 +254,7 @@ public class RedisParserVO { ", chainKey='" + chainKey + '\'' + ", scriptDataBase=" + scriptDataBase + ", scriptKey='" + scriptKey + '\'' + + ", clusterAddress='" + clusterNodeAddress + '\'' + '}'; } } diff --git a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisClusterPollSpringBootTest.java b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisClusterPollSpringBootTest.java new file mode 100644 index 000000000..8f366f29c --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisClusterPollSpringBootTest.java @@ -0,0 +1,203 @@ +package com.yomahub.liteflow.test.redis; + +import cn.hutool.crypto.digest.DigestUtil; +import com.yomahub.liteflow.core.FlowExecutor; +import com.yomahub.liteflow.exception.ChainNotFoundException; +import com.yomahub.liteflow.flow.FlowBus; +import com.yomahub.liteflow.flow.LiteflowResponse; +import com.yomahub.liteflow.log.LFLog; +import com.yomahub.liteflow.log.LFLoggerManager; +import com.yomahub.liteflow.parser.redis.mode.RClient; +import com.yomahub.liteflow.parser.redis.mode.polling.RedisParserPollingMode; +import com.yomahub.liteflow.slot.DefaultContext; +import com.yomahub.liteflow.test.BaseTest; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +import javax.annotation.Resource; +import java.lang.reflect.Field; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.when; + +/** + * springboot环境下的redis 集群配置源poll模式功能测试 + *

+ * 测试用例会在1号database中添加测试数据 chainKey:testChainKey; scriptKey:testScriptKey + * 测试完成后清除测试数据 + * + * @author jay li + * @since 2.13.3 + */ +@ExtendWith(SpringExtension.class) +@TestPropertySource(value = "classpath:/redis/application-poll-cluster-xml.properties") +@SpringBootTest(classes = RedisClusterPollSpringBootTest.class) +@EnableAutoConfiguration +@ComponentScan({"com.yomahub.liteflow.test.redis.cmp"}) +public class RedisClusterPollSpringBootTest extends BaseTest { + + @MockBean(name = "chainClient") + private static RClient chainClient; + + @MockBean(name = "scriptClient") + private static RClient scriptClient; + + @Resource + private FlowExecutor flowExecutor; + + //计算hash中field数量的lua脚本 + private final String luaOfKey = "local keys = redis.call(\"hkeys\", KEYS[1]);\n" + + "return #keys;\n"; + + //计算hash中value的SHA值的lua脚本 + private final String luaOfValue = "local key = KEYS[1];\n" + + "local field = KEYS[2];\n" + + "local value, err = redis.call(\"hget\", key, field);\n" + + "if value == false or value == nil then\n" + + " return \"nil\";\n" + + "end\n" + + "local sha1 = redis.sha1hex(value);\n" + + "return sha1;"; + + static LFLog LOG = LFLoggerManager.getLogger(RedisClusterPollSpringBootTest.class); + + @AfterEach + void afterEach() { + try { + Field pollExecutor = RedisParserPollingMode.class.getDeclaredField("pollExecutor"); + pollExecutor.setAccessible(true); + // 关闭旧线程池 + ScheduledThreadPoolExecutor oldPool = (ScheduledThreadPoolExecutor) pollExecutor.get(null); + if (oldPool != null) { + oldPool.shutdownNow(); + } + // 创建新线程池并设置回去 + ScheduledThreadPoolExecutor newPool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1); + pollExecutor.set(null, newPool); + } catch (Exception ignored) { + LOG.error("[Polling thread pool reset failed]", ignored); + } + } + + /** + * 统一测试chain和script + * + * 测试数据流程: + * 1、执行chain1值:"THEN(a, b, c);" + * 2、修改chain1值为:"THEN(s11, s22, s33, a, b);", 执行新chain 验证chain的轮询拉取功能 + * 3、修改chain1其中的script11值 执行chain 验证script的轮询拉取功能 + */ + @Test + public void testPollWithXml() throws InterruptedException { + Set chainNameSet = new HashSet<>(); + chainNameSet.add("chain11"); + String chainValue = "THEN(a, b, c);"; + String chainSHA = DigestUtil.sha1Hex(chainValue); + + //修改chain并更新SHA值 + String changeChainValue = "THEN(s11, s22, s33, a, b);"; + String changeChainSHA = DigestUtil.sha1Hex(changeChainValue); + + when(chainClient.hkeys("pollChainKey")).thenReturn(chainNameSet); + when(chainClient.hget("pollChainKey", "chain11")).thenReturn(chainValue).thenReturn(changeChainValue); + when(chainClient.scriptLoad(luaOfKey)).thenReturn("keysha"); + when(chainClient.scriptLoad(luaOfValue)).thenReturn("valuesha"); + when(chainClient.evalSha(eq("keysha"), anyString())).thenReturn("1"); + when(chainClient.evalSha(eq("valuesha"), anyString(), anyString())).thenReturn(chainSHA).thenReturn(changeChainSHA); + + //添加script + Set scriptFieldSet = new HashSet<>(); + scriptFieldSet.add("s11:script:脚本s11:groovy"); + scriptFieldSet.add("s22:script:脚本s22:js"); + scriptFieldSet.add("s33:script:脚本s33"); + String s11 = "defaultContext.setData(\"test11\",\"hello s11\");"; + String s22 = "defaultContext.setData(\"test22\",\"hello s22\");"; + String s33 = "defaultContext.setData(\"test33\",\"hello s33\");"; + //SHA值用于测试修改script的轮询刷新功能 + String s11SHA = DigestUtil.sha1Hex(s11); + String s22SHA = DigestUtil.sha1Hex(s22); + String s33SHA = DigestUtil.sha1Hex(s33); + //修改script值并更新SHA值 + String changeS11 = "defaultContext.setData(\"test11\",\"hello world\");"; + String changeS11SHA = DigestUtil.sha1Hex(changeS11); + + when(scriptClient.hkeys("pollScriptKey")).thenReturn(scriptFieldSet); + //这里休眠一段时间是为了防止在未修改脚本的chain还没有执行前 轮询线程就拉取了新script值 + when(scriptClient.hget("pollScriptKey", "s11:script:脚本s11:groovy")).thenReturn(s11).thenAnswer(invocation -> { + Thread.sleep(2000); + return changeS11; + }).thenReturn(changeS11); + when(scriptClient.hget("pollScriptKey", "s22:script:脚本s22:js")).thenReturn(s22); + when(scriptClient.hget("pollScriptKey", "s33:script:脚本s33")).thenReturn(s33); + + //分别模拟三个script的evalsha指纹值计算的返回值, 其中s11脚本修改 指纹值变化 + when(scriptClient.scriptLoad(luaOfKey)).thenReturn("keysha"); + when(scriptClient.scriptLoad(luaOfValue)).thenReturn("valuesha"); + when(scriptClient.evalSha(eq("keysha"), anyString())).thenReturn("3"); + when(scriptClient.evalSha("valuesha", "pollScriptKey", "s11:script:脚本s11:groovy")).thenReturn(s11SHA).thenAnswer(invocation -> { + Thread.sleep(2000); + return changeS11SHA; + }).thenReturn(changeS11SHA); + when(scriptClient.evalSha("valuesha", "pollScriptKey", "s22:script:脚本s22:js")).thenReturn(s22SHA); + when(scriptClient.evalSha("valuesha", "pollScriptKey", "s33:script:脚本s33")).thenReturn(s33SHA); + + //测试修改前的chain + LiteflowResponse response = flowExecutor.execute2Resp("chain11", "arg"); + Assertions.assertTrue(response.isSuccess()); + Assertions.assertEquals("a==>b==>c", response.getExecuteStepStr()); + + Thread.sleep(4000); + + //测试加了script的chain + response = flowExecutor.execute2Resp("chain11", "arg"); + DefaultContext context = response.getFirstContextBean(); + Assertions.assertTrue(response.isSuccess()); + Assertions.assertEquals("hello s11", context.getData("test11")); + Assertions.assertEquals("hello s22", context.getData("test22")); + Assertions.assertEquals("s11[脚本s11]==>s22[脚本s22]==>s33[脚本s33]==>a==>b", response.getExecuteStepStrWithoutTime()); + + Thread.sleep(4000); + + //测试修改script后的chain + response = flowExecutor.execute2Resp("chain11", "arg"); + context = response.getFirstContextBean(); + Assertions.assertTrue(response.isSuccess()); + Assertions.assertEquals("hello world", context.getData("test11")); + } + + @Test + public void testDisablePollWithXml() throws InterruptedException { + Set chainNameSet = new HashSet<>(); + chainNameSet.add("chain1122:false"); + String chainValue = "THEN(a, b, c);"; + + when(chainClient.hkeys("pollChainKey")).thenReturn(chainNameSet); + when(chainClient.hget("pollChainKey", "chain1122:true")).thenReturn(chainValue); + + Set scriptFieldSet = new HashSet<>(); + scriptFieldSet.add("s4:script:脚本s3:groovy:false"); + when(scriptClient.hkeys("pollScriptKey")).thenReturn(scriptFieldSet); + when(scriptClient.hget("pollScriptKey", "s4:script:脚本s3:groovy:true")).thenReturn("defaultContext.setData(\"test\",\"hello\");"); + + // 测试 chain 停用 + Assertions.assertThrows(ChainNotFoundException.class, () -> { + throw flowExecutor.execute2Resp("chain1122", "arg").getCause(); + }); + + // 测试 script 停用 + Assertions.assertTrue(!FlowBus.getNodeMap().containsKey("s4")); + } +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisClusterSubscribeSpringBootTest.java b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisClusterSubscribeSpringBootTest.java new file mode 100644 index 000000000..3f5368d4b --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisClusterSubscribeSpringBootTest.java @@ -0,0 +1,202 @@ +package com.yomahub.liteflow.test.redis; + +import com.yomahub.liteflow.core.FlowExecutor; +import com.yomahub.liteflow.flow.FlowBus; +import com.yomahub.liteflow.flow.LiteflowResponse; +import com.yomahub.liteflow.parser.helper.NodeConvertHelper; +import com.yomahub.liteflow.parser.redis.mode.RClient; +import com.yomahub.liteflow.parser.redis.mode.RedisParserHelper; +import com.yomahub.liteflow.slot.DefaultContext; +import com.yomahub.liteflow.test.BaseTest; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.redisson.api.RMapCache; +import org.redisson.api.RedissonClient; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +import javax.annotation.Resource; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * springboot环境下的redis 集群配置源订阅模式功能测试 + *

+ * 测试用例会在1号database中添加测试数据 chainKey:testChainKey; scriptKey:testScriptKey + * 测试完成后清除测试数据 + * + * @author jay li + * @since 2.13.3 + */ +@ExtendWith(SpringExtension.class) +@TestPropertySource(value = "classpath:/redis/application-sub-cluster-xml.properties") +@SpringBootTest(classes = RedisClusterSubscribeSpringBootTest.class) +@EnableAutoConfiguration +@ComponentScan({"com.yomahub.liteflow.test.redis.cmp"}) +public class RedisClusterSubscribeSpringBootTest extends BaseTest { + @Mock + private RedissonClient redissonClient; + + @Resource + private FlowExecutor flowExecutor; + + @MockBean(name = "chainClient") + private RClient chainClient; + + @MockBean(name = "scriptClient") + private RClient scriptClient; + + @Mock + private RMapCache chainKey; + + @Mock + private RMapCache scriptKey; + + @BeforeEach + public void setUpBeforeClass() { + + when(redissonClient.getMapCache("testChainKey")).thenReturn(chainKey); + when(redissonClient.getMapCache("testScriptKey")).thenReturn(scriptKey); + + when(scriptKey.get("s1:script:脚本s1:groovy")).thenReturn("defaultContext.setData(\"test1\",\"hello s1\");"); + when(scriptKey.get("s2:script:脚本s2:js")).thenReturn("defaultContext.setData(\"test2\",\"hello s2\");"); + when(scriptKey.get("s3:script:脚本s3")).thenReturn("defaultContext.setData(\"test3\",\"hello s3\");"); + + Set> mockEntrySet = new HashSet<>(); + mockEntrySet.add(createMockEntry("s1:script:脚本s1:groovy", "defaultContext.setData(\"test1\",\"hello s1\");")); + mockEntrySet.add(createMockEntry("s2:script:脚本s2:js", "defaultContext.setData(\"test2\",\"hello s2\");")); + mockEntrySet.add(createMockEntry("s3:script:脚本s3", "defaultContext.setData(\"test3\",\"hello s3\");")); + when(scriptKey.entrySet()).thenReturn(mockEntrySet); + + when(chainKey.get("chain1")).thenReturn("THEN(a, b, c);"); + when(chainKey.get("chain2")).thenReturn("THEN(a, b, c, s3);"); + when(chainKey.get("chain3")).thenReturn("THEN(a, b, c, s1, s2);"); + + mockEntrySet = new HashSet<>(); + mockEntrySet.add(createMockEntry("chain1", "THEN(a, b, c);")); + mockEntrySet.add(createMockEntry("chain2", "THEN(a, b, c, s3);")); + mockEntrySet.add(createMockEntry("chain3", "THEN(a, b, c, s1, s2);")); + + Set> mockEntrySet1 = new HashSet<>(mockEntrySet); + + mockEntrySet1.add(createMockEntry("chain1", "THEN(a, c, b);")); + when(chainKey.entrySet()).thenReturn(mockEntrySet).thenReturn(mockEntrySet1); + + when(chainClient.getMap(anyString())).thenReturn(chainKey); + when(scriptClient.getMap(anyString())).thenReturn(scriptKey); + } + + private Map.Entry createMockEntry(Object key, Object value) { + Map.Entry entry = mock(Map.Entry.class); + when(entry.getKey()).thenReturn(key); + when(entry.getValue()).thenReturn(value); + return entry; + } + + /** + * 测试chain + */ + @Test + public void testSubWithXml() throws InterruptedException { + LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg"); + Assertions.assertTrue(response.isSuccess()); + Assertions.assertEquals("a==>b==>c", response.getExecuteStepStr()); + + //修改redis中规则 + changeXMLData(); + //重新加载规则 + Thread.sleep(100); + + Assertions.assertEquals("a==>c==>b", flowExecutor.execute2Resp("chain1", "arg").getExecuteStepStr()); + + //删除redis中规则 + deleteXMLData(); + //重新加载规则 + Thread.sleep(100); + //由于chain1已被删除 这里会报ChainNotFoundException异常 + response = flowExecutor.execute2Resp("chain1", "arg"); + Assertions.assertTrue(!response.isSuccess()); + + //添加redis中规则 + addXMLData(); + //重新加载规则 + Thread.sleep(100); + Assertions.assertEquals("b==>c", flowExecutor.execute2Resp("chain4", "arg").getExecuteStepStr()); + } + + /** + * 测试script + */ + @Test + public void testSubWithScriptXml() throws InterruptedException { + LiteflowResponse response = flowExecutor.execute2Resp("chain3", "arg"); + DefaultContext context = response.getFirstContextBean(); + Assertions.assertTrue(response.isSuccess()); + Assertions.assertEquals("hello s1", context.getData("test1")); + Assertions.assertEquals("a==>b==>c==>s1[脚本s1]==>s2[脚本s2]", response.getExecuteStepStrWithoutTime()); + + //添加和删除脚本 + addAndDeleteScriptData(); + //修改redis脚本 + changeScriptData(); + Thread.sleep(100); + context = flowExecutor.execute2Resp("chain3", "arg").getFirstContextBean(); + Assertions.assertEquals("hello s1 version2", context.getData("test1")); + context = flowExecutor.execute2Resp("chain2", "arg").getFirstContextBean(); + Assertions.assertEquals("hello s3 version2", context.getData("test2")); + } + + /** + * 修改redisson中的chain + */ + public void changeXMLData() { + RedisParserHelper.changeChain("chain1", "THEN(a, c, b);"); + } + + /** + * 删除redisson中的chain + */ + public void deleteXMLData() { + FlowBus.removeChain("chain1"); + FlowBus.removeChain("chain4"); + } + + /** + * 新增redisson中的chain + */ + public void addXMLData() { + RedisParserHelper.changeChain("chain4", "THEN(b, c);"); + } + + /** + * 修改redisson中的脚本 + */ + public void changeScriptData() { + RedisParserHelper.changeScriptNode("s1:script:脚本s1:groovy", "defaultContext.setData(\"test1\",\"hello s1 version2\");"); + RedisParserHelper.changeScriptNode("s3:script:脚本s3", "defaultContext.setData(\"test2\",\"hello s3 version2\");"); + } + + /** + * 新增和删除redisson中的chain + */ + public void addAndDeleteScriptData() { + NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert("s3:script:脚本s3"); + FlowBus.unloadScriptNode(nodeSimpleVO.getNodeId()); + + RedisParserHelper.changeScriptNode("s5:script:脚本s5:groovy", "defaultContext.setData(\"test1\",\"hello s5\");"); + } + + +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisWithXmlELPollSpringbootTest.java b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisWithXmlELPollSpringbootTest.java index 2de4189a9..bf3d35f2f 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisWithXmlELPollSpringbootTest.java +++ b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisWithXmlELPollSpringbootTest.java @@ -11,7 +11,7 @@ import com.yomahub.liteflow.parser.redis.mode.RClient; import com.yomahub.liteflow.parser.redis.mode.polling.RedisParserPollingMode; import com.yomahub.liteflow.slot.DefaultContext; import com.yomahub.liteflow.test.BaseTest; -import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -26,6 +26,7 @@ import javax.annotation.Resource; import java.lang.reflect.Field; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledThreadPoolExecutor; import static org.mockito.ArgumentMatchers.anyString; @@ -71,16 +72,21 @@ public class RedisWithXmlELPollSpringbootTest extends BaseTest { static LFLog LOG = LFLoggerManager.getLogger(RedisWithXmlELPollSpringbootTest.class); - @AfterAll - public static void after() { - //关闭poll模式的轮询线程池 - try{ + @AfterEach + void afterEach() { + try { Field pollExecutor = RedisParserPollingMode.class.getDeclaredField("pollExecutor"); pollExecutor.setAccessible(true); - ScheduledThreadPoolExecutor threadPoolExecutor = (ScheduledThreadPoolExecutor) pollExecutor.get(null); - threadPoolExecutor.shutdownNow(); + // 关闭旧线程池 + ScheduledThreadPoolExecutor oldPool = (ScheduledThreadPoolExecutor) pollExecutor.get(null); + if (oldPool != null) { + oldPool.shutdownNow(); + } + // 创建新线程池并设置回去 + ScheduledThreadPoolExecutor newPool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1); + pollExecutor.set(null, newPool); } catch (Exception ignored) { - LOG.error("[Polling thread pool not closed]", ignored); + LOG.error("[Polling thread pool reset failed]", ignored); } } diff --git a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/resources/redis/application-poll-cluster-xml.properties b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/resources/redis/application-poll-cluster-xml.properties new file mode 100644 index 000000000..6153f5d55 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/resources/redis/application-poll-cluster-xml.properties @@ -0,0 +1,9 @@ +liteflow.rule-source-ext-data={\ + "redisMode":"cluster",\ + "clusterAddress":"127.0.0.1:26389,127.0.0.1:26379",\ + "pollingInterval":1,\ + "pollingStartTime":2,\ + "chainKey":"pollChainKey",\ + "scriptKey":"pollScriptKey"\ + } +liteflow.parse-mode=PARSE_ALL_ON_FIRST_EXEC \ No newline at end of file diff --git a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/resources/redis/application-sub-cluster-xml.properties b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/resources/redis/application-sub-cluster-xml.properties new file mode 100644 index 000000000..826ac8948 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/resources/redis/application-sub-cluster-xml.properties @@ -0,0 +1,10 @@ +liteflow.rule-source-ext-data={\ + "redisMode":"cluster",\ + "clusterAddress":"127.0.0.1:26389,127.0.0.1:26379",\ + "mode":"sub",\ + "chainDataBase":1,\ + "chainKey":"testChainKey",\ + "scriptDataBase":1,\ + "scriptKey":"testScriptKey"\ + } +liteflow.parse-mode=PARSE_ALL_ON_FIRST_EXEC \ No newline at end of file diff --git a/liteflow-testcase-el/liteflow-testcase-el-script-javaxpro-springboot/src/test/java/com/yomahub/liteflow/test/script/javapro/parseOneMode/ScriptJavaxProParseOneModeTest.java b/liteflow-testcase-el/liteflow-testcase-el-script-javaxpro-springboot/src/test/java/com/yomahub/liteflow/test/script/javapro/parseOneMode/ScriptJavaxProParseOneModeTest.java new file mode 100644 index 000000000..8d9d3887c --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-script-javaxpro-springboot/src/test/java/com/yomahub/liteflow/test/script/javapro/parseOneMode/ScriptJavaxProParseOneModeTest.java @@ -0,0 +1,31 @@ +package com.yomahub.liteflow.test.script.javapro.parseOneMode; + +import com.yomahub.liteflow.core.FlowExecutor; +import com.yomahub.liteflow.flow.LiteflowResponse; +import com.yomahub.liteflow.test.BaseTest; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +import javax.annotation.Resource; + +@ExtendWith(SpringExtension.class) +@TestPropertySource(value = "classpath:/parseOneMode/application.properties") +@SpringBootTest(classes = ScriptJavaxProParseOneModeTest.class) +@EnableAutoConfiguration +public class ScriptJavaxProParseOneModeTest extends BaseTest { + + @Resource + private FlowExecutor flowExecutor; + + @Test + public void testParseOneMode() { + LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg"); + Assertions.assertTrue(response.isSuccess()); + } + +} \ No newline at end of file diff --git a/liteflow-testcase-el/liteflow-testcase-el-script-javaxpro-springboot/src/test/resources/parseOneMode/application.properties b/liteflow-testcase-el/liteflow-testcase-el-script-javaxpro-springboot/src/test/resources/parseOneMode/application.properties new file mode 100644 index 000000000..5775aca1c --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-script-javaxpro-springboot/src/test/resources/parseOneMode/application.properties @@ -0,0 +1,2 @@ +liteflow.rule-source=parseOneMode/flow.xml +liteflow.parse-mode=PARSE_ONE_ON_FIRST_EXEC \ No newline at end of file diff --git a/liteflow-testcase-el/liteflow-testcase-el-script-javaxpro-springboot/src/test/resources/parseOneMode/flow.xml b/liteflow-testcase-el/liteflow-testcase-el-script-javaxpro-springboot/src/test/resources/parseOneMode/flow.xml new file mode 100644 index 000000000..3da4f17fd --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-script-javaxpro-springboot/src/test/resources/parseOneMode/flow.xml @@ -0,0 +1,32 @@ + + + + + + + + + + + + + THEN(s1); + + + \ No newline at end of file diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/base/BaseELSpringbootTest.java b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/base/BaseELSpringbootTest.java index 09490711a..4e600df2a 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/base/BaseELSpringbootTest.java +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/base/BaseELSpringbootTest.java @@ -1,6 +1,7 @@ package com.yomahub.liteflow.test.base; import com.yomahub.liteflow.core.FlowExecutor; +import com.yomahub.liteflow.flow.FlowBus; import com.yomahub.liteflow.flow.LiteflowResponse; import com.yomahub.liteflow.test.BaseTest; import org.junit.jupiter.api.Assertions; @@ -16,6 +17,7 @@ import javax.annotation.Resource; * springboot环境EL常规的例子测试 * * @author Bryan.Zhang + * @author luo yi */ @TestPropertySource(value = "classpath:/base/application.properties") @SpringBootTest(classes = BaseELSpringbootTest.class) @@ -61,4 +63,45 @@ public class BaseELSpringbootTest extends BaseTest { Assertions.assertTrue(response.isSuccess()); } + // 入参执行 EL 表达式 + @Test + public void testBase6() throws Exception { + LiteflowResponse response = flowExecutor.execute2RespWithEL("THEN(a, b,c);;"); + Assertions.assertTrue(response.isSuccess()); + + LiteflowResponse response1 = flowExecutor.execute2RespWithEL("THEN(\na, \tb,c);"); + Assertions.assertTrue(response1.isSuccess()); + + Assertions.assertEquals(response.getChainId(), response1.getChainId()); + } + + // 入参执行 EL 表达式,测试移除 chain + @Test + public void testBase7() throws Exception { + LiteflowResponse response = flowExecutor.execute2RespWithEL("THEN(a,b, \nc);;"); + Assertions.assertTrue(response.isSuccess()); + + FlowBus.removeChain(response.getChainId()); + + LiteflowResponse response1 = flowExecutor.execute2RespWithEL("THEN(a,b, c);"); + Assertions.assertTrue(response1.isSuccess()); + + Assertions.assertNotEquals(response.getChainId(), response1.getChainId()); + } + + // 运行文件里同样的 chain + @Test + public void testBase8() throws Exception { + LiteflowResponse response = flowExecutor.execute2RespWithEL("THEN(a,b,SWITCH(e).to(d,f));"); + Assertions.assertTrue(response.isSuccess()); + // 应返回 chain2 + Assertions.assertEquals("chain2", response.getChainId()); + + LiteflowResponse response1 = flowExecutor.execute2RespWithEL("t1=THEN(c, WHEN(j,k));w1 = WHEN(q, THEN(p, r)).id('w01');t2 = THEN(h, i);\n" + + "THEN(a,b,WHEN(t1, d, t2 ),SWITCH(x).to(m, n, w1),z);"); + Assertions.assertTrue(response1.isSuccess()); + // 应返回 chain5 + Assertions.assertEquals("chain5", response1.getChainId()); + } + }