From 132729141df4650e1f77edb58d4ebd4d5bfa683d Mon Sep 17 00:00:00 2001 From: houxinyu Date: Wed, 16 Aug 2023 18:12:58 +0800 Subject: [PATCH] fix from cr --- .../redis/mode/polling/ChainPollingTask.java | 29 ++++++++++++------- .../mode/polling/RedisParserPollingMode.java | 14 ++++----- .../redis/mode/polling/ScriptPollingTask.java | 27 ++++++++++------- .../parser/redis/vo/RedisParserVO.java | 13 ++++++++- .../pom.xml | 1 + .../RedisWithXmlELPollSpringbootTest.java | 1 - .../redis/application-poll-xml.properties | 1 + 7 files changed, 56 insertions(+), 30 deletions(-) diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/ChainPollingTask.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/ChainPollingTask.java index 95b9c1b9b..6b66d030b 100644 --- a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/ChainPollingTask.java +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/ChainPollingTask.java @@ -5,6 +5,7 @@ import cn.hutool.crypto.digest.DigestUtil; import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder; import com.yomahub.liteflow.flow.FlowBus; import com.yomahub.liteflow.log.LFLog; +import com.yomahub.liteflow.log.LFLoggerManager; import com.yomahub.liteflow.parser.redis.mode.RClient; import com.yomahub.liteflow.parser.redis.vo.RedisParserVO; @@ -16,7 +17,7 @@ import java.util.*; * @author hxinyu * @since 2.11.0 */ -public class ChainPollingTask { +public class ChainPollingTask implements Runnable { private RedisParserVO redisParserVO; @@ -26,24 +27,29 @@ public class ChainPollingTask { private Map chainSHAMap; - LFLog LOG; + private String keyLua; - public ChainPollingTask(RedisParserVO redisParserVO, RClient chainClient, Integer chainNum, Map chainSHAMap, LFLog LOG) { + private String valueLua; + + LFLog LOG = LFLoggerManager.getLogger(ChainPollingTask.class); + + public ChainPollingTask(RedisParserVO redisParserVO, RClient chainClient, Integer chainNum, Map chainSHAMap, String keyLua, String valueLua) { this.redisParserVO = redisParserVO; this.chainClient = chainClient; this.chainNum = chainNum; this.chainSHAMap = chainSHAMap; - this.LOG = LOG; + this.keyLua = keyLua; + this.valueLua = valueLua; } - /** - * 用于返回chain轮询任务的Runnable实例 + * 用于返回chain轮询任务 * 先根据hash中value的SHA值修改变化的和被删除的chain * 再根据hash中field数量的变化拉取新增的chain */ - public Runnable pollChainTask(String keyLua, String valueLua) { - Runnable r = () -> { + @Override + public void run() { + try { String chainKey = redisParserVO.getChainKey(); //Lua获取chainKey中最新的chain数量 String keyNum = chainClient.evalSha(keyLua, chainKey); @@ -52,7 +58,7 @@ public class ChainPollingTask { List needDelete = new ArrayList<>(); //遍历Map,判断各个chain的value有无变化:修改变化了值的chain和被删除的chain - for(Map.Entry entry: chainSHAMap.entrySet()) { + for (Map.Entry entry : chainSHAMap.entrySet()) { String chainId = entry.getKey(); String oldSHA = entry.getValue(); //在redis服务端通过Lua脚本计算SHA值 @@ -101,7 +107,8 @@ public class ChainPollingTask { } } } - }; - return r; + } catch (Exception e) { + LOG.error("[Exception during chain polling] " + e.getMessage(), e); + } } } diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/RedisParserPollingMode.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/RedisParserPollingMode.java index 318994a8c..c5d933682 100644 --- a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/RedisParserPollingMode.java +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/RedisParserPollingMode.java @@ -44,7 +44,7 @@ public class RedisParserPollingMode implements RedisParserHelper { private Map scriptSHAMap = new HashMap<>(); //定时任务线程池核心线程数 - private static final int CORE_POOL_SIZE = 1; + private static final int CORE_POOL_SIZE = 2; //计算hash中field数量的lua脚本 private final String luaOfKey = "local keys = redis.call(\"hkeys\", KEYS[1]);\n" + @@ -187,9 +187,9 @@ public class RedisParserPollingMode implements RedisParserHelper { new ThreadPoolExecutor.DiscardOldestPolicy()); //添加轮询chain的定时任务 - ChainPollingTask chainTask = new ChainPollingTask(redisParserVO, chainClient, chainNum, chainSHAMap, LOG); - pollExecutor.scheduleAtFixedRate(chainTask.pollChainTask(keyLuaOfChain, valueLuaOfChain), - 60, redisParserVO.getPollingInterval().longValue(), TimeUnit.SECONDS); + ChainPollingTask chainTask = new ChainPollingTask(redisParserVO, chainClient, chainNum, chainSHAMap, keyLuaOfChain, valueLuaOfChain); + pollExecutor.scheduleAtFixedRate(chainTask, redisParserVO.getPollingStartTime().longValue(), + redisParserVO.getPollingInterval().longValue(), TimeUnit.SECONDS); //如果有脚本 if (ObjectUtil.isNotNull(scriptClient) && ObjectUtil.isNotNull(redisParserVO.getScriptDataBase()) @@ -199,9 +199,9 @@ public class RedisParserPollingMode implements RedisParserHelper { String valueLuaOfScript = scriptClient.scriptLoad(luaOfValue); //添加轮询script的定时任务 - ScriptPollingTask scriptTask = new ScriptPollingTask(redisParserVO, scriptClient, scriptNum, scriptSHAMap, LOG); - pollExecutor.scheduleAtFixedRate(scriptTask.pollScriptTask(keyLuaOfScript, valueLuaOfScript), - 60, redisParserVO.getPollingInterval().longValue(), TimeUnit.SECONDS); + ScriptPollingTask scriptTask = new ScriptPollingTask(redisParserVO, scriptClient, scriptNum, scriptSHAMap, keyLuaOfScript, valueLuaOfScript); + pollExecutor.scheduleAtFixedRate(scriptTask, redisParserVO.getPollingStartTime().longValue(), + redisParserVO.getPollingInterval().longValue(), TimeUnit.SECONDS); } } } diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/ScriptPollingTask.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/ScriptPollingTask.java index f53a62b04..a23965c37 100644 --- a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/ScriptPollingTask.java +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/ScriptPollingTask.java @@ -4,6 +4,7 @@ import cn.hutool.core.util.StrUtil; import cn.hutool.crypto.digest.DigestUtil; import com.yomahub.liteflow.flow.FlowBus; import com.yomahub.liteflow.log.LFLog; +import com.yomahub.liteflow.log.LFLoggerManager; import com.yomahub.liteflow.parser.redis.mode.RClient; import com.yomahub.liteflow.parser.redis.mode.RedisParserHelper; import com.yomahub.liteflow.parser.redis.vo.RedisParserVO; @@ -19,7 +20,7 @@ import java.util.Set; * @author hxinyu * @since 2.11.0 */ -public class ScriptPollingTask { +public class ScriptPollingTask implements Runnable { private RedisParserVO redisParserVO; @@ -29,24 +30,29 @@ public class ScriptPollingTask { private Map scriptSHAMap; - LFLog LOG; + private String keyLua; - public ScriptPollingTask(RedisParserVO redisParserVO, RClient scriptClient, Integer scriptNum, Map scriptSHAMap, LFLog LOG) { + private String valueLua; + + LFLog LOG = LFLoggerManager.getLogger(ScriptPollingTask.class); + + public ScriptPollingTask(RedisParserVO redisParserVO, RClient scriptClient, Integer scriptNum, Map scriptSHAMap, String keyLua, String valueLua) { this.redisParserVO = redisParserVO; this.scriptClient = scriptClient; this.scriptNum = scriptNum; this.scriptSHAMap = scriptSHAMap; - this.LOG = LOG; + this.keyLua = keyLua; + this.valueLua = valueLua; } - /** - * 用于返回script轮询任务的Runnable实例 + * 用于返回script轮询任务 * 首先根据hash中field数量的变化拉取新增的script * 再根据hash中value的SHA值修改变化的和被删除的script */ - public Runnable pollScriptTask(String keyLua, String valueLua) { - Runnable r = () -> { + @Override + public void run() { + try { String scriptKey = redisParserVO.getScriptKey(); //Lua获取scriptKey中最新的script数量 String keyNum = scriptClient.evalSha(keyLua, scriptKey); @@ -105,7 +111,8 @@ public class ScriptPollingTask { } } } - }; - return r; + } catch (Exception e) { + LOG.error("[Exception during script polling] " + e.getMessage(), e); + } } } \ No newline at end of file diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/vo/RedisParserVO.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/vo/RedisParserVO.java index beeee195c..9ec2026f8 100644 --- a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/vo/RedisParserVO.java +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/vo/RedisParserVO.java @@ -23,9 +23,12 @@ public class RedisParserVO { /*监听机制 轮询为poll 订阅为subscribe 默认为poll*/ private RedisParserMode mode = RedisParserMode.POLL; - /*轮询时间间隔(s) 默认1分钟 若选择订阅机制可不配置*/ + /*轮询时间间隔(s) 默认60s 若选择订阅机制可不配置*/ private Integer pollingInterval = 60; + /*规则配置后首次轮询的起始时间 默认为60s 若选择订阅机制可不配置*/ + private Integer pollingStartTime = 60; + /*chain表配置的数据库号*/ private Integer chainDataBase; @@ -77,6 +80,14 @@ public class RedisParserVO { } } + public Integer getPollingStartTime() { + return pollingStartTime; + } + + public void setPollingStartTime(Integer pollingStartTime) { + this.pollingStartTime = pollingStartTime; + } + public Integer getPollingInterval() { return pollingInterval; } diff --git a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/pom.xml b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/pom.xml index 90fe5ef9d..4b493d8ab 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/pom.xml +++ b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/pom.xml @@ -43,6 +43,7 @@ ${revision} test + \ No newline at end of file diff --git a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisWithXmlELPollSpringbootTest.java b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisWithXmlELPollSpringbootTest.java index 7ce8d5c74..45103a0f2 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisWithXmlELPollSpringbootTest.java +++ b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisWithXmlELPollSpringbootTest.java @@ -9,7 +9,6 @@ import com.yomahub.liteflow.slot.DefaultContext; import com.yomahub.liteflow.test.BaseTest; import org.junit.jupiter.api.*; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.MockitoAnnotations; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.mock.mockito.MockBean; diff --git a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/resources/redis/application-poll-xml.properties b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/resources/redis/application-poll-xml.properties index c89b08663..567bff7b4 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/resources/redis/application-poll-xml.properties +++ b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/resources/redis/application-poll-xml.properties @@ -2,6 +2,7 @@ liteflow.rule-source-ext-data={\ "host":"localhost",\ "port":6379,\ "pollingInterval":1,\ + "pollingStartTime":1,\ "chainDataBase":1,\ "chainKey":"pollChainKey",\ "scriptDataBase":1,\