diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/RedisParserPollingMode.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/RedisParserPollingMode.java index c5d933682..ae4d2ee38 100644 --- a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/RedisParserPollingMode.java +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/RedisParserPollingMode.java @@ -2,6 +2,7 @@ package com.yomahub.liteflow.parser.redis.mode.polling; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.thread.NamedThreadFactory; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.crypto.digest.DigestUtil; @@ -46,6 +47,9 @@ public class RedisParserPollingMode implements RedisParserHelper { //定时任务线程池核心线程数 private static final int CORE_POOL_SIZE = 2; + //定时任务线程池 + private ScheduledThreadPoolExecutor pollExecutor; + //计算hash中field数量的lua脚本 private final String luaOfKey = "local keys = redis.call(\"hkeys\", KEYS[1]);\n" + "return #keys;\n"; @@ -79,6 +83,14 @@ public class RedisParserPollingMode implements RedisParserHelper { this.scriptClient = new RClient(Redisson.create(config)); } } + //创建定时任务线程池 + if (ObjectUtil.isNull(pollExecutor)) { + ThreadFactory namedThreadFactory = new NamedThreadFactory("RedisParser-Polling-Thread-", false); + pollExecutor = new ScheduledThreadPoolExecutor( + CORE_POOL_SIZE, + namedThreadFactory, + new ThreadPoolExecutor.DiscardOldestPolicy()); + } } catch (Exception e) { throw new RedisException(e.getMessage()); @@ -181,11 +193,6 @@ public class RedisParserPollingMode implements RedisParserHelper { String keyLuaOfChain = chainClient.scriptLoad(luaOfKey); String valueLuaOfChain = chainClient.scriptLoad(luaOfValue); - //定时任务线程池 - ScheduledThreadPoolExecutor pollExecutor = new ScheduledThreadPoolExecutor( - CORE_POOL_SIZE, - new ThreadPoolExecutor.DiscardOldestPolicy()); - //添加轮询chain的定时任务 ChainPollingTask chainTask = new ChainPollingTask(redisParserVO, chainClient, chainNum, chainSHAMap, keyLuaOfChain, valueLuaOfChain); pollExecutor.scheduleAtFixedRate(chainTask, redisParserVO.getPollingStartTime().longValue(), diff --git a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisWithXmlELPollSpringbootTest.java b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisWithXmlELPollSpringbootTest.java index 45103a0f2..18786d427 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisWithXmlELPollSpringbootTest.java +++ b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisWithXmlELPollSpringbootTest.java @@ -44,6 +44,20 @@ public class RedisWithXmlELPollSpringbootTest extends BaseTest { @Resource private FlowExecutor flowExecutor; + //计算hash中field数量的lua脚本 + private final String luaOfKey = "local keys = redis.call(\"hkeys\", KEYS[1]);\n" + + "return #keys;\n"; + + //计算hash中value的SHA值的lua脚本 + private final String luaOfValue = "local key = KEYS[1];\n" + + "local field = KEYS[2];\n" + + "local value, err = redis.call(\"hget\", key, field);\n" + + "if value == false or value == nil then\n" + + " return \"nil\";\n" + + "end\n" + + "local sha1 = redis.sha1hex(value);\n" + + "return sha1;"; + @AfterEach public void after() { FlowBus.cleanCache(); @@ -53,7 +67,7 @@ public class RedisWithXmlELPollSpringbootTest extends BaseTest { * 测试chain */ @Test - public void testPollWithXml() { + public void testPollWithXml() throws InterruptedException { Set chainNameSet = new HashSet<>(); chainNameSet.add("chain11"); String chainValue = "THEN(a, b, c);"; @@ -65,14 +79,22 @@ public class RedisWithXmlELPollSpringbootTest extends BaseTest { String changeChainSHA = DigestUtil.sha1Hex(changeChainValue); when(chainClient.hkeys("pollChainKey")).thenReturn(chainNameSet); when(chainClient.hget("pollChainKey", "chain11")).thenReturn(chainValue).thenReturn(changeChainValue); - when(chainClient.evalSha(anyString(), anyString())).thenReturn(chainSHA).thenReturn(changeChainSHA); + when(chainClient.scriptLoad(luaOfKey)).thenReturn("keysha"); + when(chainClient.scriptLoad(luaOfValue)).thenReturn("valuesha"); + when(chainClient.evalSha(eq("keysha"), anyString())).thenReturn("1"); + when(chainClient.evalSha(eq("valuesha"), anyString(), anyString())).thenReturn(chainSHA).thenReturn(changeChainSHA); + //这里其实并没有script数据 预设数据只是为了不产生NumberFormatException + when(scriptClient.scriptLoad(luaOfKey)).thenReturn("keysha"); + when(scriptClient.scriptLoad(luaOfValue)).thenReturn("valuesha"); + when(scriptClient.evalSha(eq("keysha"), anyString())).thenReturn("0"); + when(scriptClient.evalSha(eq("valuesha"), anyString(), anyString())).thenReturn(""); //测试修改前的chain LiteflowResponse response = flowExecutor.execute2Resp("chain11", "arg"); Assertions.assertTrue(response.isSuccess()); Assertions.assertEquals("a==>b==>c", response.getExecuteStepStr()); - flowExecutor.reloadRule(); + Thread.sleep(2000); //测试修改后的chain response = flowExecutor.execute2Resp("chain11", "arg"); @@ -84,12 +106,16 @@ public class RedisWithXmlELPollSpringbootTest extends BaseTest { * 测试script */ @Test - public void testPollWithScriptXml() { + public void testPollWithScriptXml() throws InterruptedException { Set chainNameSet = new HashSet<>(); chainNameSet.add("chain22"); String chainValue = "THEN(a, b, c, s11, s22, s33);"; when(chainClient.hkeys("pollChainKey")).thenReturn(chainNameSet); when(chainClient.hget("pollChainKey", "chain22")).thenReturn(chainValue); + when(chainClient.scriptLoad(luaOfKey)).thenReturn("keysha"); + when(chainClient.scriptLoad(luaOfValue)).thenReturn("valuesha"); + when(chainClient.evalSha(eq("keysha"), anyString())).thenReturn("1"); + when(chainClient.evalSha(eq("valuesha"), anyString(), anyString())).thenReturn(""); Set scriptFieldSet = new HashSet<>(); scriptFieldSet.add("s11:script:脚本s11:groovy"); @@ -111,9 +137,12 @@ public class RedisWithXmlELPollSpringbootTest extends BaseTest { when(scriptClient.hget("pollScriptKey", "s22:script:脚本s22:js")).thenReturn(s22); when(scriptClient.hget("pollScriptKey", "s33:script:脚本s33")).thenReturn(s33); //分别模拟三个script的evalsha指纹值计算的返回值, 其中s11脚本修改 指纹值变化 - when(scriptClient.evalSha(anyString(), eq("pollScriptKey"), eq("s11:script:脚本s11:groovy"))).thenReturn(s11SHA).thenReturn(changeS11SHA); - when(scriptClient.evalSha(anyString(), eq("pollScriptKey"), eq("s22:script:脚本s22:js"))).thenReturn(s22SHA); - when(scriptClient.evalSha(anyString(), eq("pollScriptKey"), eq("s33:script:脚本s33"))).thenReturn(s33SHA); + when(scriptClient.scriptLoad(luaOfKey)).thenReturn("keysha"); + when(scriptClient.scriptLoad(luaOfValue)).thenReturn("valuesha"); + when(scriptClient.evalSha(eq("keysha"), anyString())).thenReturn("3"); + when(scriptClient.evalSha("valuesha", "pollScriptKey", "s11:script:脚本s11:groovy")).thenReturn(s11SHA).thenReturn(changeS11SHA); + when(scriptClient.evalSha("valuesha", "pollScriptKey", "s22:script:脚本s22:js")).thenReturn(s22SHA); + when(scriptClient.evalSha("valuesha", "pollScriptKey", "s33:script:脚本s33")).thenReturn(s33SHA); //测试修改前的script LiteflowResponse response = flowExecutor.execute2Resp("chain22", "arg"); @@ -123,11 +152,11 @@ public class RedisWithXmlELPollSpringbootTest extends BaseTest { Assertions.assertEquals("hello s22", context.getData("test22")); Assertions.assertEquals("a==>b==>c==>s11[脚本s11]==>s22[脚本s22]==>s33[脚本s33]", response.getExecuteStepStrWithoutTime()); - flowExecutor.reloadRule(); + Thread.sleep(2000); //测试修改后的script response = flowExecutor.execute2Resp("chain22", "arg"); - context = response.getFirstContextBean(); + context = response.getFirstContextBean(); Assertions.assertTrue(response.isSuccess()); Assertions.assertEquals("hello world", context.getData("test11")); } diff --git a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisWithXmlELSubscribeSpringbootTest.java b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisWithXmlELSubscribeSpringbootTest.java index a7fb01985..f926789e4 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisWithXmlELSubscribeSpringbootTest.java +++ b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisWithXmlELSubscribeSpringbootTest.java @@ -8,10 +8,8 @@ import com.yomahub.liteflow.slot.DefaultContext; import com.yomahub.liteflow.test.BaseTest; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.MockitoAnnotations; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.mock.mockito.MockBean;