mirror of
https://gitee.com/dromara/liteFlow.git
synced 2026-05-14 12:12:08 +08:00
fix poll test & fix from cr
This commit is contained in:
@@ -2,6 +2,7 @@ package com.yomahub.liteflow.parser.redis.mode.polling;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.collection.CollectionUtil;
|
||||
import cn.hutool.core.thread.NamedThreadFactory;
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.crypto.digest.DigestUtil;
|
||||
@@ -46,6 +47,9 @@ public class RedisParserPollingMode implements RedisParserHelper {
|
||||
//定时任务线程池核心线程数
|
||||
private static final int CORE_POOL_SIZE = 2;
|
||||
|
||||
//定时任务线程池
|
||||
private ScheduledThreadPoolExecutor pollExecutor;
|
||||
|
||||
//计算hash中field数量的lua脚本
|
||||
private final String luaOfKey = "local keys = redis.call(\"hkeys\", KEYS[1]);\n" +
|
||||
"return #keys;\n";
|
||||
@@ -79,6 +83,14 @@ public class RedisParserPollingMode implements RedisParserHelper {
|
||||
this.scriptClient = new RClient(Redisson.create(config));
|
||||
}
|
||||
}
|
||||
//创建定时任务线程池
|
||||
if (ObjectUtil.isNull(pollExecutor)) {
|
||||
ThreadFactory namedThreadFactory = new NamedThreadFactory("RedisParser-Polling-Thread-", false);
|
||||
pollExecutor = new ScheduledThreadPoolExecutor(
|
||||
CORE_POOL_SIZE,
|
||||
namedThreadFactory,
|
||||
new ThreadPoolExecutor.DiscardOldestPolicy());
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RedisException(e.getMessage());
|
||||
@@ -181,11 +193,6 @@ public class RedisParserPollingMode implements RedisParserHelper {
|
||||
String keyLuaOfChain = chainClient.scriptLoad(luaOfKey);
|
||||
String valueLuaOfChain = chainClient.scriptLoad(luaOfValue);
|
||||
|
||||
//定时任务线程池
|
||||
ScheduledThreadPoolExecutor pollExecutor = new ScheduledThreadPoolExecutor(
|
||||
CORE_POOL_SIZE,
|
||||
new ThreadPoolExecutor.DiscardOldestPolicy());
|
||||
|
||||
//添加轮询chain的定时任务
|
||||
ChainPollingTask chainTask = new ChainPollingTask(redisParserVO, chainClient, chainNum, chainSHAMap, keyLuaOfChain, valueLuaOfChain);
|
||||
pollExecutor.scheduleAtFixedRate(chainTask, redisParserVO.getPollingStartTime().longValue(),
|
||||
|
||||
@@ -44,6 +44,20 @@ public class RedisWithXmlELPollSpringbootTest extends BaseTest {
|
||||
@Resource
|
||||
private FlowExecutor flowExecutor;
|
||||
|
||||
//计算hash中field数量的lua脚本
|
||||
private final String luaOfKey = "local keys = redis.call(\"hkeys\", KEYS[1]);\n" +
|
||||
"return #keys;\n";
|
||||
|
||||
//计算hash中value的SHA值的lua脚本
|
||||
private final String luaOfValue = "local key = KEYS[1];\n" +
|
||||
"local field = KEYS[2];\n" +
|
||||
"local value, err = redis.call(\"hget\", key, field);\n" +
|
||||
"if value == false or value == nil then\n" +
|
||||
" return \"nil\";\n" +
|
||||
"end\n" +
|
||||
"local sha1 = redis.sha1hex(value);\n" +
|
||||
"return sha1;";
|
||||
|
||||
@AfterEach
|
||||
public void after() {
|
||||
FlowBus.cleanCache();
|
||||
@@ -53,7 +67,7 @@ public class RedisWithXmlELPollSpringbootTest extends BaseTest {
|
||||
* 测试chain
|
||||
*/
|
||||
@Test
|
||||
public void testPollWithXml() {
|
||||
public void testPollWithXml() throws InterruptedException {
|
||||
Set<String> chainNameSet = new HashSet<>();
|
||||
chainNameSet.add("chain11");
|
||||
String chainValue = "THEN(a, b, c);";
|
||||
@@ -65,14 +79,22 @@ public class RedisWithXmlELPollSpringbootTest extends BaseTest {
|
||||
String changeChainSHA = DigestUtil.sha1Hex(changeChainValue);
|
||||
when(chainClient.hkeys("pollChainKey")).thenReturn(chainNameSet);
|
||||
when(chainClient.hget("pollChainKey", "chain11")).thenReturn(chainValue).thenReturn(changeChainValue);
|
||||
when(chainClient.evalSha(anyString(), anyString())).thenReturn(chainSHA).thenReturn(changeChainSHA);
|
||||
when(chainClient.scriptLoad(luaOfKey)).thenReturn("keysha");
|
||||
when(chainClient.scriptLoad(luaOfValue)).thenReturn("valuesha");
|
||||
when(chainClient.evalSha(eq("keysha"), anyString())).thenReturn("1");
|
||||
when(chainClient.evalSha(eq("valuesha"), anyString(), anyString())).thenReturn(chainSHA).thenReturn(changeChainSHA);
|
||||
//这里其实并没有script数据 预设数据只是为了不产生NumberFormatException
|
||||
when(scriptClient.scriptLoad(luaOfKey)).thenReturn("keysha");
|
||||
when(scriptClient.scriptLoad(luaOfValue)).thenReturn("valuesha");
|
||||
when(scriptClient.evalSha(eq("keysha"), anyString())).thenReturn("0");
|
||||
when(scriptClient.evalSha(eq("valuesha"), anyString(), anyString())).thenReturn("");
|
||||
|
||||
//测试修改前的chain
|
||||
LiteflowResponse response = flowExecutor.execute2Resp("chain11", "arg");
|
||||
Assertions.assertTrue(response.isSuccess());
|
||||
Assertions.assertEquals("a==>b==>c", response.getExecuteStepStr());
|
||||
|
||||
flowExecutor.reloadRule();
|
||||
Thread.sleep(2000);
|
||||
|
||||
//测试修改后的chain
|
||||
response = flowExecutor.execute2Resp("chain11", "arg");
|
||||
@@ -84,12 +106,16 @@ public class RedisWithXmlELPollSpringbootTest extends BaseTest {
|
||||
* 测试script
|
||||
*/
|
||||
@Test
|
||||
public void testPollWithScriptXml() {
|
||||
public void testPollWithScriptXml() throws InterruptedException {
|
||||
Set<String> chainNameSet = new HashSet<>();
|
||||
chainNameSet.add("chain22");
|
||||
String chainValue = "THEN(a, b, c, s11, s22, s33);";
|
||||
when(chainClient.hkeys("pollChainKey")).thenReturn(chainNameSet);
|
||||
when(chainClient.hget("pollChainKey", "chain22")).thenReturn(chainValue);
|
||||
when(chainClient.scriptLoad(luaOfKey)).thenReturn("keysha");
|
||||
when(chainClient.scriptLoad(luaOfValue)).thenReturn("valuesha");
|
||||
when(chainClient.evalSha(eq("keysha"), anyString())).thenReturn("1");
|
||||
when(chainClient.evalSha(eq("valuesha"), anyString(), anyString())).thenReturn("");
|
||||
|
||||
Set<String> scriptFieldSet = new HashSet<>();
|
||||
scriptFieldSet.add("s11:script:脚本s11:groovy");
|
||||
@@ -111,9 +137,12 @@ public class RedisWithXmlELPollSpringbootTest extends BaseTest {
|
||||
when(scriptClient.hget("pollScriptKey", "s22:script:脚本s22:js")).thenReturn(s22);
|
||||
when(scriptClient.hget("pollScriptKey", "s33:script:脚本s33")).thenReturn(s33);
|
||||
//分别模拟三个script的evalsha指纹值计算的返回值, 其中s11脚本修改 指纹值变化
|
||||
when(scriptClient.evalSha(anyString(), eq("pollScriptKey"), eq("s11:script:脚本s11:groovy"))).thenReturn(s11SHA).thenReturn(changeS11SHA);
|
||||
when(scriptClient.evalSha(anyString(), eq("pollScriptKey"), eq("s22:script:脚本s22:js"))).thenReturn(s22SHA);
|
||||
when(scriptClient.evalSha(anyString(), eq("pollScriptKey"), eq("s33:script:脚本s33"))).thenReturn(s33SHA);
|
||||
when(scriptClient.scriptLoad(luaOfKey)).thenReturn("keysha");
|
||||
when(scriptClient.scriptLoad(luaOfValue)).thenReturn("valuesha");
|
||||
when(scriptClient.evalSha(eq("keysha"), anyString())).thenReturn("3");
|
||||
when(scriptClient.evalSha("valuesha", "pollScriptKey", "s11:script:脚本s11:groovy")).thenReturn(s11SHA).thenReturn(changeS11SHA);
|
||||
when(scriptClient.evalSha("valuesha", "pollScriptKey", "s22:script:脚本s22:js")).thenReturn(s22SHA);
|
||||
when(scriptClient.evalSha("valuesha", "pollScriptKey", "s33:script:脚本s33")).thenReturn(s33SHA);
|
||||
|
||||
//测试修改前的script
|
||||
LiteflowResponse response = flowExecutor.execute2Resp("chain22", "arg");
|
||||
@@ -123,11 +152,11 @@ public class RedisWithXmlELPollSpringbootTest extends BaseTest {
|
||||
Assertions.assertEquals("hello s22", context.getData("test22"));
|
||||
Assertions.assertEquals("a==>b==>c==>s11[脚本s11]==>s22[脚本s22]==>s33[脚本s33]", response.getExecuteStepStrWithoutTime());
|
||||
|
||||
flowExecutor.reloadRule();
|
||||
Thread.sleep(2000);
|
||||
|
||||
//测试修改后的script
|
||||
response = flowExecutor.execute2Resp("chain22", "arg");
|
||||
context = response.getFirstContextBean();
|
||||
context = response.getFirstContextBean();
|
||||
Assertions.assertTrue(response.isSuccess());
|
||||
Assertions.assertEquals("hello world", context.getData("test11"));
|
||||
}
|
||||
|
||||
@@ -8,10 +8,8 @@ import com.yomahub.liteflow.slot.DefaultContext;
|
||||
import com.yomahub.liteflow.test.BaseTest;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.MockitoAnnotations;
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.boot.test.mock.mockito.MockBean;
|
||||
|
||||
Reference in New Issue
Block a user