bug #IC9ZZ8 reids poll模式代码问题

This commit is contained in:
everywhere.z
2025-05-23 19:46:09 +08:00
parent 535c2b7c84
commit e695648728
2 changed files with 105 additions and 54 deletions

View File

@@ -0,0 +1,46 @@
package com.yomahub.liteflow.util;
/**
* 三元值对象
*
* @author Bryan.Zhang
* @since 2.13.3
*/
public class TupleOf3<A, B, C> {
private A a;
private B b;
private C c;
public TupleOf3(A a, B b, C c) {
this.a = a;
this.b = b;
this.c = c;
}
public A getA() {
return a;
}
public B getB() {
return b;
}
public void setA(A a) {
this.a = a;
}
public void setB(B b) {
this.b = b;
}
public C getC() {
return c;
}
public void setC(C c) {
this.c = c;
}
}

View File

@@ -1,6 +1,8 @@
package com.yomahub.liteflow.parser.redis.mode.polling;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.lang.Pair;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.crypto.digest.DigestUtil;
import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder;
@@ -10,13 +12,18 @@ import com.yomahub.liteflow.log.LFLoggerManager;
import com.yomahub.liteflow.parser.redis.mode.RClient;
import com.yomahub.liteflow.parser.redis.vo.RedisParserVO;
import com.yomahub.liteflow.util.RuleParsePluginUtil;
import com.yomahub.liteflow.util.TupleOf3;
import java.util.*;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* 用于轮询chain的定时任务
*
* @author hxinyu
* @author Bryan.Zhang
* @since 2.11.0
*/
public class ChainPollingTask implements Runnable {
@@ -27,6 +34,7 @@ public class ChainPollingTask implements Runnable {
private Integer chainNum;
//key为chainIdvalue为缓存的SHA值
private Map<String, String> chainSHAMap;
private String keyLua;
@@ -58,69 +66,66 @@ public class ChainPollingTask implements Runnable {
//修改chainNum为最新chain数量
chainNum = Integer.parseInt(keyNum);
List<String> needDelete = new ArrayList<>();
//遍历Map,判断各个chain的value有无变化修改变化了值的chain和被删除的chain
for (Map.Entry<String, String> entry : chainSHAMap.entrySet()) {
String chainId = entry.getKey();
String oldSHA = entry.getValue();
Pair<Boolean/*启停*/, String/*id*/> pair = RuleParsePluginUtil.parseIdKey(chainId);
// 如果是停用,就直接进删除
if (pair.getKey()){
FlowBus.removeChain(pair.getValue());
needDelete.add(chainId);
continue;
//拿到所有的Chain的HashKey
Set<String> newChainHashKeySet = chainClient.hkeys(chainKey);
List<TupleOf3<String, String, Boolean>> tupleOf3List = newChainHashKeySet.stream().map(chainHashKey -> {
Pair<Boolean/*启停*/, String/*id*/> pair = RuleParsePluginUtil.parseIdKey(chainHashKey);
//TupleOf3为三元值对象在这里A为chainHashKeyB为解析到的chainIdC为是否启用
return new TupleOf3<>(chainHashKey, pair.getValue(), pair.getKey());
}).collect(Collectors.toList());
tupleOf3List.forEach(tupleOf3 -> {
String chainHashKey = tupleOf3.getA();
String chainId = tupleOf3.getB();
Boolean enable = tupleOf3.getC();
// 如果是停用,就直接删除
if (BooleanUtil.isFalse(enable)){
LOG.info("starting reload flow config... delete key={}", chainId);
chainSHAMap.remove(chainId);
FlowBus.removeChain(chainId);
return;
}
//在redis服务端通过Lua脚本计算SHA值
String newSHA = chainClient.evalSha(valueLua, chainKey, chainId);
if (StrUtil.equals(newSHA, "nil")) {
//新SHA值为nil, 即未获取到该chain,表示该chain已被删除
FlowBus.removeChain(pair.getValue());
LOG.info("starting reload flow config... delete key={}", chainId);
String newSHA = chainClient.evalSha(valueLua, chainKey, chainHashKey);
//添加到待删除的list 后续统一从SHAMap中移除
//不在这里直接移除是为了避免先删除导致chainSHAMap并没有完全遍历完 chain删除不全
needDelete.add(chainId);
if (StrUtil.isBlank(newSHA) || "nil".equals(newSHA)){
FlowBus.removeChain(chainId);
return;
}
else if (!StrUtil.equals(newSHA, oldSHA)) {
//SHA值发生变化,表示该chain的值已被修改,重新拉取变化的chain
String chainData = chainClient.hget(chainKey, chainId);
LiteFlowChainELBuilder.createChain().setChainId(pair.getValue()).setEL(chainData).build();
LOG.info("starting reload flow config... update key={} new value={},", chainId, chainData);
//修改SHAMap
// chainSHAMap不含有redis取到的chainId说明是新增的
if (!chainSHAMap.containsKey(chainId)){
String chainEL = chainClient.hget(chainKey, chainHashKey);
LiteFlowChainELBuilder.createChain().setChainId(chainId).setEL(chainEL).build();
chainSHAMap.put(chainId, newSHA);
}
//SHA值无变化,表示该chain未改变
}
//统一从SHAMap中移除要删除的chain
for (String chainId : needDelete) {
chainSHAMap.remove(chainId);
}
//处理新添加chain和chainId被修改的情况
if (chainNum > chainSHAMap.size()) {
//如果封装的SHAMap数量比最新chain总数少, 说明有两种情况:
// 1、添加了新chain
// 2、修改了chainId:因为遍历到旧的id时会取到nil,SHAMap会把原来的chainId删掉,但没有机会添加新的chainId
// 3、上述两者结合
//在此处重新拉取所有chainId集合,补充添加新chain
Set<String> newChainSet = chainClient.hkeys(chainKey);
for (String chainId : newChainSet) {
Pair<Boolean/*启停*/, String/*id*/> pair = RuleParsePluginUtil.parseIdKey(chainId);
if (!chainSHAMap.containsKey(chainId)) {
//将新chainId添加到LiteFlowChainELBuilder和SHAMap
String chainData = chainClient.hget(chainKey, chainId);
// 如果是启用,才装配
if (pair.getKey()){
LiteFlowChainELBuilder.createChain().setChainId(pair.getValue()).setEL(chainData).build();
LOG.info("starting reload flow config... create key={} new value={},", chainId, chainData);
chainSHAMap.put(chainId, DigestUtil.sha1Hex(chainData));
}
}else{
String oldSHA = chainSHAMap.get(chainId);
if (!StrUtil.equals(newSHA, oldSHA)) {
//SHA值发生变化,表示该chain的值已被修改,重新拉取变化的chain
String chainEL = chainClient.hget(chainKey, chainHashKey);
LiteFlowChainELBuilder.createChain().setChainId(chainId).setEL(chainEL).build();
LOG.info("starting reload flow config... update key={} new value={},", chainId, chainEL);
//修改SHAMap
chainSHAMap.put(chainId, newSHA);
}
}
});
// 这里是为了处理在redis服务端删除但是本地缓存还存在chainId的情况
// 这表明是服务端这边已经删除了chain
if (CollectionUtil.isNotEmpty(chainSHAMap)){
Set<String> newChainIdSet = tupleOf3List.stream().map(TupleOf3::getB).collect(Collectors.toSet());
Collection<String> deletedChainIdSet = CollectionUtil.subtract(chainSHAMap.keySet(), newChainIdSet);
deletedChainIdSet.forEach(chainId -> {
chainSHAMap.remove(chainId);
FlowBus.removeChain(chainId);
LOG.info("starting reload flow config... delete key={}", chainId);
});
}
} catch (Exception e) {
LOG.error("[Exception during chain polling] " + e.getMessage(), e);