mirror of
https://gitee.com/dromara/liteFlow.git
synced 2026-05-14 12:12:08 +08:00
add poll task of chain
This commit is contained in:
@@ -32,6 +32,12 @@
|
||||
<artifactId>jedis</artifactId>
|
||||
<version>${jedis.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>cn.hutool</groupId>
|
||||
<artifactId>hutool-crypto</artifactId>
|
||||
<version>${hutool-crypto.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
@@ -4,14 +4,18 @@ import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.collection.CollectionUtil;
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.crypto.digest.DigestUtil;
|
||||
import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder;
|
||||
import com.yomahub.liteflow.flow.FlowBus;
|
||||
import com.yomahub.liteflow.parser.redis.exception.RedisException;
|
||||
import com.yomahub.liteflow.parser.redis.vo.RedisParserVO;
|
||||
import com.yomahub.liteflow.spi.holder.ContextAwareHolder;
|
||||
import redis.clients.jedis.Jedis;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Redis 轮询机制实现类
|
||||
@@ -24,33 +28,59 @@ public class RedisParserByPolling implements RedisParserHelper{
|
||||
|
||||
private final RedisParserVO redisParserVO;
|
||||
|
||||
private Jedis chainClient;
|
||||
private Jedis chainJedis;
|
||||
|
||||
private Jedis scriptClient;
|
||||
private Jedis scriptJedis;
|
||||
|
||||
//chainKey中chain总数
|
||||
private Integer chainFieldNum = 0;
|
||||
|
||||
//scriptKey中script总数
|
||||
private Integer scriptFieldNum = 0;
|
||||
|
||||
//chainKey中value的SHA1加密值 用于轮询时确定value是否变化
|
||||
private Map<String, String> chainSHAMap = new HashMap<>();
|
||||
|
||||
//scriptKey中value的SHA1加密值 用于轮询时确定value是否变化
|
||||
private Map<String, String> scriptSHAMap = new HashMap<>();
|
||||
|
||||
//计算hash中field数量的lua脚本
|
||||
private String luaOfKey = "local keys = redis.call(\"hkeys\", KEYS[1]);\n" +
|
||||
"return #keys;\n";
|
||||
|
||||
//计算hash中value的SHA值的lua脚本
|
||||
private String luaOfValue = "local key = KEYS[1];\n" +
|
||||
"local field = KEYS[2];\n" +
|
||||
"local value, err = redis.call(\"hget\", key, field);\n" +
|
||||
"if value == false or value == nil then\n" +
|
||||
" return \"nil\";\n" +
|
||||
"end\n" +
|
||||
"local sha1 = redis.sha1hex(value);\n" +
|
||||
"return sha1;";
|
||||
|
||||
public RedisParserByPolling(RedisParserVO redisParserVO) {
|
||||
this.redisParserVO = redisParserVO;
|
||||
|
||||
try{
|
||||
try{
|
||||
this.chainClient = ContextAwareHolder.loadContextAware().getBean("chainJClient");
|
||||
this.scriptClient = ContextAwareHolder.loadContextAware().getBean("scriptJClient");
|
||||
this.chainJedis = ContextAwareHolder.loadContextAware().getBean("chainJClient");
|
||||
this.scriptJedis = ContextAwareHolder.loadContextAware().getBean("scriptJClient");
|
||||
}
|
||||
catch (Exception ignored) {
|
||||
}
|
||||
if (ObjectUtil.isNull(chainClient)) {
|
||||
chainClient = new Jedis(redisParserVO.getHost(), Integer.parseInt(redisParserVO.getPort()));
|
||||
if (ObjectUtil.isNull(chainJedis)) {
|
||||
chainJedis = new Jedis(redisParserVO.getHost(), Integer.parseInt(redisParserVO.getPort()));
|
||||
if (StrUtil.isNotBlank(redisParserVO.getPassword())) {
|
||||
chainClient.auth(redisParserVO.getPassword());
|
||||
chainJedis.auth(redisParserVO.getPassword());
|
||||
}
|
||||
chainClient.select(redisParserVO.getChainDataBase());
|
||||
chainJedis.select(redisParserVO.getChainDataBase());
|
||||
//如果有脚本数据
|
||||
if (ObjectUtil.isNotNull(redisParserVO.getScriptDataBase())) {
|
||||
scriptClient = new Jedis(redisParserVO.getHost(), Integer.parseInt(redisParserVO.getPort()));
|
||||
scriptJedis = new Jedis(redisParserVO.getHost(), Integer.parseInt(redisParserVO.getPort()));
|
||||
if (StrUtil.isNotBlank(redisParserVO.getPassword())) {
|
||||
scriptClient.auth(redisParserVO.getPassword());
|
||||
scriptJedis.auth(redisParserVO.getPassword());
|
||||
}
|
||||
scriptClient.select(redisParserVO.getScriptDataBase());
|
||||
scriptJedis.select(redisParserVO.getScriptDataBase());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -64,17 +94,22 @@ public class RedisParserByPolling implements RedisParserHelper{
|
||||
try {
|
||||
// 检查chainKey下有没有子节点
|
||||
String chainKey = redisParserVO.getChainKey();
|
||||
Set<String> chainNameSet = chainClient.hkeys(chainKey);
|
||||
Set<String> chainNameSet = chainJedis.hkeys(chainKey);
|
||||
if (CollectionUtil.isEmpty(chainNameSet)) {
|
||||
throw new RedisException(StrUtil.format("There are no chains in key [{}]", chainKey));
|
||||
}
|
||||
chainFieldNum = chainNameSet.size();
|
||||
// 获取chainKey下的所有子节点内容List
|
||||
List<String> chainItemContentList = new ArrayList<>();
|
||||
for (String chainName : chainNameSet) {
|
||||
String chainData = chainClient.hget(chainKey, chainName);
|
||||
String chainData = chainJedis.hget(chainKey, chainName);
|
||||
if (StrUtil.isNotBlank(chainData)) {
|
||||
chainItemContentList.add(StrUtil.format(CHAIN_XML_PATTERN, chainName, chainData));
|
||||
}
|
||||
|
||||
//计算该chainData的SHA值
|
||||
String chainSHA = DigestUtil.sha1Hex(chainData);
|
||||
chainSHAMap.put(chainName, chainSHA);
|
||||
}
|
||||
// 合并成所有chain的xml内容
|
||||
String chainAllContent = CollUtil.join(chainItemContentList, StrUtil.EMPTY);
|
||||
@@ -83,7 +118,8 @@ public class RedisParserByPolling implements RedisParserHelper{
|
||||
String scriptAllContent = StrUtil.EMPTY;
|
||||
if (hasScript()) {
|
||||
String scriptKey = redisParserVO.getScriptKey();
|
||||
Set<String> scriptFieldSet = scriptClient.hkeys(scriptKey);
|
||||
Set<String> scriptFieldSet = scriptJedis.hkeys(scriptKey);
|
||||
scriptFieldNum = scriptFieldSet.size();
|
||||
|
||||
List<String> scriptItemContentList = new ArrayList<>();
|
||||
for (String scriptFieldValue : scriptFieldSet) {
|
||||
@@ -93,7 +129,7 @@ public class RedisParserByPolling implements RedisParserHelper{
|
||||
StrUtil.format("The name of the redis field [{}] in scriptKey [{}] is invalid",
|
||||
scriptFieldValue, scriptKey));
|
||||
}
|
||||
String scriptData = scriptClient.hget(scriptKey, scriptFieldValue);
|
||||
String scriptData = scriptJedis.hget(scriptKey, scriptFieldValue);
|
||||
|
||||
// 有语言类型
|
||||
if (StrUtil.isNotBlank(nodeSimpleVO.getLanguage())) {
|
||||
@@ -106,6 +142,10 @@ public class RedisParserByPolling implements RedisParserHelper{
|
||||
scriptItemContentList.add(StrUtil.format(NODE_ITEM_XML_PATTERN, nodeSimpleVO.getNodeId(),
|
||||
nodeSimpleVO.getName(), nodeSimpleVO.getType(), scriptData));
|
||||
}
|
||||
|
||||
//计算scriptData的SHA值
|
||||
String scriptSHA = DigestUtil.sha1Hex(scriptData);
|
||||
scriptSHAMap.put(scriptFieldValue, scriptSHA);
|
||||
}
|
||||
|
||||
scriptAllContent = StrUtil.format(NODE_XML_PATTERN,
|
||||
@@ -120,7 +160,7 @@ public class RedisParserByPolling implements RedisParserHelper{
|
||||
}
|
||||
|
||||
public boolean hasScript() {
|
||||
if (ObjectUtil.isNull(scriptClient) || ObjectUtil.isNull(redisParserVO.getScriptDataBase())) {
|
||||
if (ObjectUtil.isNull(scriptJedis) || ObjectUtil.isNull(redisParserVO.getScriptDataBase())) {
|
||||
return false;
|
||||
}
|
||||
try{
|
||||
@@ -128,7 +168,7 @@ public class RedisParserByPolling implements RedisParserHelper{
|
||||
if (StrUtil.isBlank(scriptKey)) {
|
||||
return false;
|
||||
}
|
||||
Set<String> scriptKeySet = scriptClient.hkeys(scriptKey);
|
||||
Set<String> scriptKeySet = scriptJedis.hkeys(scriptKey);
|
||||
return !CollUtil.isEmpty(scriptKeySet);
|
||||
}
|
||||
catch (Exception e) {
|
||||
@@ -138,6 +178,74 @@ public class RedisParserByPolling implements RedisParserHelper{
|
||||
|
||||
@Override
|
||||
public void listenRedis() {
|
||||
//将lua脚本添加到chainJedis脚本缓存
|
||||
String keyLuaOfChain = chainJedis.scriptLoad(luaOfKey);
|
||||
String valueLuaOfChain = chainJedis.scriptLoad(luaOfValue);
|
||||
|
||||
//定时任务线程池
|
||||
ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
|
||||
//轮询chain内容的定时任务
|
||||
pool.scheduleAtFixedRate(pollChainTask(keyLuaOfChain, valueLuaOfChain),
|
||||
60, Long.valueOf(redisParserVO.getPollingInterval()), TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 用于轮询chain的定时任务
|
||||
*/
|
||||
private Runnable pollChainTask(String keyLua, String valueLua) {
|
||||
Runnable r = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
String chainKey = redisParserVO.getChainKey();
|
||||
//先判断chainKey中chain数量有无增长
|
||||
String keyNum = chainJedis.evalsha(keyLua, 1, chainKey).toString();
|
||||
if (Integer.parseInt(keyNum) > chainFieldNum) {
|
||||
//有新增加的chain,重新从redis中拉取chainId集合, 对比出新增的chain
|
||||
Set<String> newChainSet = chainJedis.hkeys(chainKey);
|
||||
Set<String> oldChainSet = chainSHAMap.keySet();
|
||||
//求出差集,即新增的chain
|
||||
Set<String> newAdd = new HashSet<>();
|
||||
newAdd.addAll(newChainSet);
|
||||
newAdd.removeAll(oldChainSet);
|
||||
for (String newChainName : newAdd) {
|
||||
String chainData = chainJedis.hget(chainKey, newChainName);
|
||||
LiteFlowChainELBuilder.createChain().setChainId(newChainName).setEL(chainData).build();
|
||||
LOG.info("starting poll flow config... update key={} new value={},", newChainName, chainData);
|
||||
|
||||
//修改SHAMap
|
||||
chainSHAMap.put(newChainName, DigestUtil.sha1Hex(chainData));
|
||||
}
|
||||
//修改chainFieldNum
|
||||
chainFieldNum = newChainSet.size();
|
||||
}
|
||||
|
||||
//遍历Map,判断各个chain的值有无变化
|
||||
for(Map.Entry<String, String> entry: chainSHAMap.entrySet()) {
|
||||
String chainName = entry.getKey();
|
||||
String oldSHA = entry.getValue();
|
||||
String newSHA = chainJedis.evalsha(valueLua, 2, chainKey, chainName).toString();
|
||||
if (StrUtil.equals(newSHA, "nil")) {
|
||||
//新SHA值为nil, 即未获取到该chain,表示该chain已被删除
|
||||
FlowBus.removeChain(chainName);
|
||||
LOG.info("starting reload flow config... delete key={}", chainName);
|
||||
|
||||
//修改SHAMap
|
||||
chainSHAMap.remove(chainName);
|
||||
}
|
||||
else if (!StrUtil.equals(newSHA, oldSHA)) {
|
||||
//SHA值发生变化 表示该chain的值已被修改 重新拉取变化的chain
|
||||
String chainData = chainJedis.hget(chainKey, chainName);
|
||||
LiteFlowChainELBuilder.createChain().setChainId(chainName).setEL(chainData).build();
|
||||
LOG.info("starting poll flow config... update key={} new value={},", chainName, chainData);
|
||||
|
||||
//修改SHAMap
|
||||
chainSHAMap.put(chainName, newSHA);
|
||||
}
|
||||
//SHA值无变化 表示该chain未改变
|
||||
}
|
||||
}
|
||||
};
|
||||
return r;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -172,7 +172,7 @@ public class RedisParserBySubscribe implements RedisParserHelper {
|
||||
});
|
||||
//修改 chain
|
||||
chainKey.addListener((EntryUpdatedListener<String, String>) event -> {
|
||||
LOG.info("starting reload flow config... update path={} new value={},", event.getKey(), event.getValue());
|
||||
LOG.info("starting reload flow config... update key={} new value={},", event.getKey(), event.getValue());
|
||||
LiteFlowChainELBuilder.createChain().setChainId(event.getKey()).setEL(event.getValue()).build();
|
||||
});
|
||||
//删除 chain
|
||||
|
||||
@@ -21,8 +21,8 @@ public class RedisParserVO {
|
||||
/*监听机制 轮询为poll 订阅为subscribe 默认为poll*/
|
||||
private String mode = "poll";
|
||||
|
||||
/*轮询时间间隔(ms) 默认1分钟 若选择订阅机制可不配置*/
|
||||
private String pollingInterval = "60000";
|
||||
/*轮询时间间隔(s) 默认1分钟 若选择订阅机制可不配置*/
|
||||
private String pollingInterval = "60";
|
||||
|
||||
/*chain表配置的数据库号*/
|
||||
private Integer chainDataBase;
|
||||
|
||||
Reference in New Issue
Block a user