mirror of
https://gitee.com/dromara/liteFlow.git
synced 2026-05-14 12:12:08 +08:00
add listener
This commit is contained in:
@@ -58,7 +58,7 @@ public class RedisXmlELParser extends ClassXmlFlowELParser {
|
||||
try {
|
||||
String content = redisParserHelper.getContent();
|
||||
FlowInitHook.addHook(() -> {
|
||||
// redisParserHelper.listenApollo();
|
||||
redisParserHelper.listenRedis();
|
||||
return true;
|
||||
});
|
||||
return content;
|
||||
|
||||
@@ -6,12 +6,20 @@ import cn.hutool.core.text.StrFormatter;
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.hutool.core.util.ReUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.yomahub.liteflow.builder.LiteFlowNodeBuilder;
|
||||
import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder;
|
||||
import com.yomahub.liteflow.enums.NodeTypeEnum;
|
||||
import com.yomahub.liteflow.flow.FlowBus;
|
||||
import com.yomahub.liteflow.parser.redis.exception.RedisException;
|
||||
import com.yomahub.liteflow.parser.redis.vo.RedisParserVO;
|
||||
import com.yomahub.liteflow.spi.holder.ContextAwareHolder;
|
||||
import org.redisson.Redisson;
|
||||
import org.redisson.api.RMapCache;
|
||||
import org.redisson.api.RedissonClient;
|
||||
import org.redisson.api.map.event.EntryCreatedListener;
|
||||
import org.redisson.api.map.event.EntryEvent;
|
||||
import org.redisson.api.map.event.EntryRemovedListener;
|
||||
import org.redisson.api.map.event.EntryUpdatedListener;
|
||||
import org.redisson.config.Config;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@@ -20,13 +28,12 @@ import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class RedisParserHelper {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(RedisParserHelper.class);
|
||||
|
||||
private RedisParserVO redisParserVO;
|
||||
private final RedisParserVO redisParserVO;
|
||||
|
||||
private final String REDIS_URL_PATTERN = "redis://{}:{}";
|
||||
|
||||
@@ -47,53 +54,51 @@ public class RedisParserHelper {
|
||||
public RedisParserHelper(RedisParserVO redisParserVO) {
|
||||
this.redisParserVO = redisParserVO;
|
||||
|
||||
try{
|
||||
try{
|
||||
try {
|
||||
try {
|
||||
this.chainClient = ContextAwareHolder.loadContextAware().getBean("chainClient");
|
||||
this.scriptClient = ContextAwareHolder.loadContextAware().getBean("scriptClient");
|
||||
} catch (Exception ignored) {
|
||||
}
|
||||
catch (Exception ignored){
|
||||
}
|
||||
if(ObjectUtil.isNull(chainClient)){
|
||||
Config config = new Config();
|
||||
config = getRedissonConfig(redisParserVO, config,
|
||||
if (ObjectUtil.isNull(chainClient)) {
|
||||
Config config = getRedissonConfig(redisParserVO,
|
||||
Integer.parseInt(redisParserVO.getChainDataBase()));
|
||||
this.chainClient = Redisson.create(config);
|
||||
//如果有脚本数据
|
||||
if (StrUtil.isNotBlank(redisParserVO.getScriptDataBase())){
|
||||
config = getRedissonConfig(redisParserVO, config,
|
||||
if (StrUtil.isNotBlank(redisParserVO.getScriptDataBase())) {
|
||||
config = getRedissonConfig(redisParserVO,
|
||||
Integer.parseInt(redisParserVO.getScriptDataBase()));
|
||||
this.scriptClient = Redisson.create(config);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception e){
|
||||
} catch (Exception e) {
|
||||
throw new RedisException(e.getMessage());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private Config getRedissonConfig(RedisParserVO redisParserVO, Config config, Integer dataBase){
|
||||
private Config getRedissonConfig(RedisParserVO redisParserVO, Integer dataBase) {
|
||||
Config config = new Config();
|
||||
String redisAddress = StrFormatter.format(REDIS_URL_PATTERN, redisParserVO.getHost(), redisParserVO.getPort());
|
||||
if (StrUtil.isNotBlank(redisParserVO.getPassword())){
|
||||
if (StrUtil.isNotBlank(redisParserVO.getPassword())) {
|
||||
config.useSingleServer().setAddress(redisAddress)
|
||||
.setPassword(redisParserVO.getPassword())
|
||||
.setDatabase(dataBase);
|
||||
} else{
|
||||
} else {
|
||||
config.useSingleServer().setAddress(redisAddress)
|
||||
.setDatabase(dataBase);
|
||||
}
|
||||
return config;
|
||||
}
|
||||
|
||||
public String getContent(){
|
||||
try{
|
||||
public String getContent() {
|
||||
try {
|
||||
// 检查chainKey下有没有子节点
|
||||
RMapCache<String, String> chainKey = chainClient.getMapCache(redisParserVO.getChainKey());
|
||||
Set<String> chainNameSet = chainKey.keySet();
|
||||
if (CollectionUtil.isEmpty(chainNameSet)) {
|
||||
throw new RedisException(StrUtil.format("There are no chains in key [{}]",
|
||||
redisParserVO.getChainKey());
|
||||
redisParserVO.getChainKey()));
|
||||
}
|
||||
// 获取chainKey下的所有子节点内容List
|
||||
List<String> chainItemContentList = new ArrayList<>();
|
||||
@@ -108,12 +113,12 @@ public class RedisParserHelper {
|
||||
|
||||
// 检查是否有脚本内容,如果有,进行脚本内容的获取
|
||||
String scriptAllContent = StrUtil.EMPTY;
|
||||
if (hasScript()){
|
||||
if (hasScript()) {
|
||||
RMapCache<String, String> scriptKey = scriptClient.getMapCache(redisParserVO.getScriptKey());
|
||||
Set<String> scriptKeySet = scriptKey.keySet();
|
||||
|
||||
List<String> scriptItemContentList = new ArrayList<>();
|
||||
for (String scriptKeyValue : scriptKeySet){
|
||||
for (String scriptKeyValue : scriptKeySet) {
|
||||
NodeSimpleVO nodeSimpleVO = convert(scriptKeyValue);
|
||||
if (Objects.isNull(nodeSimpleVO)) {
|
||||
throw new RedisException(
|
||||
@@ -139,13 +144,12 @@ public class RedisParserHelper {
|
||||
}
|
||||
|
||||
return StrUtil.format(XML_PATTERN, scriptAllContent, chainAllContent);
|
||||
}
|
||||
catch (Exception e){
|
||||
} catch (Exception e) {
|
||||
throw new RedisException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public boolean hasScript(){
|
||||
public boolean hasScript() {
|
||||
// 没有scriptClient或没有配置scriptDataBase
|
||||
if (Objects.isNull(scriptClient) || StrUtil.isNotBlank(redisParserVO.getScriptDataBase())) {
|
||||
return false;
|
||||
@@ -155,12 +159,79 @@ public class RedisParserHelper {
|
||||
RMapCache<String, String> scriptKey = scriptClient.getMapCache(redisParserVO.getScriptKey());
|
||||
Set<String> scriptKeySet = scriptKey.keySet();
|
||||
return !CollUtil.isEmpty(scriptKeySet);
|
||||
}
|
||||
catch (Exception e) {
|
||||
} catch (Exception e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 监听 redis key
|
||||
*/
|
||||
public void listenRedis() {
|
||||
//监听 chain
|
||||
RMapCache<String, String> chainKey = chainClient.getMapCache(redisParserVO.getChainKey());
|
||||
//添加新 chain
|
||||
chainKey.addListener((EntryCreatedListener<String, String>) event -> {
|
||||
LOG.info("starting reload flow config... create key={} value={},", event.getKey(), event.getValue());
|
||||
LiteFlowChainELBuilder.createChain().setChainId(event.getKey()).setEL(event.getValue()).build();
|
||||
});
|
||||
//修改 chain
|
||||
chainKey.addListener((EntryUpdatedListener<String, String>) event -> {
|
||||
LOG.info("starting reload flow config... update path={} new value={},", event.getKey(), event.getValue());
|
||||
LiteFlowChainELBuilder.createChain().setChainId(event.getKey()).setEL(event.getValue()).build();
|
||||
});
|
||||
//删除 chain
|
||||
chainKey.addListener((EntryRemovedListener<String, String>) event -> {
|
||||
LOG.info("starting reload flow config... delete key={}", event.getKey());
|
||||
FlowBus.removeChain(event.getKey());
|
||||
});
|
||||
|
||||
//监听 script
|
||||
if (Objects.nonNull(scriptClient) && StrUtil.isNotBlank(redisParserVO.getScriptDataBase())) {
|
||||
RMapCache<String, String> scriptKey = scriptClient.getMapCache(redisParserVO.getScriptKey());
|
||||
//添加 script
|
||||
scriptKey.addListener((EntryCreatedListener<String, String>) event -> {
|
||||
LOG.info("starting reload flow config... create key={} value={},", event.getKey(), event.getValue());
|
||||
NodeSimpleVO nodeSimpleVO = convert(event.getKey());
|
||||
changeScriptNode(nodeSimpleVO, event.getValue());
|
||||
});
|
||||
//修改 script
|
||||
scriptKey.addListener((EntryUpdatedListener<String, String>) event -> {
|
||||
LOG.info("starting reload flow config... update path={} new value={},", event.getKey(), event.getValue());
|
||||
NodeSimpleVO nodeSimpleVO = convert(event.getKey());
|
||||
changeScriptNode(nodeSimpleVO, event.getValue());
|
||||
});
|
||||
//删除 script
|
||||
scriptKey.addListener((EntryRemovedListener<String, String>) event -> {
|
||||
LOG.info("starting reload flow config... delete key={}", event.getKey());
|
||||
NodeSimpleVO nodeSimpleVO = convert(event.getKey());
|
||||
FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId());
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private void changeScriptNode(NodeSimpleVO nodeSimpleVO, String newValue) {
|
||||
// 有语言类型
|
||||
if (StrUtil.isNotBlank(nodeSimpleVO.getLanguage())) {
|
||||
LiteFlowNodeBuilder.createScriptNode()
|
||||
.setId(nodeSimpleVO.getNodeId())
|
||||
.setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.type))
|
||||
.setName(nodeSimpleVO.getName())
|
||||
.setScript(newValue)
|
||||
.setLanguage(nodeSimpleVO.getLanguage())
|
||||
.build();
|
||||
}
|
||||
// 没有语言类型
|
||||
else {
|
||||
LiteFlowNodeBuilder.createScriptNode()
|
||||
.setId(nodeSimpleVO.getNodeId())
|
||||
.setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.type))
|
||||
.setName(nodeSimpleVO.getName())
|
||||
.setScript(newValue)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
public NodeSimpleVO convert(String str) {
|
||||
// 不需要去理解这串正则,就是一个匹配冒号的
|
||||
// 一定得是a:b,或是a:b:c...这种完整类型的字符串的
|
||||
|
||||
Reference in New Issue
Block a user