diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/RedisXmlELParser.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/RedisXmlELParser.java index daaba861c..23961a500 100644 --- a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/RedisXmlELParser.java +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/RedisXmlELParser.java @@ -89,7 +89,7 @@ public class RedisXmlELParser extends ClassXmlFlowELParser { if (StrUtil.isBlank(redisParserVO.getHost())) { throw new RedisException(StrFormatter.format(ERROR_MSG_PATTERN, "host")); } - if (StrUtil.isBlank(redisParserVO.getPort())) { + if (ObjectUtil.isNull(redisParserVO.getPort())) { throw new RedisException(StrFormatter.format(ERROR_MSG_PATTERN, "port")); } if (ObjectUtil.isNull(redisParserVO.getChainDataBase())) { diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/util/RedisParserByPolling.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/util/RedisParserByPolling.java index 5cdf59b98..3eb48e0c9 100644 --- a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/util/RedisParserByPolling.java +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/util/RedisParserByPolling.java @@ -63,20 +63,21 @@ public class RedisParserByPolling implements RedisParserHelper{ try{ try{ - this.chainJedis = ContextAwareHolder.loadContextAware().getBean("chainJClient"); - this.scriptJedis = ContextAwareHolder.loadContextAware().getBean("scriptJClient"); + this.chainJedis = ContextAwareHolder.loadContextAware().getBean("chainJedis"); + this.scriptJedis = ContextAwareHolder.loadContextAware().getBean("scriptJedis"); } catch (Exception ignored) { } if (ObjectUtil.isNull(chainJedis)) { - chainJedis = new Jedis(redisParserVO.getHost(), Integer.parseInt(redisParserVO.getPort())); + chainJedis = new Jedis(redisParserVO.getHost(), redisParserVO.getPort()); + //如果配置了密码 if (StrUtil.isNotBlank(redisParserVO.getPassword())) { chainJedis.auth(redisParserVO.getPassword()); } chainJedis.select(redisParserVO.getChainDataBase()); //如果有脚本数据 if (ObjectUtil.isNotNull(redisParserVO.getScriptDataBase())) { - scriptJedis = new Jedis(redisParserVO.getHost(), Integer.parseInt(redisParserVO.getPort())); + scriptJedis = new Jedis(redisParserVO.getHost(), redisParserVO.getPort()); if (StrUtil.isNotBlank(redisParserVO.getPassword())) { scriptJedis.auth(redisParserVO.getPassword()); } @@ -176,6 +177,9 @@ public class RedisParserByPolling implements RedisParserHelper{ } } + /** + * 定时轮询拉取Redis中变化的数据 + */ @Override public void listenRedis() { //将lua脚本添加到chainJedis脚本缓存 @@ -188,8 +192,10 @@ public class RedisParserByPolling implements RedisParserHelper{ pool.scheduleAtFixedRate(pollChainTask(keyLuaOfChain, valueLuaOfChain), 60, Long.valueOf(redisParserVO.getPollingInterval()), TimeUnit.SECONDS); - //如果有脚本内容 - if (ObjectUtil.isNotNull(scriptJedis) && ObjectUtil.isNotNull(redisParserVO.getScriptDataBase())) { + //如果有脚本 + if (ObjectUtil.isNotNull(scriptJedis) && ObjectUtil.isNotNull(redisParserVO.getScriptDataBase()) + && StrUtil.isNotBlank(redisParserVO.getScriptKey())) { + //将lua脚本添加到scriptJedis脚本缓存 String keyLuaOfScript = scriptJedis.scriptLoad(luaOfKey); String valueLuaOfScript = scriptJedis.scriptLoad(luaOfValue); } @@ -199,6 +205,8 @@ public class RedisParserByPolling implements RedisParserHelper{ /** * 用于轮询chain的定时任务 + * 首先根据hash中field数量的变化拉取新增的chain + * 再根据hash中value的SHA值修改变化的和被删除的chain */ private Runnable pollChainTask(String keyLua, String valueLua) { Runnable r = new Runnable() { @@ -207,6 +215,8 @@ public class RedisParserByPolling implements RedisParserHelper{ String chainKey = redisParserVO.getChainKey(); //先判断chainKey中chain数量有无增长 String keyNum = chainJedis.evalsha(keyLua, 1, chainKey).toString(); + //修改chainFieldNum为最新chain数量 + chainFieldNum = Integer.parseInt(keyNum); if (Integer.parseInt(keyNum) > chainFieldNum) { //有新增加的chain,重新从redis中拉取chainId集合, 对比出新增的chain Set newChainSet = chainJedis.hkeys(chainKey); @@ -215,41 +225,55 @@ public class RedisParserByPolling implements RedisParserHelper{ Set newAdd = new HashSet<>(); newAdd.addAll(newChainSet); newAdd.removeAll(oldChainSet); - for (String newChainName : newAdd) { - String chainData = chainJedis.hget(chainKey, newChainName); - LiteFlowChainELBuilder.createChain().setChainId(newChainName).setEL(chainData).build(); - LOG.info("starting poll flow config... update key={} new value={},", newChainName, chainData); + for (String newChainId : newAdd) { + String chainData = chainJedis.hget(chainKey, newChainId); + LiteFlowChainELBuilder.createChain().setChainId(newChainId).setEL(chainData).build(); + LOG.info("starting poll flow config... update key={} new value={},", newChainId, chainData); //修改SHAMap - chainSHAMap.put(newChainName, DigestUtil.sha1Hex(chainData)); + chainSHAMap.put(newChainId, DigestUtil.sha1Hex(chainData)); } - //修改chainFieldNum - chainFieldNum = newChainSet.size(); } //遍历Map,判断各个chain的值有无变化 for(Map.Entry entry: chainSHAMap.entrySet()) { - String chainName = entry.getKey(); + String chainId = entry.getKey(); String oldSHA = entry.getValue(); - String newSHA = chainJedis.evalsha(valueLua, 2, chainKey, chainName).toString(); + String newSHA = chainJedis.evalsha(valueLua, 2, chainKey, chainId).toString(); if (StrUtil.equals(newSHA, "nil")) { - //新SHA值为nil, 即未获取到该chain,表示该chain已被删除 - FlowBus.removeChain(chainName); - LOG.info("starting reload flow config... delete key={}", chainName); + //新SHA值为nil, 即未获取到该chain,表示该chain已被删除 + FlowBus.removeChain(chainId); + LOG.info("starting reload flow config... delete key={}", chainId); //修改SHAMap - chainSHAMap.remove(chainName); + chainSHAMap.remove(chainId); } else if (!StrUtil.equals(newSHA, oldSHA)) { - //SHA值发生变化 表示该chain的值已被修改 重新拉取变化的chain - String chainData = chainJedis.hget(chainKey, chainName); - LiteFlowChainELBuilder.createChain().setChainId(chainName).setEL(chainData).build(); - LOG.info("starting poll flow config... update key={} new value={},", chainName, chainData); + //SHA值发生变化,表示该chain的值已被修改,重新拉取变化的chain + String chainData = chainJedis.hget(chainKey, chainId); + LiteFlowChainELBuilder.createChain().setChainId(chainId).setEL(chainData).build(); + LOG.info("starting poll flow config... update key={} new value={},", chainId, chainData); //修改SHAMap - chainSHAMap.put(chainName, newSHA); + chainSHAMap.put(chainId, newSHA); + } + //SHA值无变化,表示该chain未改变 + } + + if (chainFieldNum > chainSHAMap.size()) { + //如果封装的SHAMap数量比新的chain总数少, 说明有修改了chainId的情况 + //因为遍历到旧的id时会取到nil,SHAMap会把原来的chainId删掉,但没有机会添加新的chainId + //在此处重新拉取所有chainId集合,补充添加新chainId + Set newChainSet = chainJedis.hkeys(chainKey); + for (String chainId : newChainSet) { + if (chainSHAMap.get(chainId) == null) { + //将新chainId添加到LiteFlowChainELBuilder和SHAMap + String chainData = chainJedis.hget(chainKey, chainId); + LiteFlowChainELBuilder.createChain().setChainId(chainId).setEL(chainData).build(); + LOG.info("starting poll flow config... update key={} new value={},", chainId, chainData); + chainSHAMap.put(chainId, DigestUtil.sha1Hex(chainData)); + } } - //SHA值无变化 表示该chain未改变 } } }; diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/util/RedisParserBySubscribe.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/util/RedisParserBySubscribe.java index 934c50340..4ee4cea1c 100644 --- a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/util/RedisParserBySubscribe.java +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/util/RedisParserBySubscribe.java @@ -45,8 +45,8 @@ public class RedisParserBySubscribe implements RedisParserHelper { try { try { - this.chainClient = ContextAwareHolder.loadContextAware().getBean("chainRClient"); - this.scriptClient = ContextAwareHolder.loadContextAware().getBean("scriptRClient"); + this.chainClient = ContextAwareHolder.loadContextAware().getBean("chainClient"); + this.scriptClient = ContextAwareHolder.loadContextAware().getBean("scriptClient"); } catch (Exception ignored) { } @@ -69,11 +69,13 @@ public class RedisParserBySubscribe implements RedisParserHelper { private Config getRedissonConfig(RedisParserVO redisParserVO, Integer dataBase) { Config config = new Config(); String redisAddress = StrFormatter.format(REDIS_URL_PATTERN, redisParserVO.getHost(), redisParserVO.getPort()); + //如果配置了密码 if (StrUtil.isNotBlank(redisParserVO.getPassword())) { config.useSingleServer().setAddress(redisAddress) .setPassword(redisParserVO.getPassword()) .setDatabase(dataBase); } + //没有配置密码 else { config.useSingleServer().setAddress(redisAddress) .setDatabase(dataBase); @@ -93,10 +95,10 @@ public class RedisParserBySubscribe implements RedisParserHelper { } // 获取chainKey下的所有子节点内容List List chainItemContentList = new ArrayList<>(); - for (String chainName : chainNameSet) { - String chainData = chainKey.get(chainName); + for (String chainId : chainNameSet) { + String chainData = chainKey.get(chainId); if (StrUtil.isNotBlank(chainData)) { - chainItemContentList.add(StrUtil.format(CHAIN_XML_PATTERN, chainName, chainData)); + chainItemContentList.add(StrUtil.format(CHAIN_XML_PATTERN, chainId, chainData)); } } // 合并成所有chain的xml内容 diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/vo/RedisParserVO.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/vo/RedisParserVO.java index 45d12a4b8..d3f53f54d 100644 --- a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/vo/RedisParserVO.java +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/vo/RedisParserVO.java @@ -13,7 +13,7 @@ public class RedisParserVO { private String host; /*端口号*/ - private String port; + private Integer port; /*密码*/ private String password; @@ -44,11 +44,11 @@ public class RedisParserVO { this.host = host; } - public String getPort() { + public Integer getPort() { return port; } - public void setPort(String port) { + public void setPort(Integer port) { this.port = port; }