Merge branch 'dev' of https://gitee.com/dromara/liteFlow into issues/IBQCWB

This commit is contained in:
luoyi
2025-07-09 18:35:21 +08:00
17 changed files with 642 additions and 64 deletions

View File

@@ -91,9 +91,9 @@ public class FlowBus {
}
// 这一方法主要用于第一阶段chain的预装载
public static void addChain(String chainName) {
if (!chainMap.containsKey(chainName)) {
chainMap.put(chainName, new Chain(chainName));
public static void addChain(String chainId) {
if (!chainMap.containsKey(chainId)) {
chainMap.put(chainId, new Chain(chainId));
}
}
@@ -208,7 +208,7 @@ public class FlowBus {
}
Node node = new Node(nodeId, name, nodeType, script, language);
nodeMap.put(nodeId, node);
put2NodeMap(nodeId, node);
} else {
addScriptNodeAndCompile(nodeId, name, nodeType, script, language);
}
@@ -222,11 +222,9 @@ public class FlowBus {
* @param type type
* @param script script content
* @param language language
* @return NodeComponent instance
*/
public static NodeComponent addScriptNodeAndCompile(String nodeId, String name, NodeTypeEnum type, String script, String language) {
public static void addScriptNodeAndCompile(String nodeId, String name, NodeTypeEnum type, String script, String language) {
addNode(nodeId, name, type, ScriptComponent.ScriptComponentClassMap.get(type), script, language);
return nodeMap.get(nodeId).getInstance();
}
private static List<NodeComponent> getNodeComponentList(String nodeId, String name, NodeTypeEnum type, Class<?> cmpClazz) throws Exception {
@@ -264,7 +262,7 @@ public class FlowBus {
return cmpInstanceList;
}
public static void compileNode(Node node) {
public static void compileScriptNode(Node node) {
String nodeId = node.getId(), name = node.getName(), script = node.getScript(), language = node.getLanguage();
NodeTypeEnum type = node.getType();
try {
@@ -299,8 +297,13 @@ public class FlowBus {
addFallbackNode(node);
}
// 如果是spring自动扫描的组件在addManagedNode方法中就已经完成了组装了
// 调用到这里分两种情况一是脚本组件二是通过LiteFlowNodeBuilder代码进行组装的组件
private static void addNode(String nodeId, String name, NodeTypeEnum type, Class<?> cmpClazz, String script, String language) {
try {
// 获得初始化好的NodeComponent
// 按理说一个nodeId对应一个NodeComponent这里得到的是List<NodeComponent>的原因是声明式组件有可能会有多个nodeId。
// 声明式组件又分类声明和方法声明如果对于方法声明来说这里的nodeId其实并不是最终真正的nodeId。
List<NodeComponent> cmpInstanceList = getNodeComponentList(nodeId, name, type, cmpClazz);
// 初始化Node把component放到Node里去

View File

@@ -173,7 +173,7 @@ public class Node implements Executable, Cloneable, Rollbackable{
if (!this.isCompiled()) {
synchronized (this) {
if (!this.isCompiled()) {
FlowBus.compileNode(this);
FlowBus.compileScriptNode(this);
}
}
}

View File

@@ -183,8 +183,8 @@ public class ParserHelper {
//首先需要对继承自抽象Chain的chain进行字符串替换
parseImplChain(abstratChainMap, implChainSet, chain);
//如果一个chain不为抽象chain则进行解析
String chainName = Optional.ofNullable(chain.attributeValue(ID)).orElse(chain.attributeValue(NAME));
if(!abstratChainMap.containsKey(chainName)){
String chainId = Optional.ofNullable(chain.attributeValue(ID)).orElse(chain.attributeValue(NAME));
if(!abstratChainMap.containsKey(chainId)){
parseOneChainConsumer.accept(chain);
}
}

View File

@@ -24,6 +24,7 @@ import java.util.Objects;
* Redis解析器实现只支持EL形式的XML不支持其他的形式
*
* @author hxinyu
* @author jay li
* @since 2.11.0
*/
@@ -109,11 +110,17 @@ public class RedisXmlELParser extends ClassXmlFlowELParser {
if (redisParserVO.getRedisMode().equals(RedisMode.SENTINEL) && CollectionUtil.isEmpty(redisParserVO.getSentinelAddress())) {
throw new RedisException(StrFormatter.format(ERROR_MSG_PATTERN, "sentinel address list"));
}
if (ObjectUtil.isNull(redisParserVO.getChainDataBase())) {
if (ObjectUtil.isNull(redisParserVO.getChainDataBase()) && !redisParserVO.getRedisMode().equals(RedisMode.CLUSTER)) {
throw new RedisException(StrFormatter.format(ERROR_MSG_PATTERN, "chainDataBase"));
}
if (StrUtil.isBlank(redisParserVO.getChainKey())) {
throw new RedisException(StrFormatter.format(ERROR_MSG_PATTERN, "chainKey"));
}
if (redisParserVO.getRedisMode().equals(RedisMode.CLUSTER) && CollectionUtil.isEmpty(redisParserVO.getClusterNodeAddress())) {
throw new RedisException(StrFormatter.format(ERROR_MSG_PATTERN, "cluster address list"));
}
if (ObjectUtil.isNotNull(redisParserVO.getScriptKey()) && ObjectUtil.isNull(redisParserVO.getScriptDataBase()) && !redisParserVO.getRedisMode().equals(RedisMode.CLUSTER)) {
throw new RedisException(StrFormatter.format(ERROR_MSG_PATTERN, "scriptDataBase"));
}
}
}

View File

@@ -4,16 +4,19 @@ package com.yomahub.liteflow.parser.redis.mode;
* 用于定义Redis模式的枚举类
*
* single单点模式, sentinel哨兵模式
* 不支持集群模式配置
* cluster 集群模式配置
*
* @author hxinyu
* @author jay li
* @since 2.11.0
*/
public enum RedisMode {
SINGLE("single"),
SENTINEL("sentinel");
SENTINEL("sentinel"),
CLUSTER("cluster");
private String mode;

View File

@@ -3,6 +3,7 @@ package com.yomahub.liteflow.parser.redis.mode;
import cn.hutool.core.lang.Pair;
import cn.hutool.core.text.StrFormatter;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.builder.LiteFlowNodeBuilder;
import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder;
@@ -13,6 +14,7 @@ import com.yomahub.liteflow.log.LFLoggerManager;
import com.yomahub.liteflow.parser.helper.NodeConvertHelper;
import com.yomahub.liteflow.parser.redis.vo.RedisParserVO;
import com.yomahub.liteflow.util.RuleParsePluginUtil;
import org.redisson.config.ClusterServersConfig;
import org.redisson.config.Config;
import org.redisson.config.SentinelServersConfig;
import org.redisson.config.SingleServerConfig;
@@ -22,6 +24,7 @@ import org.redisson.config.SingleServerConfig;
*
* @author hxinyu
* @author Bryan.Zhang
* @author jay li
* @since 2.11.0
*/
@@ -33,6 +36,8 @@ public interface RedisParserHelper {
String SENTINEL_REDIS_URL_PATTERN = "redis://{}";
String CLUSTER_REDIS_URL_PATTERN = "redis://{}";
String CHAIN_XML_PATTERN = "<chain name=\"{}\">{}</chain>";
String NODE_XML_PATTERN = "<nodes>{}</nodes>";
@@ -55,6 +60,9 @@ public interface RedisParserHelper {
* @return redisson config
*/
default Config getSingleRedissonConfig(RedisParserVO redisParserVO, Integer dataBase) {
if (ObjectUtil.isNull(dataBase)) {
return null;
}
Config config = new Config();
String redisAddress = StrFormatter.format(SINGLE_REDIS_URL_PATTERN, redisParserVO.getHost(), redisParserVO.getPort());
@@ -81,6 +89,9 @@ public interface RedisParserHelper {
* @return redisson Config
*/
default Config getSentinelRedissonConfig(RedisParserVO redisParserVO, Integer dataBase) {
if (ObjectUtil.isNull(dataBase)) {
return null;
}
Config config = new Config();
SentinelServersConfig sentinelConfig = config.useSentinelServers()
.setMasterName(redisParserVO.getMasterName())
@@ -109,6 +120,35 @@ public interface RedisParserHelper {
return config;
}
/**
* 获取Redisson客户端的Config配置通用方法(集群模式)
* @param redisParserVO redisParserVO
* @return redisson Config
*/
default Config getCluserRedissonConfig(RedisParserVO redisParserVO) {
Config config = new Config();
ClusterServersConfig clusterConfig = config.useClusterServers()
.setMasterConnectionPoolSize(redisParserVO.getConnectionPoolSize())
.setSlaveConnectionPoolSize(redisParserVO.getConnectionPoolSize())
.setMasterConnectionMinimumIdleSize(redisParserVO.getConnectionMinimumIdleSize())
.setSlaveConnectionMinimumIdleSize(redisParserVO.getConnectionMinimumIdleSize());
redisParserVO.getClusterNodeAddress().forEach(address -> {
clusterConfig.addNodeAddress(StrFormatter.format(CLUSTER_REDIS_URL_PATTERN, address));
});
//如果配置了用户名和密码
if(StrUtil.isNotBlank(redisParserVO.getUsername()) && StrUtil.isNotBlank(redisParserVO.getPassword())) {
clusterConfig.setUsername(redisParserVO.getUsername())
.setPassword(redisParserVO.getPassword());
}
//如果配置了密码
else if(StrUtil.isNotBlank(redisParserVO.getPassword())) {
clusterConfig.setPassword(redisParserVO.getPassword());
}
return config;
}
/**
* script节点的修改/添加
*

View File

@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
* Redis 轮询机制实现类
*
* @author hxinyu
* @author jay li
* @since 2.11.0
*/
@@ -81,27 +82,29 @@ public class RedisParserPollingMode implements RedisParserHelper {
}
if (ObjectUtil.isNull(chainClient)) {
RedisMode redisMode = redisParserVO.getRedisMode();
Config config;
//Redis单点模式
if (redisMode.equals(RedisMode.SINGLE)){
config = getSingleRedissonConfig(redisParserVO, redisParserVO.getChainDataBase());
this.chainClient = new RClient(Redisson.create(config));
//如果有脚本数据
if (ObjectUtil.isNotNull(redisParserVO.getScriptDataBase())) {
config = getSingleRedissonConfig(redisParserVO, redisParserVO.getScriptDataBase());
this.scriptClient = new RClient(Redisson.create(config));
}
Config chinaConfig, scriptConfig;
switch (redisMode) {
case SINGLE:
chinaConfig = getSingleRedissonConfig(redisParserVO, redisParserVO.getChainDataBase());
scriptConfig = getSingleRedissonConfig(redisParserVO, redisParserVO.getScriptDataBase());
break;
case SENTINEL:
chinaConfig = getSentinelRedissonConfig(redisParserVO, redisParserVO.getChainDataBase());
scriptConfig = getSentinelRedissonConfig(redisParserVO, redisParserVO.getScriptDataBase());
break;
case CLUSTER:
chinaConfig = getCluserRedissonConfig(redisParserVO);
scriptConfig = chinaConfig;
break;
default:
throw new RedisException("RedisMode is not supported");
}
//Redis哨兵模式
else if (redisMode.equals(RedisMode.SENTINEL)) {
config = getSentinelRedissonConfig(redisParserVO, redisParserVO.getChainDataBase());
this.chainClient = new RClient(Redisson.create(config));
//如果有脚本数据
if (ObjectUtil.isNotNull(redisParserVO.getScriptDataBase())) {
config = getSentinelRedissonConfig(redisParserVO, redisParserVO.getScriptDataBase());
this.scriptClient = new RClient(Redisson.create(config));
}
this.chainClient = new RClient(Redisson.create(chinaConfig));
//如果有脚本数据
if (ObjectUtil.isNotNull(redisParserVO.getScriptKey())) {
this.scriptClient = new RClient(Redisson.create(scriptConfig));
}
}
//创建定时任务线程池
@@ -211,7 +214,7 @@ public class RedisParserPollingMode implements RedisParserHelper {
redisParserVO.getPollingInterval().longValue(), TimeUnit.SECONDS);
//如果有脚本
if (ObjectUtil.isNotNull(scriptClient) && ObjectUtil.isNotNull(redisParserVO.getScriptDataBase())
if (ObjectUtil.isNotNull(scriptClient) && (ObjectUtil.isNotNull(redisParserVO.getScriptDataBase()) || RedisMode.CLUSTER.equals(redisParserVO.getRedisMode()))
&& StrUtil.isNotBlank(redisParserVO.getScriptKey())) {
//将lua脚本添加到scriptJedis脚本缓存
String keyLuaOfScript = scriptClient.scriptLoad(luaOfKey);

View File

@@ -28,6 +28,7 @@ import java.util.Map;
* 使用 Redisson客户端 RMapCache存储结构
*
* @author hxinyu
* @author jay li
* @since 2.11.0
*/
@@ -50,27 +51,29 @@ public class RedisParserSubscribeMode implements RedisParserHelper {
}
if (ObjectUtil.isNull(chainClient)) {
RedisMode redisMode = redisParserVO.getRedisMode();
Config config;
//Redis单点模式
if (redisMode.equals(RedisMode.SINGLE)) {
config = getSingleRedissonConfig(redisParserVO, redisParserVO.getChainDataBase());
this.chainClient = new RClient(Redisson.create(config));
//如果有脚本数据
if (ObjectUtil.isNotNull(redisParserVO.getScriptDataBase())) {
config = getSingleRedissonConfig(redisParserVO, redisParserVO.getScriptDataBase());
this.scriptClient = new RClient(Redisson.create(config));
}
Config chinaConfig, scriptConfig;
switch (redisMode) {
case SINGLE:
chinaConfig = getSingleRedissonConfig(redisParserVO, redisParserVO.getChainDataBase());
scriptConfig = getSingleRedissonConfig(redisParserVO, redisParserVO.getScriptDataBase());
break;
case SENTINEL:
chinaConfig = getSentinelRedissonConfig(redisParserVO, redisParserVO.getChainDataBase());
scriptConfig = getSentinelRedissonConfig(redisParserVO, redisParserVO.getScriptDataBase());
break;
case CLUSTER:
chinaConfig = getCluserRedissonConfig(redisParserVO);
scriptConfig = chinaConfig;
break;
default:
throw new RedisException("RedisMode is not supported");
}
//Redis哨兵模式
else if (redisMode.equals(RedisMode.SENTINEL)) {
config = getSentinelRedissonConfig(redisParserVO, redisParserVO.getChainDataBase());
this.chainClient = new RClient(Redisson.create(config));
//如果有脚本数据
if (ObjectUtil.isNotNull(redisParserVO.getScriptDataBase())) {
config = getSentinelRedissonConfig(redisParserVO, redisParserVO.getScriptDataBase());
this.scriptClient = new RClient(Redisson.create(config));
}
this.chainClient = new RClient(Redisson.create(chinaConfig));
//如果有脚本数据
if (ObjectUtil.isNotNull(redisParserVO.getScriptKey())) {
this.scriptClient = new RClient(Redisson.create(scriptConfig));
}
}
} catch (Exception e) {
@@ -172,7 +175,7 @@ public class RedisParserSubscribeMode implements RedisParserHelper {
});
//监听 script
if (ObjectUtil.isNotNull(scriptClient) && ObjectUtil.isNotNull(redisParserVO.getScriptDataBase())) {
if (ObjectUtil.isNotNull(scriptClient) && (ObjectUtil.isNotNull(redisParserVO.getScriptDataBase()) || RedisMode.CLUSTER.equals(redisParserVO.getRedisMode()))) {
String scriptKey = redisParserVO.getScriptKey();
//添加 script

View File

@@ -12,6 +12,7 @@ import java.util.List;
* 用于解析RuleSourceExtData的vo类, 用于Redis模式中
*
* @author hxinyu
* @author jay li
* @since 2.11.0
*/
@@ -63,6 +64,17 @@ public class RedisParserVO {
/*脚本配置的键名 若没有脚本数据可不配置*/
private String scriptKey;
/*集群模式需配置 逗号分隔 集群地址 */
private List<String> clusterNodeAddress;
public List<String> getClusterNodeAddress() {
return clusterNodeAddress;
}
public void setClusterNodeAddress(List<String> clusterNodeAddress) {
this.clusterNodeAddress = clusterNodeAddress;
}
public void setRedisMode(String redisMode) {
redisMode = redisMode.toUpperCase();
try{
@@ -120,6 +132,16 @@ public class RedisParserVO {
}
}
@JsonSetter("clusterAddress")
public void setClusterAddressFromString(String addresses) {
if (addresses != null && !addresses.trim().isEmpty()) {
// 按逗号分割,并去除每个地址前后的空格
this.clusterNodeAddress = Arrays.asList(addresses.split("\\s*,\\s*"));
} else {
this.clusterNodeAddress = Collections.emptyList();
}
}
public String getUsername() {
return username;
}
@@ -232,6 +254,7 @@ public class RedisParserVO {
", chainKey='" + chainKey + '\'' +
", scriptDataBase=" + scriptDataBase +
", scriptKey='" + scriptKey + '\'' +
", clusterAddress='" + clusterNodeAddress + '\'' +
'}';
}
}

View File

@@ -0,0 +1,204 @@
package com.yomahub.liteflow.test.redis;
import cn.hutool.crypto.digest.DigestUtil;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.exception.ChainNotFoundException;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.log.LFLog;
import com.yomahub.liteflow.log.LFLoggerManager;
import com.yomahub.liteflow.parser.redis.mode.RClient;
import com.yomahub.liteflow.parser.redis.mode.polling.RedisParserPollingMode;
import com.yomahub.liteflow.slot.DefaultContext;
import com.yomahub.liteflow.test.BaseTest;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import javax.annotation.Resource;
import java.lang.reflect.Field;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
/**
* springboot环境下的redis 集群配置源poll模式功能测试
* <p>
* 测试用例会在1号database中添加测试数据 chainKey:testChainKey; scriptKey:testScriptKey
* 测试完成后清除测试数据
*
* @author jay li
* @since 2.13.3
*/
@ExtendWith(SpringExtension.class)
@TestPropertySource(value = "classpath:/redis/application-poll-cluster-xml.properties")
@SpringBootTest(classes = RedisClusterPollSpringBootTest.class)
@EnableAutoConfiguration
@ComponentScan({"com.yomahub.liteflow.test.redis.cmp"})
public class RedisClusterPollSpringBootTest extends BaseTest {
@MockBean(name = "chainClient")
private static RClient chainClient;
@MockBean(name = "scriptClient")
private static RClient scriptClient;
@Resource
private FlowExecutor flowExecutor;
//计算hash中field数量的lua脚本
private final String luaOfKey = "local keys = redis.call(\"hkeys\", KEYS[1]);\n" +
"return #keys;\n";
//计算hash中value的SHA值的lua脚本
private final String luaOfValue = "local key = KEYS[1];\n" +
"local field = KEYS[2];\n" +
"local value, err = redis.call(\"hget\", key, field);\n" +
"if value == false or value == nil then\n" +
" return \"nil\";\n" +
"end\n" +
"local sha1 = redis.sha1hex(value);\n" +
"return sha1;";
static LFLog LOG = LFLoggerManager.getLogger(RedisClusterPollSpringBootTest.class);
@AfterEach
void afterEach() {
try {
Field pollExecutor = RedisParserPollingMode.class.getDeclaredField("pollExecutor");
pollExecutor.setAccessible(true);
// 关闭旧线程池
ScheduledThreadPoolExecutor oldPool = (ScheduledThreadPoolExecutor) pollExecutor.get(null);
if (oldPool != null) {
oldPool.shutdownNow();
}
// 创建新线程池并设置回去
ScheduledThreadPoolExecutor newPool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1);
pollExecutor.set(null, newPool);
} catch (Exception ignored) {
LOG.error("[Polling thread pool reset failed]", ignored);
}
}
/**
* 统一测试chain和script
*
* 测试数据流程:
* 1、执行chain1值"THEN(a, b, c);"
* 2、修改chain1值为"THEN(s11, s22, s33, a, b);", 执行新chain 验证chain的轮询拉取功能
* 3、修改chain1其中的script11值 执行chain 验证script的轮询拉取功能
*/
@Test
public void testPollWithXml() throws InterruptedException {
Set<String> chainNameSet = new HashSet<>();
chainNameSet.add("chain11");
String chainValue = "THEN(a, b, c);";
String chainSHA = DigestUtil.sha1Hex(chainValue);
//修改chain并更新SHA值
String changeChainValue = "THEN(s11, s22, s33, a, b);";
String changeChainSHA = DigestUtil.sha1Hex(changeChainValue);
when(chainClient.hkeys("pollChainKey")).thenReturn(chainNameSet);
when(chainClient.hget("pollChainKey", "chain11")).thenReturn(chainValue).thenReturn(changeChainValue);
when(chainClient.scriptLoad(luaOfKey)).thenReturn("keysha");
when(chainClient.scriptLoad(luaOfValue)).thenReturn("valuesha");
when(chainClient.evalSha(eq("keysha"), anyString())).thenReturn("1");
when(chainClient.evalSha(eq("valuesha"), anyString(), anyString())).thenReturn(chainSHA).thenReturn(changeChainSHA);
//添加script
Set<String> scriptFieldSet = new HashSet<>();
scriptFieldSet.add("s11:script:脚本s11:groovy");
scriptFieldSet.add("s22:script:脚本s22:js");
scriptFieldSet.add("s33:script:脚本s33");
String s11 = "defaultContext.setData(\"test11\",\"hello s11\");";
String s22 = "defaultContext.setData(\"test22\",\"hello s22\");";
String s33 = "defaultContext.setData(\"test33\",\"hello s33\");";
//SHA值用于测试修改script的轮询刷新功能
String s11SHA = DigestUtil.sha1Hex(s11);
String s22SHA = DigestUtil.sha1Hex(s22);
String s33SHA = DigestUtil.sha1Hex(s33);
//修改script值并更新SHA值
String changeS11 = "defaultContext.setData(\"test11\",\"hello world\");";
String changeS11SHA = DigestUtil.sha1Hex(changeS11);
when(scriptClient.hkeys("pollScriptKey")).thenReturn(scriptFieldSet);
//这里休眠一段时间是为了防止在未修改脚本的chain还没有执行前 轮询线程就拉取了新script值
when(scriptClient.hget("pollScriptKey", "s11:script:脚本s11:groovy")).thenReturn(s11).thenAnswer(invocation -> {
Thread.sleep(2000);
return changeS11;
}).thenReturn(changeS11);
when(scriptClient.hget("pollScriptKey", "s22:script:脚本s22:js")).thenReturn(s22);
when(scriptClient.hget("pollScriptKey", "s33:script:脚本s33")).thenReturn(s33);
//分别模拟三个script的evalsha指纹值计算的返回值, 其中s11脚本修改 指纹值变化
when(scriptClient.scriptLoad(luaOfKey)).thenReturn("keysha");
when(scriptClient.scriptLoad(luaOfValue)).thenReturn("valuesha");
when(scriptClient.evalSha(eq("keysha"), anyString())).thenReturn("3");
when(scriptClient.evalSha("valuesha", "pollScriptKey", "s11:script:脚本s11:groovy")).thenReturn(s11SHA).thenAnswer(invocation -> {
Thread.sleep(2000);
return changeS11SHA;
}).thenReturn(changeS11SHA);
when(scriptClient.evalSha("valuesha", "pollScriptKey", "s22:script:脚本s22:js")).thenReturn(s22SHA);
when(scriptClient.evalSha("valuesha", "pollScriptKey", "s33:script:脚本s33")).thenReturn(s33SHA);
//测试修改前的chain
LiteflowResponse response = flowExecutor.execute2Resp("chain11", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("a==>b==>c", response.getExecuteStepStr());
Thread.sleep(4000);
//测试加了script的chain
response = flowExecutor.execute2Resp("chain11", "arg");
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("hello s11", context.getData("test11"));
Assertions.assertEquals("hello s22", context.getData("test22"));
Assertions.assertEquals("s11[脚本s11]==>s22[脚本s22]==>s33[脚本s33]==>a==>b", response.getExecuteStepStrWithoutTime());
Thread.sleep(4000);
//测试修改script后的chain
response = flowExecutor.execute2Resp("chain11", "arg");
context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("hello world", context.getData("test11"));
}
@Test
public void testDisablePollWithXml() throws InterruptedException {
Set<String> chainNameSet = new HashSet<>();
chainNameSet.add("chain1122:false");
String chainValue = "THEN(a, b, c);";
when(chainClient.hkeys("pollChainKey")).thenReturn(chainNameSet);
when(chainClient.hget("pollChainKey", "chain1122:true")).thenReturn(chainValue);
Set<String> scriptFieldSet = new HashSet<>();
scriptFieldSet.add("s4:script:脚本s3:groovy:false");
when(scriptClient.hkeys("pollScriptKey")).thenReturn(scriptFieldSet);
when(scriptClient.hget("pollScriptKey", "s4:script:脚本s3:groovy:true")).thenReturn("defaultContext.setData(\"test\",\"hello\");");
// 测试 chain 停用
Assertions.assertThrows(ChainNotFoundException.class, () -> {
throw flowExecutor.execute2Resp("chain1122", "arg").getCause();
});
// 测试 script 停用
Assertions.assertTrue(!FlowBus.getNodeMap().containsKey("s4"));
}
}

View File

@@ -0,0 +1,202 @@
package com.yomahub.liteflow.test.redis;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.parser.helper.NodeConvertHelper;
import com.yomahub.liteflow.parser.redis.mode.RClient;
import com.yomahub.liteflow.parser.redis.mode.RedisParserHelper;
import com.yomahub.liteflow.slot.DefaultContext;
import com.yomahub.liteflow.test.BaseTest;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.redisson.api.RMapCache;
import org.redisson.api.RedissonClient;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import javax.annotation.Resource;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* springboot环境下的redis 集群配置源订阅模式功能测试
* <p>
* 测试用例会在1号database中添加测试数据 chainKey:testChainKey; scriptKey:testScriptKey
* 测试完成后清除测试数据
*
* @author jay li
* @since 2.13.3
*/
@ExtendWith(SpringExtension.class)
@TestPropertySource(value = "classpath:/redis/application-sub-cluster-xml.properties")
@SpringBootTest(classes = RedisClusterSubscribeSpringBootTest.class)
@EnableAutoConfiguration
@ComponentScan({"com.yomahub.liteflow.test.redis.cmp"})
public class RedisClusterSubscribeSpringBootTest extends BaseTest {
@Mock
private RedissonClient redissonClient;
@Resource
private FlowExecutor flowExecutor;
@MockBean(name = "chainClient")
private RClient chainClient;
@MockBean(name = "scriptClient")
private RClient scriptClient;
@Mock
private RMapCache chainKey;
@Mock
private RMapCache scriptKey;
@BeforeEach
public void setUpBeforeClass() {
when(redissonClient.getMapCache("testChainKey")).thenReturn(chainKey);
when(redissonClient.getMapCache("testScriptKey")).thenReturn(scriptKey);
when(scriptKey.get("s1:script:脚本s1:groovy")).thenReturn("defaultContext.setData(\"test1\",\"hello s1\");");
when(scriptKey.get("s2:script:脚本s2:js")).thenReturn("defaultContext.setData(\"test2\",\"hello s2\");");
when(scriptKey.get("s3:script:脚本s3")).thenReturn("defaultContext.setData(\"test3\",\"hello s3\");");
Set<Map.Entry<Object, Object>> mockEntrySet = new HashSet<>();
mockEntrySet.add(createMockEntry("s1:script:脚本s1:groovy", "defaultContext.setData(\"test1\",\"hello s1\");"));
mockEntrySet.add(createMockEntry("s2:script:脚本s2:js", "defaultContext.setData(\"test2\",\"hello s2\");"));
mockEntrySet.add(createMockEntry("s3:script:脚本s3", "defaultContext.setData(\"test3\",\"hello s3\");"));
when(scriptKey.entrySet()).thenReturn(mockEntrySet);
when(chainKey.get("chain1")).thenReturn("THEN(a, b, c);");
when(chainKey.get("chain2")).thenReturn("THEN(a, b, c, s3);");
when(chainKey.get("chain3")).thenReturn("THEN(a, b, c, s1, s2);");
mockEntrySet = new HashSet<>();
mockEntrySet.add(createMockEntry("chain1", "THEN(a, b, c);"));
mockEntrySet.add(createMockEntry("chain2", "THEN(a, b, c, s3);"));
mockEntrySet.add(createMockEntry("chain3", "THEN(a, b, c, s1, s2);"));
Set<Map.Entry<Object, Object>> mockEntrySet1 = new HashSet<>(mockEntrySet);
mockEntrySet1.add(createMockEntry("chain1", "THEN(a, c, b);"));
when(chainKey.entrySet()).thenReturn(mockEntrySet).thenReturn(mockEntrySet1);
when(chainClient.getMap(anyString())).thenReturn(chainKey);
when(scriptClient.getMap(anyString())).thenReturn(scriptKey);
}
private Map.Entry<Object, Object> createMockEntry(Object key, Object value) {
Map.Entry<Object, Object> entry = mock(Map.Entry.class);
when(entry.getKey()).thenReturn(key);
when(entry.getValue()).thenReturn(value);
return entry;
}
/**
* 测试chain
*/
@Test
public void testSubWithXml() throws InterruptedException {
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("a==>b==>c", response.getExecuteStepStr());
//修改redis中规则
changeXMLData();
//重新加载规则
Thread.sleep(100);
Assertions.assertEquals("a==>c==>b", flowExecutor.execute2Resp("chain1", "arg").getExecuteStepStr());
//删除redis中规则
deleteXMLData();
//重新加载规则
Thread.sleep(100);
//由于chain1已被删除 这里会报ChainNotFoundException异常
response = flowExecutor.execute2Resp("chain1", "arg");
Assertions.assertTrue(!response.isSuccess());
//添加redis中规则
addXMLData();
//重新加载规则
Thread.sleep(100);
Assertions.assertEquals("b==>c", flowExecutor.execute2Resp("chain4", "arg").getExecuteStepStr());
}
/**
* 测试script
*/
@Test
public void testSubWithScriptXml() throws InterruptedException {
LiteflowResponse response = flowExecutor.execute2Resp("chain3", "arg");
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("hello s1", context.getData("test1"));
Assertions.assertEquals("a==>b==>c==>s1[脚本s1]==>s2[脚本s2]", response.getExecuteStepStrWithoutTime());
//添加和删除脚本
addAndDeleteScriptData();
//修改redis脚本
changeScriptData();
Thread.sleep(100);
context = flowExecutor.execute2Resp("chain3", "arg").getFirstContextBean();
Assertions.assertEquals("hello s1 version2", context.getData("test1"));
context = flowExecutor.execute2Resp("chain2", "arg").getFirstContextBean();
Assertions.assertEquals("hello s3 version2", context.getData("test2"));
}
/**
* 修改redisson中的chain
*/
public void changeXMLData() {
RedisParserHelper.changeChain("chain1", "THEN(a, c, b);");
}
/**
* 删除redisson中的chain
*/
public void deleteXMLData() {
FlowBus.removeChain("chain1");
FlowBus.removeChain("chain4");
}
/**
* 新增redisson中的chain
*/
public void addXMLData() {
RedisParserHelper.changeChain("chain4", "THEN(b, c);");
}
/**
* 修改redisson中的脚本
*/
public void changeScriptData() {
RedisParserHelper.changeScriptNode("s1:script:脚本s1:groovy", "defaultContext.setData(\"test1\",\"hello s1 version2\");");
RedisParserHelper.changeScriptNode("s3:script:脚本s3", "defaultContext.setData(\"test2\",\"hello s3 version2\");");
}
/**
* 新增和删除redisson中的chain
*/
public void addAndDeleteScriptData() {
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert("s3:script:脚本s3");
FlowBus.unloadScriptNode(nodeSimpleVO.getNodeId());
RedisParserHelper.changeScriptNode("s5:script:脚本s5:groovy", "defaultContext.setData(\"test1\",\"hello s5\");");
}
}

View File

@@ -11,9 +11,7 @@ import com.yomahub.liteflow.parser.redis.mode.RClient;
import com.yomahub.liteflow.parser.redis.mode.polling.RedisParserPollingMode;
import com.yomahub.liteflow.slot.DefaultContext;
import com.yomahub.liteflow.test.BaseTest;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
@@ -26,6 +24,7 @@ import javax.annotation.Resource;
import java.lang.reflect.Field;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import static org.mockito.ArgumentMatchers.anyString;
@@ -71,16 +70,21 @@ public class RedisWithXmlELPollSpringbootTest extends BaseTest {
static LFLog LOG = LFLoggerManager.getLogger(RedisWithXmlELPollSpringbootTest.class);
@AfterAll
public static void after() {
//关闭poll模式的轮询线程池
try{
@AfterEach
void afterEach() {
try {
Field pollExecutor = RedisParserPollingMode.class.getDeclaredField("pollExecutor");
pollExecutor.setAccessible(true);
ScheduledThreadPoolExecutor threadPoolExecutor = (ScheduledThreadPoolExecutor) pollExecutor.get(null);
threadPoolExecutor.shutdownNow();
// 关闭旧线程池
ScheduledThreadPoolExecutor oldPool = (ScheduledThreadPoolExecutor) pollExecutor.get(null);
if (oldPool != null) {
oldPool.shutdownNow();
}
// 创建新线程池并设置回去
ScheduledThreadPoolExecutor newPool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1);
pollExecutor.set(null, newPool);
} catch (Exception ignored) {
LOG.error("[Polling thread pool not closed]", ignored);
LOG.error("[Polling thread pool reset failed]", ignored);
}
}

View File

@@ -0,0 +1,9 @@
liteflow.rule-source-ext-data={\
"redisMode":"cluster",\
"clusterAddress":"127.0.0.1:26389,127.0.0.1:26379",\
"pollingInterval":1,\
"pollingStartTime":2,\
"chainKey":"pollChainKey",\
"scriptKey":"pollScriptKey"\
}
liteflow.parse-mode=PARSE_ALL_ON_FIRST_EXEC

View File

@@ -0,0 +1,10 @@
liteflow.rule-source-ext-data={\
"redisMode":"cluster",\
"clusterAddress":"127.0.0.1:26389,127.0.0.1:26379",\
"mode":"sub",\
"chainDataBase":1,\
"chainKey":"testChainKey",\
"scriptDataBase":1,\
"scriptKey":"testScriptKey"\
}
liteflow.parse-mode=PARSE_ALL_ON_FIRST_EXEC

View File

@@ -0,0 +1,33 @@
package com.yomahub.liteflow.test.script.javapro.parseOneMode;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.slot.DefaultContext;
import com.yomahub.liteflow.test.BaseTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import javax.annotation.Resource;
@ExtendWith(SpringExtension.class)
@TestPropertySource(value = "classpath:/parseOneMode/application.properties")
@SpringBootTest(classes = ScriptJavaxProParseOneModeTest.class)
@EnableAutoConfiguration
public class ScriptJavaxProParseOneModeTest extends BaseTest {
@Resource
private FlowExecutor flowExecutor;
@Test
public void testParseOneMode() {
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
Assertions.assertTrue(response.isSuccess());
}
}

View File

@@ -0,0 +1,2 @@
liteflow.rule-source=parseOneMode/flow.xml
liteflow.parse-mode=PARSE_ONE_ON_FIRST_EXEC

View File

@@ -0,0 +1,32 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE flow PUBLIC "liteflow" "liteflow.dtd">
<flow>
<nodes>
<node id="s1" name="普通脚本1" type="script" language="java">
<![CDATA[
import cn.hutool.core.collection.ListUtil;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.slot.DefaultContext;
import com.yomahub.liteflow.spi.holder.ContextAwareHolder;
import com.yomahub.liteflow.test.script.javaxpro.common.cmp.Person;
import com.yomahub.liteflow.test.script.javaxpro.common.cmp.TestDomain;
import java.util.List;
public class Demo extends NodeComponent {
@Override
public void process() throws Exception {
System.out.println("hello world");
}
}
]]>
</node>
</nodes>
<chain name="chain1">
THEN(s1);
</chain>
</flow>