From ac446950c3a40cbcbd1f5b61dfe27c6d5bdb02c8 Mon Sep 17 00:00:00 2001 From: houxinyu Date: Sun, 2 Jul 2023 19:45:54 +0800 Subject: [PATCH] add listener --- .../parser/redis/RedisXmlELParser.java | 2 +- .../parser/redis/util/RedisParserHelper.java | 123 ++++++++++++++---- 2 files changed, 98 insertions(+), 27 deletions(-) diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/RedisXmlELParser.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/RedisXmlELParser.java index b48958a05..8cf02e382 100644 --- a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/RedisXmlELParser.java +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/RedisXmlELParser.java @@ -58,7 +58,7 @@ public class RedisXmlELParser extends ClassXmlFlowELParser { try { String content = redisParserHelper.getContent(); FlowInitHook.addHook(() -> { - // redisParserHelper.listenApollo(); + redisParserHelper.listenRedis(); return true; }); return content; diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/util/RedisParserHelper.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/util/RedisParserHelper.java index 7f70fda57..ee9fa67a2 100644 --- a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/util/RedisParserHelper.java +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/util/RedisParserHelper.java @@ -6,12 +6,20 @@ import cn.hutool.core.text.StrFormatter; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.ReUtil; import cn.hutool.core.util.StrUtil; +import com.yomahub.liteflow.builder.LiteFlowNodeBuilder; +import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder; +import com.yomahub.liteflow.enums.NodeTypeEnum; +import com.yomahub.liteflow.flow.FlowBus; import com.yomahub.liteflow.parser.redis.exception.RedisException; import com.yomahub.liteflow.parser.redis.vo.RedisParserVO; import com.yomahub.liteflow.spi.holder.ContextAwareHolder; import org.redisson.Redisson; import org.redisson.api.RMapCache; import org.redisson.api.RedissonClient; +import org.redisson.api.map.event.EntryCreatedListener; +import org.redisson.api.map.event.EntryEvent; +import org.redisson.api.map.event.EntryRemovedListener; +import org.redisson.api.map.event.EntryUpdatedListener; import org.redisson.config.Config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,13 +28,12 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.Set; -import java.util.stream.Collectors; public class RedisParserHelper { private static final Logger LOG = LoggerFactory.getLogger(RedisParserHelper.class); - private RedisParserVO redisParserVO; + private final RedisParserVO redisParserVO; private final String REDIS_URL_PATTERN = "redis://{}:{}"; @@ -47,53 +54,51 @@ public class RedisParserHelper { public RedisParserHelper(RedisParserVO redisParserVO) { this.redisParserVO = redisParserVO; - try{ - try{ + try { + try { this.chainClient = ContextAwareHolder.loadContextAware().getBean("chainClient"); this.scriptClient = ContextAwareHolder.loadContextAware().getBean("scriptClient"); + } catch (Exception ignored) { } - catch (Exception ignored){ - } - if(ObjectUtil.isNull(chainClient)){ - Config config = new Config(); - config = getRedissonConfig(redisParserVO, config, + if (ObjectUtil.isNull(chainClient)) { + Config config = getRedissonConfig(redisParserVO, Integer.parseInt(redisParserVO.getChainDataBase())); this.chainClient = Redisson.create(config); //如果有脚本数据 - if (StrUtil.isNotBlank(redisParserVO.getScriptDataBase())){ - config = getRedissonConfig(redisParserVO, config, + if (StrUtil.isNotBlank(redisParserVO.getScriptDataBase())) { + config = getRedissonConfig(redisParserVO, Integer.parseInt(redisParserVO.getScriptDataBase())); this.scriptClient = Redisson.create(config); } } - } - catch (Exception e){ + } catch (Exception e) { throw new RedisException(e.getMessage()); } } - private Config getRedissonConfig(RedisParserVO redisParserVO, Config config, Integer dataBase){ + private Config getRedissonConfig(RedisParserVO redisParserVO, Integer dataBase) { + Config config = new Config(); String redisAddress = StrFormatter.format(REDIS_URL_PATTERN, redisParserVO.getHost(), redisParserVO.getPort()); - if (StrUtil.isNotBlank(redisParserVO.getPassword())){ + if (StrUtil.isNotBlank(redisParserVO.getPassword())) { config.useSingleServer().setAddress(redisAddress) .setPassword(redisParserVO.getPassword()) .setDatabase(dataBase); - } else{ + } else { config.useSingleServer().setAddress(redisAddress) .setDatabase(dataBase); } return config; } - public String getContent(){ - try{ + public String getContent() { + try { // 检查chainKey下有没有子节点 RMapCache chainKey = chainClient.getMapCache(redisParserVO.getChainKey()); Set chainNameSet = chainKey.keySet(); if (CollectionUtil.isEmpty(chainNameSet)) { throw new RedisException(StrUtil.format("There are no chains in key [{}]", - redisParserVO.getChainKey()); + redisParserVO.getChainKey())); } // 获取chainKey下的所有子节点内容List List chainItemContentList = new ArrayList<>(); @@ -108,12 +113,12 @@ public class RedisParserHelper { // 检查是否有脚本内容,如果有,进行脚本内容的获取 String scriptAllContent = StrUtil.EMPTY; - if (hasScript()){ + if (hasScript()) { RMapCache scriptKey = scriptClient.getMapCache(redisParserVO.getScriptKey()); Set scriptKeySet = scriptKey.keySet(); List scriptItemContentList = new ArrayList<>(); - for (String scriptKeyValue : scriptKeySet){ + for (String scriptKeyValue : scriptKeySet) { NodeSimpleVO nodeSimpleVO = convert(scriptKeyValue); if (Objects.isNull(nodeSimpleVO)) { throw new RedisException( @@ -139,13 +144,12 @@ public class RedisParserHelper { } return StrUtil.format(XML_PATTERN, scriptAllContent, chainAllContent); - } - catch (Exception e){ + } catch (Exception e) { throw new RedisException(e.getMessage()); } } - public boolean hasScript(){ + public boolean hasScript() { // 没有scriptClient或没有配置scriptDataBase if (Objects.isNull(scriptClient) || StrUtil.isNotBlank(redisParserVO.getScriptDataBase())) { return false; @@ -155,12 +159,79 @@ public class RedisParserHelper { RMapCache scriptKey = scriptClient.getMapCache(redisParserVO.getScriptKey()); Set scriptKeySet = scriptKey.keySet(); return !CollUtil.isEmpty(scriptKeySet); - } - catch (Exception e) { + } catch (Exception e) { return false; } } + /** + * 监听 redis key + */ + public void listenRedis() { + //监听 chain + RMapCache chainKey = chainClient.getMapCache(redisParserVO.getChainKey()); + //添加新 chain + chainKey.addListener((EntryCreatedListener) event -> { + LOG.info("starting reload flow config... create key={} value={},", event.getKey(), event.getValue()); + LiteFlowChainELBuilder.createChain().setChainId(event.getKey()).setEL(event.getValue()).build(); + }); + //修改 chain + chainKey.addListener((EntryUpdatedListener) event -> { + LOG.info("starting reload flow config... update path={} new value={},", event.getKey(), event.getValue()); + LiteFlowChainELBuilder.createChain().setChainId(event.getKey()).setEL(event.getValue()).build(); + }); + //删除 chain + chainKey.addListener((EntryRemovedListener) event -> { + LOG.info("starting reload flow config... delete key={}", event.getKey()); + FlowBus.removeChain(event.getKey()); + }); + + //监听 script + if (Objects.nonNull(scriptClient) && StrUtil.isNotBlank(redisParserVO.getScriptDataBase())) { + RMapCache scriptKey = scriptClient.getMapCache(redisParserVO.getScriptKey()); + //添加 script + scriptKey.addListener((EntryCreatedListener) event -> { + LOG.info("starting reload flow config... create key={} value={},", event.getKey(), event.getValue()); + NodeSimpleVO nodeSimpleVO = convert(event.getKey()); + changeScriptNode(nodeSimpleVO, event.getValue()); + }); + //修改 script + scriptKey.addListener((EntryUpdatedListener) event -> { + LOG.info("starting reload flow config... update path={} new value={},", event.getKey(), event.getValue()); + NodeSimpleVO nodeSimpleVO = convert(event.getKey()); + changeScriptNode(nodeSimpleVO, event.getValue()); + }); + //删除 script + scriptKey.addListener((EntryRemovedListener) event -> { + LOG.info("starting reload flow config... delete key={}", event.getKey()); + NodeSimpleVO nodeSimpleVO = convert(event.getKey()); + FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId()); + }); + } + } + + private void changeScriptNode(NodeSimpleVO nodeSimpleVO, String newValue) { + // 有语言类型 + if (StrUtil.isNotBlank(nodeSimpleVO.getLanguage())) { + LiteFlowNodeBuilder.createScriptNode() + .setId(nodeSimpleVO.getNodeId()) + .setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.type)) + .setName(nodeSimpleVO.getName()) + .setScript(newValue) + .setLanguage(nodeSimpleVO.getLanguage()) + .build(); + } + // 没有语言类型 + else { + LiteFlowNodeBuilder.createScriptNode() + .setId(nodeSimpleVO.getNodeId()) + .setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.type)) + .setName(nodeSimpleVO.getName()) + .setScript(newValue) + .build(); + } + } + public NodeSimpleVO convert(String str) { // 不需要去理解这串正则,就是一个匹配冒号的 // 一定得是a:b,或是a:b:c...这种完整类型的字符串的