mirror of
https://gitee.com/dromara/liteFlow.git
synced 2026-05-14 20:22:07 +08:00
add poll script task
This commit is contained in:
@@ -33,10 +33,10 @@ public class RedisParserByPolling implements RedisParserHelper{
|
||||
private Jedis scriptJedis;
|
||||
|
||||
//chainKey中chain总数
|
||||
private Integer chainFieldNum = 0;
|
||||
private Integer chainNum = 0;
|
||||
|
||||
//scriptKey中script总数
|
||||
private Integer scriptFieldNum = 0;
|
||||
private Integer scriptNum = 0;
|
||||
|
||||
//chainKey中value的SHA1加密值 用于轮询时确定value是否变化
|
||||
private Map<String, String> chainSHAMap = new HashMap<>();
|
||||
@@ -99,7 +99,7 @@ public class RedisParserByPolling implements RedisParserHelper{
|
||||
if (CollectionUtil.isEmpty(chainNameSet)) {
|
||||
throw new RedisException(StrUtil.format("There are no chains in key [{}]", chainKey));
|
||||
}
|
||||
chainFieldNum = chainNameSet.size();
|
||||
chainNum = chainNameSet.size();
|
||||
// 获取chainKey下的所有子节点内容List
|
||||
List<String> chainItemContentList = new ArrayList<>();
|
||||
for (String chainName : chainNameSet) {
|
||||
@@ -120,7 +120,7 @@ public class RedisParserByPolling implements RedisParserHelper{
|
||||
if (hasScript()) {
|
||||
String scriptKey = redisParserVO.getScriptKey();
|
||||
Set<String> scriptFieldSet = scriptJedis.hkeys(scriptKey);
|
||||
scriptFieldNum = scriptFieldSet.size();
|
||||
scriptNum = scriptFieldSet.size();
|
||||
|
||||
List<String> scriptItemContentList = new ArrayList<>();
|
||||
for (String scriptFieldValue : scriptFieldSet) {
|
||||
@@ -188,7 +188,7 @@ public class RedisParserByPolling implements RedisParserHelper{
|
||||
|
||||
//定时任务线程池
|
||||
ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
|
||||
//轮询chain内容的定时任务
|
||||
//添加轮询chain的定时任务
|
||||
pool.scheduleAtFixedRate(pollChainTask(keyLuaOfChain, valueLuaOfChain),
|
||||
60, Long.valueOf(redisParserVO.getPollingInterval()), TimeUnit.SECONDS);
|
||||
|
||||
@@ -198,8 +198,10 @@ public class RedisParserByPolling implements RedisParserHelper{
|
||||
//将lua脚本添加到scriptJedis脚本缓存
|
||||
String keyLuaOfScript = scriptJedis.scriptLoad(luaOfKey);
|
||||
String valueLuaOfScript = scriptJedis.scriptLoad(luaOfValue);
|
||||
//添加轮询script的定时任务
|
||||
pool.scheduleAtFixedRate(pollScriptTask(keyLuaOfScript, valueLuaOfScript),
|
||||
60, Long.valueOf(redisParserVO.getPollingInterval()), TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -211,15 +213,16 @@ public class RedisParserByPolling implements RedisParserHelper{
|
||||
private Runnable pollChainTask(String keyLua, String valueLua) {
|
||||
Runnable r = () -> {
|
||||
String chainKey = redisParserVO.getChainKey();
|
||||
//先获取chainKey中最新的chain数量
|
||||
//Lua获取chainKey中最新的chain数量
|
||||
String keyNum = chainJedis.evalsha(keyLua, 1, chainKey).toString();
|
||||
//修改chainFieldNum为最新chain数量
|
||||
chainFieldNum = Integer.parseInt(keyNum);
|
||||
//修改chainNum为最新chain数量
|
||||
chainNum = Integer.parseInt(keyNum);
|
||||
|
||||
//遍历Map,判断各个chain的value有无变化:修改变化了值的chain和被删除的chain
|
||||
for(Map.Entry<String, String> entry: chainSHAMap.entrySet()) {
|
||||
String chainId = entry.getKey();
|
||||
String oldSHA = entry.getValue();
|
||||
//在redis服务端通过Lua脚本计算SHA值
|
||||
String newSHA = chainJedis.evalsha(valueLua, 2, chainKey, chainId).toString();
|
||||
if (StrUtil.equals(newSHA, "nil")) {
|
||||
//新SHA值为nil, 即未获取到该chain,表示该chain已被删除
|
||||
@@ -242,7 +245,7 @@ public class RedisParserByPolling implements RedisParserHelper{
|
||||
}
|
||||
|
||||
//处理新添加chain和chainId被修改的情况
|
||||
if (chainFieldNum > chainSHAMap.size()) {
|
||||
if (chainNum > chainSHAMap.size()) {
|
||||
//如果封装的SHAMap数量比最新chain总数少, 说明有两种情况:
|
||||
// 1、添加了新chain
|
||||
// 2、修改了chainId:因为遍历到旧的id时会取到nil,SHAMap会把原来的chainId删掉,但没有机会添加新的chainId
|
||||
@@ -262,4 +265,66 @@ public class RedisParserByPolling implements RedisParserHelper{
|
||||
};
|
||||
return r;
|
||||
}
|
||||
|
||||
/**
|
||||
* 用于轮询script的定时任务
|
||||
* 首先根据hash中field数量的变化拉取新增的script
|
||||
* 再根据hash中value的SHA值修改变化的和被删除的script
|
||||
*/
|
||||
private Runnable pollScriptTask(String keyLua, String valueLua) {
|
||||
Runnable r = () -> {
|
||||
String scriptKey = redisParserVO.getScriptKey();
|
||||
//Lua获取scriptKey中最新的script数量
|
||||
String keyNum = scriptJedis.evalsha(keyLua, 1, scriptKey).toString();
|
||||
//修改scriptNum为最新script数量
|
||||
scriptNum = Integer.parseInt(keyNum);
|
||||
|
||||
//遍历Map,判断各个script的value有无变化:修改变化了值的script和被删除的script
|
||||
for(Map.Entry<String, String> entry: scriptSHAMap.entrySet()) {
|
||||
String scriptFieldValue = entry.getKey();
|
||||
String oldSHA = entry.getValue();
|
||||
//在redis服务端通过Lua脚本计算SHA值
|
||||
String newSHA = scriptJedis.evalsha(valueLua, 2, scriptKey, scriptFieldValue).toString();
|
||||
if (StrUtil.equals(newSHA, "nil")) {
|
||||
//新SHA值为nil, 即未获取到该script,表示该script已被删除
|
||||
NodeSimpleVO nodeSimpleVO = convert(scriptFieldValue);
|
||||
FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId());
|
||||
LOG.info("starting reload flow config... delete key={}", scriptFieldValue);
|
||||
|
||||
//修改SHAMap
|
||||
scriptSHAMap.remove(scriptFieldValue);
|
||||
}
|
||||
else if (!StrUtil.equals(newSHA, oldSHA)) {
|
||||
//SHA值发生变化,表示该script的值已被修改,重新拉取变化的script
|
||||
String scriptData = scriptJedis.hget(scriptKey, scriptFieldValue);
|
||||
changeScriptNode(scriptFieldValue, scriptData);
|
||||
LOG.info("starting reload flow config... update key={} new value={},", scriptFieldValue, scriptData);
|
||||
|
||||
//修改SHAMap
|
||||
scriptSHAMap.put(scriptFieldValue, newSHA);
|
||||
}
|
||||
//SHA值无变化,表示该script未改变
|
||||
}
|
||||
|
||||
//处理新添加script和script名被修改的情况
|
||||
if (scriptNum > scriptSHAMap.size()) {
|
||||
//如果封装的SHAMap数量比最新script总数少, 说明有两种情况:
|
||||
// 1、添加了新script
|
||||
// 2、修改了script名:因为遍历到旧的id时会取到nil,SHAMap会把原来的script删掉,但没有机会添加新的script
|
||||
// 3、上述两者结合
|
||||
//在此处重新拉取所有script名集合,补充添加新script
|
||||
Set<String> newScriptSet = scriptJedis.hkeys(scriptKey);
|
||||
for (String scriptFieldValue : newScriptSet) {
|
||||
if (scriptSHAMap.get(scriptFieldValue) == null) {
|
||||
//将新script添加到LiteFlowChainELBuilder和SHAMap
|
||||
String scriptData = scriptJedis.hget(scriptKey, scriptFieldValue);
|
||||
changeScriptNode(scriptFieldValue, scriptData);
|
||||
LOG.info("starting reload flow config... update key={} new value={},", scriptFieldValue, scriptData);
|
||||
scriptSHAMap.put(scriptFieldValue, DigestUtil.sha1Hex(scriptData));
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
return r;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -189,14 +189,12 @@ public class RedisParserBySubscribe implements RedisParserHelper {
|
||||
//添加 script
|
||||
scriptKey.addListener((EntryCreatedListener<String, String>) event -> {
|
||||
LOG.info("starting reload flow config... create key={} value={},", event.getKey(), event.getValue());
|
||||
NodeSimpleVO nodeSimpleVO = convert(event.getKey());
|
||||
changeScriptNode(nodeSimpleVO, event.getValue());
|
||||
changeScriptNode(event.getKey(), event.getValue());
|
||||
});
|
||||
//修改 script
|
||||
scriptKey.addListener((EntryUpdatedListener<String, String>) event -> {
|
||||
LOG.info("starting reload flow config... update path={} new value={},", event.getKey(), event.getValue());
|
||||
NodeSimpleVO nodeSimpleVO = convert(event.getKey());
|
||||
changeScriptNode(nodeSimpleVO, event.getValue());
|
||||
LOG.info("starting reload flow config... update key={} new value={},", event.getKey(), event.getValue());
|
||||
changeScriptNode(event.getKey(), event.getValue());
|
||||
});
|
||||
//删除 script
|
||||
scriptKey.addListener((EntryRemovedListener<String, String>) event -> {
|
||||
@@ -206,26 +204,4 @@ public class RedisParserBySubscribe implements RedisParserHelper {
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private void changeScriptNode(NodeSimpleVO nodeSimpleVO, String newValue) {
|
||||
// 有语言类型
|
||||
if (StrUtil.isNotBlank(nodeSimpleVO.getLanguage())) {
|
||||
LiteFlowNodeBuilder.createScriptNode()
|
||||
.setId(nodeSimpleVO.getNodeId())
|
||||
.setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType()))
|
||||
.setName(nodeSimpleVO.getName())
|
||||
.setScript(newValue)
|
||||
.setLanguage(nodeSimpleVO.getLanguage())
|
||||
.build();
|
||||
}
|
||||
// 没有语言类型
|
||||
else {
|
||||
LiteFlowNodeBuilder.createScriptNode()
|
||||
.setId(nodeSimpleVO.getNodeId())
|
||||
.setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType()))
|
||||
.setName(nodeSimpleVO.getName())
|
||||
.setScript(newValue)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,7 +3,9 @@ package com.yomahub.liteflow.parser.redis.util;
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.util.ReUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.yomahub.liteflow.builder.LiteFlowNodeBuilder;
|
||||
import com.yomahub.liteflow.core.FlowExecutor;
|
||||
import com.yomahub.liteflow.enums.NodeTypeEnum;
|
||||
import com.yomahub.liteflow.log.LFLog;
|
||||
import com.yomahub.liteflow.log.LFLoggerManager;
|
||||
import org.slf4j.Logger;
|
||||
@@ -38,6 +40,35 @@ public interface RedisParserHelper {
|
||||
|
||||
void listenRedis();
|
||||
|
||||
|
||||
/**
|
||||
* script节点的修改/添加
|
||||
* @param scriptFieldValue 新的script名
|
||||
* @param newValue 新的script值
|
||||
*/
|
||||
default void changeScriptNode(String scriptFieldValue, String newValue) {
|
||||
NodeSimpleVO nodeSimpleVO = convert(scriptFieldValue);
|
||||
// 有语言类型
|
||||
if (StrUtil.isNotBlank(nodeSimpleVO.getLanguage())) {
|
||||
LiteFlowNodeBuilder.createScriptNode()
|
||||
.setId(nodeSimpleVO.getNodeId())
|
||||
.setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType()))
|
||||
.setName(nodeSimpleVO.getName())
|
||||
.setScript(newValue)
|
||||
.setLanguage(nodeSimpleVO.getLanguage())
|
||||
.build();
|
||||
}
|
||||
// 没有语言类型
|
||||
else {
|
||||
LiteFlowNodeBuilder.createScriptNode()
|
||||
.setId(nodeSimpleVO.getNodeId())
|
||||
.setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType()))
|
||||
.setName(nodeSimpleVO.getName())
|
||||
.setScript(newValue)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
default NodeSimpleVO convert(String str) {
|
||||
// 不需要去理解这串正则,就是一个匹配冒号的
|
||||
// 一定得是a:b,或是a:b:c...这种完整类型的字符串的
|
||||
|
||||
@@ -22,6 +22,7 @@ public class RedisParserVO {
|
||||
private String mode = "poll";
|
||||
|
||||
/*轮询时间间隔(s) 默认1分钟 若选择订阅机制可不配置*/
|
||||
//todo 确定类型是string还是long,若为string需校验
|
||||
private String pollingInterval = "60";
|
||||
|
||||
/*chain表配置的数据库号*/
|
||||
|
||||
Reference in New Issue
Block a user