diff --git a/liteflow-rule-plugin/liteflow-rule-redis/pom.xml b/liteflow-rule-plugin/liteflow-rule-redis/pom.xml index a1e05b655..4ee4a1c7f 100644 --- a/liteflow-rule-plugin/liteflow-rule-redis/pom.xml +++ b/liteflow-rule-plugin/liteflow-rule-redis/pom.xml @@ -32,6 +32,12 @@ jedis ${jedis.version} + + + cn.hutool + hutool-crypto + ${hutool-crypto.version} + \ No newline at end of file diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/util/RedisParserByPolling.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/util/RedisParserByPolling.java index 495f7b056..b49844371 100644 --- a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/util/RedisParserByPolling.java +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/util/RedisParserByPolling.java @@ -4,14 +4,18 @@ import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; +import cn.hutool.crypto.digest.DigestUtil; +import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder; +import com.yomahub.liteflow.flow.FlowBus; import com.yomahub.liteflow.parser.redis.exception.RedisException; import com.yomahub.liteflow.parser.redis.vo.RedisParserVO; import com.yomahub.liteflow.spi.holder.ContextAwareHolder; import redis.clients.jedis.Jedis; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; +import java.util.*; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; /** * Redis 轮询机制实现类 @@ -24,33 +28,59 @@ public class RedisParserByPolling implements RedisParserHelper{ private final RedisParserVO redisParserVO; - private Jedis chainClient; + private Jedis chainJedis; - private Jedis scriptClient; + private Jedis scriptJedis; + + //chainKey中chain总数 + private Integer chainFieldNum = 0; + + //scriptKey中script总数 + private Integer scriptFieldNum = 0; + + //chainKey中value的SHA1加密值 用于轮询时确定value是否变化 + private Map chainSHAMap = new HashMap<>(); + + //scriptKey中value的SHA1加密值 用于轮询时确定value是否变化 + private Map scriptSHAMap = new HashMap<>(); + + //计算hash中field数量的lua脚本 + private String luaOfKey = "local keys = redis.call(\"hkeys\", KEYS[1]);\n" + + "return #keys;\n"; + + //计算hash中value的SHA值的lua脚本 + private String luaOfValue = "local key = KEYS[1];\n" + + "local field = KEYS[2];\n" + + "local value, err = redis.call(\"hget\", key, field);\n" + + "if value == false or value == nil then\n" + + " return \"nil\";\n" + + "end\n" + + "local sha1 = redis.sha1hex(value);\n" + + "return sha1;"; public RedisParserByPolling(RedisParserVO redisParserVO) { this.redisParserVO = redisParserVO; try{ try{ - this.chainClient = ContextAwareHolder.loadContextAware().getBean("chainJClient"); - this.scriptClient = ContextAwareHolder.loadContextAware().getBean("scriptJClient"); + this.chainJedis = ContextAwareHolder.loadContextAware().getBean("chainJClient"); + this.scriptJedis = ContextAwareHolder.loadContextAware().getBean("scriptJClient"); } catch (Exception ignored) { } - if (ObjectUtil.isNull(chainClient)) { - chainClient = new Jedis(redisParserVO.getHost(), Integer.parseInt(redisParserVO.getPort())); + if (ObjectUtil.isNull(chainJedis)) { + chainJedis = new Jedis(redisParserVO.getHost(), Integer.parseInt(redisParserVO.getPort())); if (StrUtil.isNotBlank(redisParserVO.getPassword())) { - chainClient.auth(redisParserVO.getPassword()); + chainJedis.auth(redisParserVO.getPassword()); } - chainClient.select(redisParserVO.getChainDataBase()); + chainJedis.select(redisParserVO.getChainDataBase()); //如果有脚本数据 if (ObjectUtil.isNotNull(redisParserVO.getScriptDataBase())) { - scriptClient = new Jedis(redisParserVO.getHost(), Integer.parseInt(redisParserVO.getPort())); + scriptJedis = new Jedis(redisParserVO.getHost(), Integer.parseInt(redisParserVO.getPort())); if (StrUtil.isNotBlank(redisParserVO.getPassword())) { - scriptClient.auth(redisParserVO.getPassword()); + scriptJedis.auth(redisParserVO.getPassword()); } - scriptClient.select(redisParserVO.getScriptDataBase()); + scriptJedis.select(redisParserVO.getScriptDataBase()); } } } @@ -64,17 +94,22 @@ public class RedisParserByPolling implements RedisParserHelper{ try { // 检查chainKey下有没有子节点 String chainKey = redisParserVO.getChainKey(); - Set chainNameSet = chainClient.hkeys(chainKey); + Set chainNameSet = chainJedis.hkeys(chainKey); if (CollectionUtil.isEmpty(chainNameSet)) { throw new RedisException(StrUtil.format("There are no chains in key [{}]", chainKey)); } + chainFieldNum = chainNameSet.size(); // 获取chainKey下的所有子节点内容List List chainItemContentList = new ArrayList<>(); for (String chainName : chainNameSet) { - String chainData = chainClient.hget(chainKey, chainName); + String chainData = chainJedis.hget(chainKey, chainName); if (StrUtil.isNotBlank(chainData)) { chainItemContentList.add(StrUtil.format(CHAIN_XML_PATTERN, chainName, chainData)); } + + //计算该chainData的SHA值 + String chainSHA = DigestUtil.sha1Hex(chainData); + chainSHAMap.put(chainName, chainSHA); } // 合并成所有chain的xml内容 String chainAllContent = CollUtil.join(chainItemContentList, StrUtil.EMPTY); @@ -83,7 +118,8 @@ public class RedisParserByPolling implements RedisParserHelper{ String scriptAllContent = StrUtil.EMPTY; if (hasScript()) { String scriptKey = redisParserVO.getScriptKey(); - Set scriptFieldSet = scriptClient.hkeys(scriptKey); + Set scriptFieldSet = scriptJedis.hkeys(scriptKey); + scriptFieldNum = scriptFieldSet.size(); List scriptItemContentList = new ArrayList<>(); for (String scriptFieldValue : scriptFieldSet) { @@ -93,7 +129,7 @@ public class RedisParserByPolling implements RedisParserHelper{ StrUtil.format("The name of the redis field [{}] in scriptKey [{}] is invalid", scriptFieldValue, scriptKey)); } - String scriptData = scriptClient.hget(scriptKey, scriptFieldValue); + String scriptData = scriptJedis.hget(scriptKey, scriptFieldValue); // 有语言类型 if (StrUtil.isNotBlank(nodeSimpleVO.getLanguage())) { @@ -106,6 +142,10 @@ public class RedisParserByPolling implements RedisParserHelper{ scriptItemContentList.add(StrUtil.format(NODE_ITEM_XML_PATTERN, nodeSimpleVO.getNodeId(), nodeSimpleVO.getName(), nodeSimpleVO.getType(), scriptData)); } + + //计算scriptData的SHA值 + String scriptSHA = DigestUtil.sha1Hex(scriptData); + scriptSHAMap.put(scriptFieldValue, scriptSHA); } scriptAllContent = StrUtil.format(NODE_XML_PATTERN, @@ -120,7 +160,7 @@ public class RedisParserByPolling implements RedisParserHelper{ } public boolean hasScript() { - if (ObjectUtil.isNull(scriptClient) || ObjectUtil.isNull(redisParserVO.getScriptDataBase())) { + if (ObjectUtil.isNull(scriptJedis) || ObjectUtil.isNull(redisParserVO.getScriptDataBase())) { return false; } try{ @@ -128,7 +168,7 @@ public class RedisParserByPolling implements RedisParserHelper{ if (StrUtil.isBlank(scriptKey)) { return false; } - Set scriptKeySet = scriptClient.hkeys(scriptKey); + Set scriptKeySet = scriptJedis.hkeys(scriptKey); return !CollUtil.isEmpty(scriptKeySet); } catch (Exception e) { @@ -138,6 +178,74 @@ public class RedisParserByPolling implements RedisParserHelper{ @Override public void listenRedis() { + //将lua脚本添加到chainJedis脚本缓存 + String keyLuaOfChain = chainJedis.scriptLoad(luaOfKey); + String valueLuaOfChain = chainJedis.scriptLoad(luaOfValue); + //定时任务线程池 + ScheduledExecutorService pool = Executors.newScheduledThreadPool(10); + //轮询chain内容的定时任务 + pool.scheduleAtFixedRate(pollChainTask(keyLuaOfChain, valueLuaOfChain), + 60, Long.valueOf(redisParserVO.getPollingInterval()), TimeUnit.SECONDS); + } + + + /** + * 用于轮询chain的定时任务 + */ + private Runnable pollChainTask(String keyLua, String valueLua) { + Runnable r = new Runnable() { + @Override + public void run() { + String chainKey = redisParserVO.getChainKey(); + //先判断chainKey中chain数量有无增长 + String keyNum = chainJedis.evalsha(keyLua, 1, chainKey).toString(); + if (Integer.parseInt(keyNum) > chainFieldNum) { + //有新增加的chain,重新从redis中拉取chainId集合, 对比出新增的chain + Set newChainSet = chainJedis.hkeys(chainKey); + Set oldChainSet = chainSHAMap.keySet(); + //求出差集,即新增的chain + Set newAdd = new HashSet<>(); + newAdd.addAll(newChainSet); + newAdd.removeAll(oldChainSet); + for (String newChainName : newAdd) { + String chainData = chainJedis.hget(chainKey, newChainName); + LiteFlowChainELBuilder.createChain().setChainId(newChainName).setEL(chainData).build(); + LOG.info("starting poll flow config... update key={} new value={},", newChainName, chainData); + + //修改SHAMap + chainSHAMap.put(newChainName, DigestUtil.sha1Hex(chainData)); + } + //修改chainFieldNum + chainFieldNum = newChainSet.size(); + } + + //遍历Map,判断各个chain的值有无变化 + for(Map.Entry entry: chainSHAMap.entrySet()) { + String chainName = entry.getKey(); + String oldSHA = entry.getValue(); + String newSHA = chainJedis.evalsha(valueLua, 2, chainKey, chainName).toString(); + if (StrUtil.equals(newSHA, "nil")) { + //新SHA值为nil, 即未获取到该chain,表示该chain已被删除 + FlowBus.removeChain(chainName); + LOG.info("starting reload flow config... delete key={}", chainName); + + //修改SHAMap + chainSHAMap.remove(chainName); + } + else if (!StrUtil.equals(newSHA, oldSHA)) { + //SHA值发生变化 表示该chain的值已被修改 重新拉取变化的chain + String chainData = chainJedis.hget(chainKey, chainName); + LiteFlowChainELBuilder.createChain().setChainId(chainName).setEL(chainData).build(); + LOG.info("starting poll flow config... update key={} new value={},", chainName, chainData); + + //修改SHAMap + chainSHAMap.put(chainName, newSHA); + } + //SHA值无变化 表示该chain未改变 + } + } + }; + return r; } } diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/util/RedisParserBySubscribe.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/util/RedisParserBySubscribe.java index d144c9b57..934c50340 100644 --- a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/util/RedisParserBySubscribe.java +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/util/RedisParserBySubscribe.java @@ -172,7 +172,7 @@ public class RedisParserBySubscribe implements RedisParserHelper { }); //修改 chain chainKey.addListener((EntryUpdatedListener) event -> { - LOG.info("starting reload flow config... update path={} new value={},", event.getKey(), event.getValue()); + LOG.info("starting reload flow config... update key={} new value={},", event.getKey(), event.getValue()); LiteFlowChainELBuilder.createChain().setChainId(event.getKey()).setEL(event.getValue()).build(); }); //删除 chain diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/vo/RedisParserVO.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/vo/RedisParserVO.java index 51e8024db..45d12a4b8 100644 --- a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/vo/RedisParserVO.java +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/vo/RedisParserVO.java @@ -21,8 +21,8 @@ public class RedisParserVO { /*监听机制 轮询为poll 订阅为subscribe 默认为poll*/ private String mode = "poll"; - /*轮询时间间隔(ms) 默认1分钟 若选择订阅机制可不配置*/ - private String pollingInterval = "60000"; + /*轮询时间间隔(s) 默认1分钟 若选择订阅机制可不配置*/ + private String pollingInterval = "60"; /*chain表配置的数据库号*/ private Integer chainDataBase; diff --git a/pom.xml b/pom.xml index 5edcba195..306c242e6 100644 --- a/pom.xml +++ b/pom.xml @@ -76,6 +76,7 @@ 2.11.0 3.21.0 4.3.1 + 5.8.18