mirror of
https://gitee.com/dromara/liteFlow.git
synced 2026-06-10 19:26:54 +08:00
!211 修改Redis配置源测试部分
Merge pull request !211 from houxinyu/dev_for_redis_v3
This commit is contained in:
@@ -49,7 +49,7 @@ public class RedisParserPollingMode implements RedisParserHelper {
|
||||
private static final int CORE_POOL_SIZE = 2;
|
||||
|
||||
//定时任务线程池
|
||||
private ScheduledThreadPoolExecutor pollExecutor;
|
||||
private static ScheduledThreadPoolExecutor pollExecutor;
|
||||
|
||||
//计算hash中field数量的lua脚本
|
||||
private final String luaOfKey = "local keys = redis.call(\"hkeys\", KEYS[1]);\n" +
|
||||
|
||||
@@ -2,16 +2,13 @@ package com.yomahub.liteflow.test.redis;
|
||||
|
||||
import cn.hutool.crypto.digest.DigestUtil;
|
||||
import com.yomahub.liteflow.core.FlowExecutor;
|
||||
import com.yomahub.liteflow.core.FlowInitHook;
|
||||
import com.yomahub.liteflow.flow.FlowBus;
|
||||
import com.yomahub.liteflow.flow.LiteflowResponse;
|
||||
import com.yomahub.liteflow.log.LFLog;
|
||||
import com.yomahub.liteflow.log.LFLoggerManager;
|
||||
import com.yomahub.liteflow.parser.redis.mode.RClient;
|
||||
import com.yomahub.liteflow.property.LiteflowConfigGetter;
|
||||
import com.yomahub.liteflow.parser.redis.mode.polling.RedisParserPollingMode;
|
||||
import com.yomahub.liteflow.slot.DefaultContext;
|
||||
import com.yomahub.liteflow.spi.holder.SpiFactoryCleaner;
|
||||
import com.yomahub.liteflow.spring.ComponentScanner;
|
||||
import com.yomahub.liteflow.test.BaseTest;
|
||||
import com.yomahub.liteflow.thread.ExecutorHelper;
|
||||
import org.junit.jupiter.api.*;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
@@ -21,14 +18,16 @@ import org.springframework.context.annotation.ComponentScan;
|
||||
import org.springframework.test.context.TestPropertySource;
|
||||
import org.springframework.test.context.junit.jupiter.SpringExtension;
|
||||
import javax.annotation.Resource;
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.*;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
* springboot环境下的redis配置源轮询拉取模式功能测试
|
||||
* springboot环境下的redis配置源chain轮询拉取模式功能测试
|
||||
*
|
||||
* @author hxinyu
|
||||
* @since 2.11.0
|
||||
@@ -63,69 +62,49 @@ public class RedisWithXmlELPollSpringbootTest extends BaseTest {
|
||||
"local sha1 = redis.sha1hex(value);\n" +
|
||||
"return sha1;";
|
||||
|
||||
static LFLog LOG = LFLoggerManager.getLogger(RedisWithXmlELPollSpringbootTest.class);
|
||||
|
||||
@AfterEach
|
||||
public void after() {
|
||||
FlowBus.cleanCache();
|
||||
FlowInitHook.cleanHook();
|
||||
ExecutorHelper.loadInstance().clearExecutorServiceMap();
|
||||
SpiFactoryCleaner.clean();
|
||||
|
||||
@AfterAll
|
||||
public static void after() {
|
||||
//关闭poll模式的轮询线程池
|
||||
try{
|
||||
Field pollExecutor = RedisParserPollingMode.class.getDeclaredField("pollExecutor");
|
||||
pollExecutor.setAccessible(true);
|
||||
ScheduledThreadPoolExecutor threadPoolExecutor = (ScheduledThreadPoolExecutor) pollExecutor.get(null);
|
||||
threadPoolExecutor.shutdownNow();
|
||||
} catch (Exception ignored) {
|
||||
LOG.error("[Polling thread pool not closed]", ignored);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 测试chain
|
||||
* 统一测试chain和script
|
||||
*
|
||||
* 测试数据流程:
|
||||
* 1、执行chain1值:"THEN(a, b, c);"
|
||||
* 2、修改chain1值为:"THEN(s11, s22, s33, a, b);", 执行新chain 验证chain的轮询拉取功能
|
||||
* 3、修改chain1其中的script11值 执行chain 验证script的轮询拉取功能
|
||||
*/
|
||||
@Test
|
||||
public void testPollWithXml() throws InterruptedException {
|
||||
Set<String> chainNameSet = new HashSet<>();
|
||||
chainNameSet.add("chain11");
|
||||
String chainValue = "THEN(a, b, c);";
|
||||
//SHA值用于测试修改chain的轮询刷新功能
|
||||
String chainSHA = DigestUtil.sha1Hex(chainValue);
|
||||
|
||||
//修改chain并更新SHA值
|
||||
String changeChainValue = "THEN(a, c);";
|
||||
String changeChainValue = "THEN(s11, s22, s33, a, b);";
|
||||
String changeChainSHA = DigestUtil.sha1Hex(changeChainValue);
|
||||
|
||||
when(chainClient.hkeys("pollChainKey")).thenReturn(chainNameSet);
|
||||
when(chainClient.hget("pollChainKey", "chain11")).thenReturn(chainValue).thenReturn(changeChainValue);
|
||||
when(chainClient.scriptLoad(luaOfKey)).thenReturn("keysha");
|
||||
when(chainClient.scriptLoad(luaOfValue)).thenReturn("valuesha");
|
||||
when(chainClient.evalSha(eq("keysha"), anyString())).thenReturn("1");
|
||||
when(chainClient.evalSha(eq("valuesha"), anyString(), anyString())).thenReturn(chainSHA).thenReturn(changeChainSHA);
|
||||
//这里其实并没有script数据 预设数据只是为了不产生NumberFormatException
|
||||
when(scriptClient.scriptLoad(luaOfKey)).thenReturn("keysha");
|
||||
when(scriptClient.scriptLoad(luaOfValue)).thenReturn("valuesha");
|
||||
when(scriptClient.evalSha(eq("keysha"), anyString())).thenReturn("0");
|
||||
when(scriptClient.evalSha(eq("valuesha"), anyString(), anyString())).thenReturn("");
|
||||
|
||||
//测试修改前的chain
|
||||
LiteflowResponse response = flowExecutor.execute2Resp("chain11", "arg");
|
||||
Assertions.assertTrue(response.isSuccess());
|
||||
Assertions.assertEquals("a==>b==>c", response.getExecuteStepStr());
|
||||
|
||||
Thread.sleep(4000);
|
||||
|
||||
//测试修改后的chain
|
||||
response = flowExecutor.execute2Resp("chain11", "arg");
|
||||
Assertions.assertTrue(response.isSuccess());
|
||||
Assertions.assertEquals("a==>c", response.getExecuteStepStr());
|
||||
}
|
||||
|
||||
/**
|
||||
* 测试script
|
||||
*/
|
||||
@Test
|
||||
public void testPollWithScript() throws InterruptedException {
|
||||
Set<String> chainNameSet = new HashSet<>();
|
||||
chainNameSet.add("chain22");
|
||||
String chainValue = "THEN(s11, s22, s33, a, b);";
|
||||
when(chainClient.hkeys("pollChainKey")).thenReturn(chainNameSet);
|
||||
when(chainClient.hget("pollChainKey", "chain22")).thenReturn(chainValue);
|
||||
when(chainClient.scriptLoad(luaOfKey)).thenReturn("keysha");
|
||||
when(chainClient.scriptLoad(luaOfValue)).thenReturn("valuesha");
|
||||
when(chainClient.evalSha(eq("keysha"), anyString())).thenReturn("1");
|
||||
when(chainClient.evalSha(eq("valuesha"), anyString(), anyString())).thenReturn("");
|
||||
|
||||
//添加script
|
||||
Set<String> scriptFieldSet = new HashSet<>();
|
||||
scriptFieldSet.add("s11:script:脚本s11:groovy");
|
||||
scriptFieldSet.add("s22:script:脚本s22:js");
|
||||
@@ -142,19 +121,34 @@ public class RedisWithXmlELPollSpringbootTest extends BaseTest {
|
||||
String changeS11SHA = DigestUtil.sha1Hex(changeS11);
|
||||
|
||||
when(scriptClient.hkeys("pollScriptKey")).thenReturn(scriptFieldSet);
|
||||
when(scriptClient.hget("pollScriptKey", "s11:script:脚本s11:groovy")).thenReturn(s11).thenReturn(changeS11);
|
||||
//这里休眠一段时间是为了防止在未修改脚本的chain还没有执行前 轮询线程就拉取了新script值
|
||||
when(scriptClient.hget("pollScriptKey", "s11:script:脚本s11:groovy")).thenReturn(s11).thenAnswer(invocation -> {
|
||||
Thread.sleep(2000);
|
||||
return changeS11;
|
||||
}).thenReturn(changeS11);
|
||||
when(scriptClient.hget("pollScriptKey", "s22:script:脚本s22:js")).thenReturn(s22);
|
||||
when(scriptClient.hget("pollScriptKey", "s33:script:脚本s33")).thenReturn(s33);
|
||||
|
||||
//分别模拟三个script的evalsha指纹值计算的返回值, 其中s11脚本修改 指纹值变化
|
||||
when(scriptClient.scriptLoad(luaOfKey)).thenReturn("keysha");
|
||||
when(scriptClient.scriptLoad(luaOfValue)).thenReturn("valuesha");
|
||||
when(scriptClient.evalSha(eq("keysha"), anyString())).thenReturn("3");
|
||||
when(scriptClient.evalSha("valuesha", "pollScriptKey", "s11:script:脚本s11:groovy")).thenReturn(s11SHA).thenReturn(changeS11SHA);
|
||||
when(scriptClient.evalSha("valuesha", "pollScriptKey", "s11:script:脚本s11:groovy")).thenReturn(s11SHA).thenAnswer(invocation -> {
|
||||
Thread.sleep(2000);
|
||||
return changeS11SHA;
|
||||
}).thenReturn(changeS11SHA);
|
||||
when(scriptClient.evalSha("valuesha", "pollScriptKey", "s22:script:脚本s22:js")).thenReturn(s22SHA);
|
||||
when(scriptClient.evalSha("valuesha", "pollScriptKey", "s33:script:脚本s33")).thenReturn(s33SHA);
|
||||
|
||||
//测试修改前的script
|
||||
LiteflowResponse response = flowExecutor.execute2Resp("chain22", "arg");
|
||||
//测试修改前的chain
|
||||
LiteflowResponse response = flowExecutor.execute2Resp("chain11", "arg");
|
||||
Assertions.assertTrue(response.isSuccess());
|
||||
Assertions.assertEquals("a==>b==>c", response.getExecuteStepStr());
|
||||
|
||||
Thread.sleep(4000);
|
||||
|
||||
//测试加了script的chain
|
||||
response = flowExecutor.execute2Resp("chain11", "arg");
|
||||
DefaultContext context = response.getFirstContextBean();
|
||||
Assertions.assertTrue(response.isSuccess());
|
||||
Assertions.assertEquals("hello s11", context.getData("test11"));
|
||||
@@ -163,8 +157,8 @@ public class RedisWithXmlELPollSpringbootTest extends BaseTest {
|
||||
|
||||
Thread.sleep(4000);
|
||||
|
||||
//测试修改后的script
|
||||
response = flowExecutor.execute2Resp("chain22", "arg");
|
||||
//测试修改script后的chain
|
||||
response = flowExecutor.execute2Resp("chain11", "arg");
|
||||
context = response.getFirstContextBean();
|
||||
Assertions.assertTrue(response.isSuccess());
|
||||
Assertions.assertEquals("hello world", context.getData("test11"));
|
||||
|
||||
@@ -2,6 +2,7 @@ package com.yomahub.liteflow.test.redis;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import com.yomahub.liteflow.core.FlowExecutor;
|
||||
import com.yomahub.liteflow.exception.ChainNotFoundException;
|
||||
import com.yomahub.liteflow.flow.LiteflowResponse;
|
||||
import com.yomahub.liteflow.parser.redis.vo.RedisParserVO;
|
||||
import com.yomahub.liteflow.property.LiteflowConfig;
|
||||
@@ -30,11 +31,11 @@ import javax.annotation.Resource;
|
||||
|
||||
/**
|
||||
* springboot环境下的redis配置源订阅模式功能测试
|
||||
*
|
||||
* <p>
|
||||
* 由于Redisson中RMapCache的监听器功能无法mock测试
|
||||
* 故Sub模式测试用例需本地启动Redis服务 连接地址: 127.0.0.1:6379
|
||||
* 若本地该端口号未启动Redis 则自动忽略本类中测试用例
|
||||
*
|
||||
* <p>
|
||||
* 测试用例会在1号database中添加测试数据 chainKey:testChainKey; scriptKey:testScriptKey
|
||||
* 测试完成后清除测试数据
|
||||
*
|
||||
@@ -70,7 +71,7 @@ public class RedisWithXmlELSubscribeSpringbootTest extends BaseTest {
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
public static void after(){
|
||||
public static void after() {
|
||||
testCleanData();
|
||||
}
|
||||
|
||||
@@ -93,6 +94,7 @@ public class RedisWithXmlELSubscribeSpringbootTest extends BaseTest {
|
||||
deleteXMLData();
|
||||
//重新加载规则
|
||||
Thread.sleep(100);
|
||||
//由于chain1已被删除 这里会报ChainNotFoundException异常
|
||||
response = flowExecutor.execute2Resp("chain1", "arg");
|
||||
Assertions.assertTrue(!response.isSuccess());
|
||||
|
||||
@@ -153,7 +155,7 @@ public class RedisWithXmlELSubscribeSpringbootTest extends BaseTest {
|
||||
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
|
||||
RedisParserVO redisParserVO = JsonUtil.parseObject(liteflowConfig.getRuleSourceExtData(), RedisParserVO.class);
|
||||
RMapCache<String, String> chainKey = redissonClient.getMapCache(redisParserVO.getChainKey());
|
||||
chainKey.put("chain4","THEN(b, c);");
|
||||
chainKey.put("chain4", "THEN(b, c);");
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -179,8 +181,8 @@ public class RedisWithXmlELSubscribeSpringbootTest extends BaseTest {
|
||||
}
|
||||
|
||||
//redis内规则数据数据清空
|
||||
public static void testCleanData(){
|
||||
if(ObjectUtil.isNotNull(redissonClient)){
|
||||
public static void testCleanData() {
|
||||
if (ObjectUtil.isNotNull(redissonClient)) {
|
||||
RMapCache<String, String> chainKey = redissonClient.getMapCache("testChainKey");
|
||||
RMapCache<String, String> scriptKey = redissonClient.getMapCache("testScriptKey");
|
||||
for (String key : chainKey.keySet()) {
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
liteflow.rule-source-ext-data={\
|
||||
"host":"localhost",\
|
||||
"port":6379,\
|
||||
"pollingInterval":2,\
|
||||
"pollingInterval":1,\
|
||||
"pollingStartTime":2,\
|
||||
"chainDataBase":1,\
|
||||
"chainKey":"pollChainKey",\
|
||||
|
||||
Reference in New Issue
Block a user