diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/util/RedisParserByPolling.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/util/RedisParserByPolling.java index 2ac5853b1..f5dba400a 100644 --- a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/util/RedisParserByPolling.java +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/util/RedisParserByPolling.java @@ -33,10 +33,10 @@ public class RedisParserByPolling implements RedisParserHelper{ private Jedis scriptJedis; //chainKey中chain总数 - private Integer chainFieldNum = 0; + private Integer chainNum = 0; //scriptKey中script总数 - private Integer scriptFieldNum = 0; + private Integer scriptNum = 0; //chainKey中value的SHA1加密值 用于轮询时确定value是否变化 private Map chainSHAMap = new HashMap<>(); @@ -99,7 +99,7 @@ public class RedisParserByPolling implements RedisParserHelper{ if (CollectionUtil.isEmpty(chainNameSet)) { throw new RedisException(StrUtil.format("There are no chains in key [{}]", chainKey)); } - chainFieldNum = chainNameSet.size(); + chainNum = chainNameSet.size(); // 获取chainKey下的所有子节点内容List List chainItemContentList = new ArrayList<>(); for (String chainName : chainNameSet) { @@ -120,7 +120,7 @@ public class RedisParserByPolling implements RedisParserHelper{ if (hasScript()) { String scriptKey = redisParserVO.getScriptKey(); Set scriptFieldSet = scriptJedis.hkeys(scriptKey); - scriptFieldNum = scriptFieldSet.size(); + scriptNum = scriptFieldSet.size(); List scriptItemContentList = new ArrayList<>(); for (String scriptFieldValue : scriptFieldSet) { @@ -188,7 +188,7 @@ public class RedisParserByPolling implements RedisParserHelper{ //定时任务线程池 ScheduledExecutorService pool = Executors.newScheduledThreadPool(10); - //轮询chain内容的定时任务 + //添加轮询chain的定时任务 pool.scheduleAtFixedRate(pollChainTask(keyLuaOfChain, valueLuaOfChain), 60, Long.valueOf(redisParserVO.getPollingInterval()), TimeUnit.SECONDS); @@ -198,8 +198,10 @@ public class RedisParserByPolling implements RedisParserHelper{ //将lua脚本添加到scriptJedis脚本缓存 String keyLuaOfScript = scriptJedis.scriptLoad(luaOfKey); String valueLuaOfScript = scriptJedis.scriptLoad(luaOfValue); + //添加轮询script的定时任务 + pool.scheduleAtFixedRate(pollScriptTask(keyLuaOfScript, valueLuaOfScript), + 60, Long.valueOf(redisParserVO.getPollingInterval()), TimeUnit.SECONDS); } - } @@ -211,15 +213,16 @@ public class RedisParserByPolling implements RedisParserHelper{ private Runnable pollChainTask(String keyLua, String valueLua) { Runnable r = () -> { String chainKey = redisParserVO.getChainKey(); - //先获取chainKey中最新的chain数量 + //Lua获取chainKey中最新的chain数量 String keyNum = chainJedis.evalsha(keyLua, 1, chainKey).toString(); - //修改chainFieldNum为最新chain数量 - chainFieldNum = Integer.parseInt(keyNum); + //修改chainNum为最新chain数量 + chainNum = Integer.parseInt(keyNum); //遍历Map,判断各个chain的value有无变化:修改变化了值的chain和被删除的chain for(Map.Entry entry: chainSHAMap.entrySet()) { String chainId = entry.getKey(); String oldSHA = entry.getValue(); + //在redis服务端通过Lua脚本计算SHA值 String newSHA = chainJedis.evalsha(valueLua, 2, chainKey, chainId).toString(); if (StrUtil.equals(newSHA, "nil")) { //新SHA值为nil, 即未获取到该chain,表示该chain已被删除 @@ -242,7 +245,7 @@ public class RedisParserByPolling implements RedisParserHelper{ } //处理新添加chain和chainId被修改的情况 - if (chainFieldNum > chainSHAMap.size()) { + if (chainNum > chainSHAMap.size()) { //如果封装的SHAMap数量比最新chain总数少, 说明有两种情况: // 1、添加了新chain // 2、修改了chainId:因为遍历到旧的id时会取到nil,SHAMap会把原来的chainId删掉,但没有机会添加新的chainId @@ -262,4 +265,66 @@ public class RedisParserByPolling implements RedisParserHelper{ }; return r; } + + /** + * 用于轮询script的定时任务 + * 首先根据hash中field数量的变化拉取新增的script + * 再根据hash中value的SHA值修改变化的和被删除的script + */ + private Runnable pollScriptTask(String keyLua, String valueLua) { + Runnable r = () -> { + String scriptKey = redisParserVO.getScriptKey(); + //Lua获取scriptKey中最新的script数量 + String keyNum = scriptJedis.evalsha(keyLua, 1, scriptKey).toString(); + //修改scriptNum为最新script数量 + scriptNum = Integer.parseInt(keyNum); + + //遍历Map,判断各个script的value有无变化:修改变化了值的script和被删除的script + for(Map.Entry entry: scriptSHAMap.entrySet()) { + String scriptFieldValue = entry.getKey(); + String oldSHA = entry.getValue(); + //在redis服务端通过Lua脚本计算SHA值 + String newSHA = scriptJedis.evalsha(valueLua, 2, scriptKey, scriptFieldValue).toString(); + if (StrUtil.equals(newSHA, "nil")) { + //新SHA值为nil, 即未获取到该script,表示该script已被删除 + NodeSimpleVO nodeSimpleVO = convert(scriptFieldValue); + FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId()); + LOG.info("starting reload flow config... delete key={}", scriptFieldValue); + + //修改SHAMap + scriptSHAMap.remove(scriptFieldValue); + } + else if (!StrUtil.equals(newSHA, oldSHA)) { + //SHA值发生变化,表示该script的值已被修改,重新拉取变化的script + String scriptData = scriptJedis.hget(scriptKey, scriptFieldValue); + changeScriptNode(scriptFieldValue, scriptData); + LOG.info("starting reload flow config... update key={} new value={},", scriptFieldValue, scriptData); + + //修改SHAMap + scriptSHAMap.put(scriptFieldValue, newSHA); + } + //SHA值无变化,表示该script未改变 + } + + //处理新添加script和script名被修改的情况 + if (scriptNum > scriptSHAMap.size()) { + //如果封装的SHAMap数量比最新script总数少, 说明有两种情况: + // 1、添加了新script + // 2、修改了script名:因为遍历到旧的id时会取到nil,SHAMap会把原来的script删掉,但没有机会添加新的script + // 3、上述两者结合 + //在此处重新拉取所有script名集合,补充添加新script + Set newScriptSet = scriptJedis.hkeys(scriptKey); + for (String scriptFieldValue : newScriptSet) { + if (scriptSHAMap.get(scriptFieldValue) == null) { + //将新script添加到LiteFlowChainELBuilder和SHAMap + String scriptData = scriptJedis.hget(scriptKey, scriptFieldValue); + changeScriptNode(scriptFieldValue, scriptData); + LOG.info("starting reload flow config... update key={} new value={},", scriptFieldValue, scriptData); + scriptSHAMap.put(scriptFieldValue, DigestUtil.sha1Hex(scriptData)); + } + } + } + }; + return r; + } } diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/util/RedisParserBySubscribe.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/util/RedisParserBySubscribe.java index 4ee4cea1c..9a7fc796f 100644 --- a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/util/RedisParserBySubscribe.java +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/util/RedisParserBySubscribe.java @@ -189,14 +189,12 @@ public class RedisParserBySubscribe implements RedisParserHelper { //添加 script scriptKey.addListener((EntryCreatedListener) event -> { LOG.info("starting reload flow config... create key={} value={},", event.getKey(), event.getValue()); - NodeSimpleVO nodeSimpleVO = convert(event.getKey()); - changeScriptNode(nodeSimpleVO, event.getValue()); + changeScriptNode(event.getKey(), event.getValue()); }); //修改 script scriptKey.addListener((EntryUpdatedListener) event -> { - LOG.info("starting reload flow config... update path={} new value={},", event.getKey(), event.getValue()); - NodeSimpleVO nodeSimpleVO = convert(event.getKey()); - changeScriptNode(nodeSimpleVO, event.getValue()); + LOG.info("starting reload flow config... update key={} new value={},", event.getKey(), event.getValue()); + changeScriptNode(event.getKey(), event.getValue()); }); //删除 script scriptKey.addListener((EntryRemovedListener) event -> { @@ -206,26 +204,4 @@ public class RedisParserBySubscribe implements RedisParserHelper { }); } } - - private void changeScriptNode(NodeSimpleVO nodeSimpleVO, String newValue) { - // 有语言类型 - if (StrUtil.isNotBlank(nodeSimpleVO.getLanguage())) { - LiteFlowNodeBuilder.createScriptNode() - .setId(nodeSimpleVO.getNodeId()) - .setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType())) - .setName(nodeSimpleVO.getName()) - .setScript(newValue) - .setLanguage(nodeSimpleVO.getLanguage()) - .build(); - } - // 没有语言类型 - else { - LiteFlowNodeBuilder.createScriptNode() - .setId(nodeSimpleVO.getNodeId()) - .setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType())) - .setName(nodeSimpleVO.getName()) - .setScript(newValue) - .build(); - } - } } diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/util/RedisParserHelper.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/util/RedisParserHelper.java index ca1c7364b..821ab3f79 100644 --- a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/util/RedisParserHelper.java +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/util/RedisParserHelper.java @@ -3,7 +3,9 @@ package com.yomahub.liteflow.parser.redis.util; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.ReUtil; import cn.hutool.core.util.StrUtil; +import com.yomahub.liteflow.builder.LiteFlowNodeBuilder; import com.yomahub.liteflow.core.FlowExecutor; +import com.yomahub.liteflow.enums.NodeTypeEnum; import com.yomahub.liteflow.log.LFLog; import com.yomahub.liteflow.log.LFLoggerManager; import org.slf4j.Logger; @@ -38,6 +40,35 @@ public interface RedisParserHelper { void listenRedis(); + + /** + * script节点的修改/添加 + * @param scriptFieldValue 新的script名 + * @param newValue 新的script值 + */ + default void changeScriptNode(String scriptFieldValue, String newValue) { + NodeSimpleVO nodeSimpleVO = convert(scriptFieldValue); + // 有语言类型 + if (StrUtil.isNotBlank(nodeSimpleVO.getLanguage())) { + LiteFlowNodeBuilder.createScriptNode() + .setId(nodeSimpleVO.getNodeId()) + .setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType())) + .setName(nodeSimpleVO.getName()) + .setScript(newValue) + .setLanguage(nodeSimpleVO.getLanguage()) + .build(); + } + // 没有语言类型 + else { + LiteFlowNodeBuilder.createScriptNode() + .setId(nodeSimpleVO.getNodeId()) + .setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType())) + .setName(nodeSimpleVO.getName()) + .setScript(newValue) + .build(); + } + } + default NodeSimpleVO convert(String str) { // 不需要去理解这串正则,就是一个匹配冒号的 // 一定得是a:b,或是a:b:c...这种完整类型的字符串的 diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/vo/RedisParserVO.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/vo/RedisParserVO.java index d3f53f54d..1848ecb13 100644 --- a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/vo/RedisParserVO.java +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/vo/RedisParserVO.java @@ -22,6 +22,7 @@ public class RedisParserVO { private String mode = "poll"; /*轮询时间间隔(s) 默认1分钟 若选择订阅机制可不配置*/ + //todo 确定类型是string还是long,若为string需校验 private String pollingInterval = "60"; /*chain表配置的数据库号*/