This commit is contained in:
houxinyu
2023-07-09 16:51:42 +08:00
parent ed3b6354b4
commit ae7896569c
4 changed files with 60 additions and 34 deletions

View File

@@ -89,7 +89,7 @@ public class RedisXmlELParser extends ClassXmlFlowELParser {
if (StrUtil.isBlank(redisParserVO.getHost())) {
throw new RedisException(StrFormatter.format(ERROR_MSG_PATTERN, "host"));
}
if (StrUtil.isBlank(redisParserVO.getPort())) {
if (ObjectUtil.isNull(redisParserVO.getPort())) {
throw new RedisException(StrFormatter.format(ERROR_MSG_PATTERN, "port"));
}
if (ObjectUtil.isNull(redisParserVO.getChainDataBase())) {

View File

@@ -63,20 +63,21 @@ public class RedisParserByPolling implements RedisParserHelper{
try{
try{
this.chainJedis = ContextAwareHolder.loadContextAware().getBean("chainJClient");
this.scriptJedis = ContextAwareHolder.loadContextAware().getBean("scriptJClient");
this.chainJedis = ContextAwareHolder.loadContextAware().getBean("chainJedis");
this.scriptJedis = ContextAwareHolder.loadContextAware().getBean("scriptJedis");
}
catch (Exception ignored) {
}
if (ObjectUtil.isNull(chainJedis)) {
chainJedis = new Jedis(redisParserVO.getHost(), Integer.parseInt(redisParserVO.getPort()));
chainJedis = new Jedis(redisParserVO.getHost(), redisParserVO.getPort());
//如果配置了密码
if (StrUtil.isNotBlank(redisParserVO.getPassword())) {
chainJedis.auth(redisParserVO.getPassword());
}
chainJedis.select(redisParserVO.getChainDataBase());
//如果有脚本数据
if (ObjectUtil.isNotNull(redisParserVO.getScriptDataBase())) {
scriptJedis = new Jedis(redisParserVO.getHost(), Integer.parseInt(redisParserVO.getPort()));
scriptJedis = new Jedis(redisParserVO.getHost(), redisParserVO.getPort());
if (StrUtil.isNotBlank(redisParserVO.getPassword())) {
scriptJedis.auth(redisParserVO.getPassword());
}
@@ -176,6 +177,9 @@ public class RedisParserByPolling implements RedisParserHelper{
}
}
/**
* 定时轮询拉取Redis中变化的数据
*/
@Override
public void listenRedis() {
//将lua脚本添加到chainJedis脚本缓存
@@ -188,8 +192,10 @@ public class RedisParserByPolling implements RedisParserHelper{
pool.scheduleAtFixedRate(pollChainTask(keyLuaOfChain, valueLuaOfChain),
60, Long.valueOf(redisParserVO.getPollingInterval()), TimeUnit.SECONDS);
//如果有脚本内容
if (ObjectUtil.isNotNull(scriptJedis) && ObjectUtil.isNotNull(redisParserVO.getScriptDataBase())) {
//如果有脚本
if (ObjectUtil.isNotNull(scriptJedis) && ObjectUtil.isNotNull(redisParserVO.getScriptDataBase())
&& StrUtil.isNotBlank(redisParserVO.getScriptKey())) {
//将lua脚本添加到scriptJedis脚本缓存
String keyLuaOfScript = scriptJedis.scriptLoad(luaOfKey);
String valueLuaOfScript = scriptJedis.scriptLoad(luaOfValue);
}
@@ -199,6 +205,8 @@ public class RedisParserByPolling implements RedisParserHelper{
/**
* 用于轮询chain的定时任务
* 首先根据hash中field数量的变化拉取新增的chain
* 再根据hash中value的SHA值修改变化的和被删除的chain
*/
private Runnable pollChainTask(String keyLua, String valueLua) {
Runnable r = new Runnable() {
@@ -207,6 +215,8 @@ public class RedisParserByPolling implements RedisParserHelper{
String chainKey = redisParserVO.getChainKey();
//先判断chainKey中chain数量有无增长
String keyNum = chainJedis.evalsha(keyLua, 1, chainKey).toString();
//修改chainFieldNum为最新chain数量
chainFieldNum = Integer.parseInt(keyNum);
if (Integer.parseInt(keyNum) > chainFieldNum) {
//有新增加的chain,重新从redis中拉取chainId集合, 对比出新增的chain
Set<String> newChainSet = chainJedis.hkeys(chainKey);
@@ -215,41 +225,55 @@ public class RedisParserByPolling implements RedisParserHelper{
Set<String> newAdd = new HashSet<>();
newAdd.addAll(newChainSet);
newAdd.removeAll(oldChainSet);
for (String newChainName : newAdd) {
String chainData = chainJedis.hget(chainKey, newChainName);
LiteFlowChainELBuilder.createChain().setChainId(newChainName).setEL(chainData).build();
LOG.info("starting poll flow config... update key={} new value={},", newChainName, chainData);
for (String newChainId : newAdd) {
String chainData = chainJedis.hget(chainKey, newChainId);
LiteFlowChainELBuilder.createChain().setChainId(newChainId).setEL(chainData).build();
LOG.info("starting poll flow config... update key={} new value={},", newChainId, chainData);
//修改SHAMap
chainSHAMap.put(newChainName, DigestUtil.sha1Hex(chainData));
chainSHAMap.put(newChainId, DigestUtil.sha1Hex(chainData));
}
//修改chainFieldNum
chainFieldNum = newChainSet.size();
}
//遍历Map,判断各个chain的值有无变化
for(Map.Entry<String, String> entry: chainSHAMap.entrySet()) {
String chainName = entry.getKey();
String chainId = entry.getKey();
String oldSHA = entry.getValue();
String newSHA = chainJedis.evalsha(valueLua, 2, chainKey, chainName).toString();
String newSHA = chainJedis.evalsha(valueLua, 2, chainKey, chainId).toString();
if (StrUtil.equals(newSHA, "nil")) {
//新SHA值为nil, 即未获取到该chain表示该chain已被删除
FlowBus.removeChain(chainName);
LOG.info("starting reload flow config... delete key={}", chainName);
//新SHA值为nil, 即未获取到该chain,表示该chain已被删除
FlowBus.removeChain(chainId);
LOG.info("starting reload flow config... delete key={}", chainId);
//修改SHAMap
chainSHAMap.remove(chainName);
chainSHAMap.remove(chainId);
}
else if (!StrUtil.equals(newSHA, oldSHA)) {
//SHA值发生变化 表示该chain的值已被修改 重新拉取变化的chain
String chainData = chainJedis.hget(chainKey, chainName);
LiteFlowChainELBuilder.createChain().setChainId(chainName).setEL(chainData).build();
LOG.info("starting poll flow config... update key={} new value={},", chainName, chainData);
//SHA值发生变化,表示该chain的值已被修改,重新拉取变化的chain
String chainData = chainJedis.hget(chainKey, chainId);
LiteFlowChainELBuilder.createChain().setChainId(chainId).setEL(chainData).build();
LOG.info("starting poll flow config... update key={} new value={},", chainId, chainData);
//修改SHAMap
chainSHAMap.put(chainName, newSHA);
chainSHAMap.put(chainId, newSHA);
}
//SHA值无变化,表示该chain未改变
}
if (chainFieldNum > chainSHAMap.size()) {
//如果封装的SHAMap数量比新的chain总数少, 说明有修改了chainId的情况
//因为遍历到旧的id时会取到nil,SHAMap会把原来的chainId删掉,但没有机会添加新的chainId
//在此处重新拉取所有chainId集合,补充添加新chainId
Set<String> newChainSet = chainJedis.hkeys(chainKey);
for (String chainId : newChainSet) {
if (chainSHAMap.get(chainId) == null) {
//将新chainId添加到LiteFlowChainELBuilder和SHAMap
String chainData = chainJedis.hget(chainKey, chainId);
LiteFlowChainELBuilder.createChain().setChainId(chainId).setEL(chainData).build();
LOG.info("starting poll flow config... update key={} new value={},", chainId, chainData);
chainSHAMap.put(chainId, DigestUtil.sha1Hex(chainData));
}
}
//SHA值无变化 表示该chain未改变
}
}
};

View File

@@ -45,8 +45,8 @@ public class RedisParserBySubscribe implements RedisParserHelper {
try {
try {
this.chainClient = ContextAwareHolder.loadContextAware().getBean("chainRClient");
this.scriptClient = ContextAwareHolder.loadContextAware().getBean("scriptRClient");
this.chainClient = ContextAwareHolder.loadContextAware().getBean("chainClient");
this.scriptClient = ContextAwareHolder.loadContextAware().getBean("scriptClient");
}
catch (Exception ignored) {
}
@@ -69,11 +69,13 @@ public class RedisParserBySubscribe implements RedisParserHelper {
private Config getRedissonConfig(RedisParserVO redisParserVO, Integer dataBase) {
Config config = new Config();
String redisAddress = StrFormatter.format(REDIS_URL_PATTERN, redisParserVO.getHost(), redisParserVO.getPort());
//如果配置了密码
if (StrUtil.isNotBlank(redisParserVO.getPassword())) {
config.useSingleServer().setAddress(redisAddress)
.setPassword(redisParserVO.getPassword())
.setDatabase(dataBase);
}
//没有配置密码
else {
config.useSingleServer().setAddress(redisAddress)
.setDatabase(dataBase);
@@ -93,10 +95,10 @@ public class RedisParserBySubscribe implements RedisParserHelper {
}
// 获取chainKey下的所有子节点内容List
List<String> chainItemContentList = new ArrayList<>();
for (String chainName : chainNameSet) {
String chainData = chainKey.get(chainName);
for (String chainId : chainNameSet) {
String chainData = chainKey.get(chainId);
if (StrUtil.isNotBlank(chainData)) {
chainItemContentList.add(StrUtil.format(CHAIN_XML_PATTERN, chainName, chainData));
chainItemContentList.add(StrUtil.format(CHAIN_XML_PATTERN, chainId, chainData));
}
}
// 合并成所有chain的xml内容

View File

@@ -13,7 +13,7 @@ public class RedisParserVO {
private String host;
/*端口号*/
private String port;
private Integer port;
/*密码*/
private String password;
@@ -44,11 +44,11 @@ public class RedisParserVO {
this.host = host;
}
public String getPort() {
public Integer getPort() {
return port;
}
public void setPort(String port) {
public void setPort(Integer port) {
this.port = port;
}