diff --git a/liteflow-rule-plugin/liteflow-rule-apollo/src/main/java/com/yomahub/liteflow/parser/apollo/util/ApolloParseHelper.java b/liteflow-rule-plugin/liteflow-rule-apollo/src/main/java/com/yomahub/liteflow/parser/apollo/util/ApolloParseHelper.java index 3ed4858f0..4cd6acca8 100644 --- a/liteflow-rule-plugin/liteflow-rule-apollo/src/main/java/com/yomahub/liteflow/parser/apollo/util/ApolloParseHelper.java +++ b/liteflow-rule-plugin/liteflow-rule-apollo/src/main/java/com/yomahub/liteflow/parser/apollo/util/ApolloParseHelper.java @@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory; import java.util.List; import java.util.Objects; import java.util.Set; +import java.util.function.Function; import java.util.stream.Collectors; import static com.ctrip.framework.apollo.enums.PropertyChangeType.DELETED; @@ -91,7 +92,6 @@ public class ApolloParseHelper { List scriptItemContentList = scriptNamespaces.stream() .map(item -> convert(item, scriptConfig.getProperty(item, StrUtil.EMPTY))) - .filter(Objects::nonNull) .map(RuleParsePluginUtil::toScriptXml) .collect(Collectors.toList()); @@ -115,7 +115,7 @@ public class ApolloParseHelper { String newValue = configChange.getNewValue(); PropertyChangeType changeType = configChange.getChangeType(); Pair pair = RuleParsePluginUtil.parseIdKey(changeKey); - String id = pair.getValue(); + String chainId = pair.getValue(); switch (changeType) { case ADDED: case MODIFIED: @@ -123,16 +123,16 @@ public class ApolloParseHelper { newValue); // 如果是启用,就正常更新 if (pair.getKey()) { - LiteFlowChainELBuilder.createChain().setChainId(id).setEL(newValue).build(); + LiteFlowChainELBuilder.createChain().setChainId(chainId).setEL(newValue).build(); } // 如果是禁用,就删除 else { - FlowBus.removeChain(id); + FlowBus.removeChain(chainId); } break; case DELETED: LOG.info("starting reload flow config... delete key={}", changeKey); - FlowBus.removeChain(id); + FlowBus.removeChain(chainId); break; default: } @@ -148,11 +148,6 @@ public class ApolloParseHelper { newValue = null; } NodeConvertHelper.NodeSimpleVO nodeSimpleVO = convert(changeKey, newValue); - if (Objects.isNull(nodeSimpleVO)) { - // key不符合规范的时候,直接忽略 - LOG.error("key={} is not a valid node config, ignore it", changeKey); - return; - } switch (changeType) { case ADDED: case MODIFIED: @@ -171,12 +166,12 @@ public class ApolloParseHelper { } // 禁用就删除 else { - FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId()); + FlowBus.unloadScriptNode(nodeSimpleVO.getNodeId()); } break; case DELETED: LOG.info("starting reload flow config... delete key={}", changeKey); - FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId()); + FlowBus.unloadScriptNode(nodeSimpleVO.getNodeId()); break; default: } diff --git a/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/util/EtcdParserHelper.java b/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/util/EtcdParserHelper.java index 7ad55e37c..684e1bf26 100644 --- a/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/util/EtcdParserHelper.java +++ b/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/util/EtcdParserHelper.java @@ -141,20 +141,19 @@ public class EtcdParserHelper { LOG.info("starting reload flow config... update path={} value={},", updatePath, updateValue); String changeKey = FileNameUtil.getName(updatePath); Pair pair = RuleParsePluginUtil.parseIdKey(changeKey); - Boolean enable = pair.getKey(); - String id = pair.getValue(); + String chainId = pair.getValue(); // 如果是启用,就正常更新 if (pair.getKey()) { - LiteFlowChainELBuilder.createChain().setChainId(id).setEL(updateValue).build(); + LiteFlowChainELBuilder.createChain().setChainId(chainId).setEL(updateValue).build(); } // 如果是禁用,就删除 else { - FlowBus.removeChain(id); + FlowBus.removeChain(chainId); } }, (deletePath) -> { LOG.info("starting reload flow config... delete path={}", deletePath); - String chainName = FileNameUtil.getName(deletePath); - Pair pair = RuleParsePluginUtil.parseIdKey(chainName); + String chainKey = FileNameUtil.getName(deletePath); + Pair pair = RuleParsePluginUtil.parseIdKey(chainKey); FlowBus.removeChain(pair.getValue()); }); @@ -175,13 +174,13 @@ public class EtcdParserHelper { } // 禁用就删除 else { - FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId()); + FlowBus.unloadScriptNode(nodeSimpleVO.getNodeId()); } }, (deletePath) -> { LOG.info("starting reload flow config... delete path={}", deletePath); String scriptNodeValue = FileNameUtil.getName(deletePath); NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptNodeValue); - FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId()); + FlowBus.unloadScriptNode(nodeSimpleVO.getNodeId()); }); } } diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/RedisParserHelper.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/RedisParserHelper.java index 68bb1bc68..a5afeffc1 100644 --- a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/RedisParserHelper.java +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/RedisParserHelper.java @@ -1,13 +1,18 @@ package com.yomahub.liteflow.parser.redis.mode; +import cn.hutool.core.lang.Pair; import cn.hutool.core.text.StrFormatter; +import cn.hutool.core.util.BooleanUtil; import cn.hutool.core.util.StrUtil; import com.yomahub.liteflow.builder.LiteFlowNodeBuilder; +import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder; import com.yomahub.liteflow.enums.NodeTypeEnum; +import com.yomahub.liteflow.flow.FlowBus; import com.yomahub.liteflow.log.LFLog; import com.yomahub.liteflow.log.LFLoggerManager; import com.yomahub.liteflow.parser.helper.NodeConvertHelper; import com.yomahub.liteflow.parser.redis.vo.RedisParserVO; +import com.yomahub.liteflow.util.RuleParsePluginUtil; import org.redisson.config.Config; import org.redisson.config.SentinelServersConfig; @@ -15,6 +20,7 @@ import org.redisson.config.SentinelServersConfig; * Redis 解析器通用接口 * * @author hxinyu + * @author Bryan.Zhang * @since 2.11.0 */ @@ -105,29 +111,48 @@ public interface RedisParserHelper { /** * script节点的修改/添加 * - * @param scriptFieldValue 新的script名 + * @param scriptKeyValue 新的script名 * @param newValue 新的script值 */ - static void changeScriptNode(String scriptFieldValue, String newValue) { - NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptFieldValue); - // 有语言类型 - if (StrUtil.isNotBlank(nodeSimpleVO.getLanguage())) { - LiteFlowNodeBuilder.createScriptNode() - .setId(nodeSimpleVO.getNodeId()) - .setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType())) - .setName(nodeSimpleVO.getName()) - .setScript(newValue) - .setLanguage(nodeSimpleVO.getLanguage()) - .build(); + static boolean changeScriptNode(String scriptKeyValue, String newValue) { + NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptKeyValue); + + if (BooleanUtil.isTrue(nodeSimpleVO.getEnable())){ + // 有语言类型 + if (StrUtil.isNotBlank(nodeSimpleVO.getLanguage())) { + LiteFlowNodeBuilder.createScriptNode() + .setId(nodeSimpleVO.getNodeId()) + .setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType())) + .setName(nodeSimpleVO.getName()) + .setScript(newValue) + .setLanguage(nodeSimpleVO.getLanguage()) + .build(); + } + // 没有语言类型 + else { + LiteFlowNodeBuilder.createScriptNode() + .setId(nodeSimpleVO.getNodeId()) + .setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType())) + .setName(nodeSimpleVO.getName()) + .setScript(newValue) + .build(); + } + return true; + }else{ + FlowBus.unloadScriptNode(nodeSimpleVO.getNodeId()); + return false; } - // 没有语言类型 + } + + static void changeChain(String chainId, String value) { + Pair pair = RuleParsePluginUtil.parseIdKey(chainId); + // 如果是启用,就正常更新 + if (BooleanUtil.isTrue(pair.getKey())) { + LiteFlowChainELBuilder.createChain().setChainId(chainId).setEL(value).build(); + } + // 如果是禁用,就删除 else { - LiteFlowNodeBuilder.createScriptNode() - .setId(nodeSimpleVO.getNodeId()) - .setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType())) - .setName(nodeSimpleVO.getName()) - .setScript(newValue) - .build(); + FlowBus.removeChain(chainId); } } } diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/ScriptPollingTask.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/ScriptPollingTask.java index 67229de76..b0b5ed62e 100644 --- a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/ScriptPollingTask.java +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/ScriptPollingTask.java @@ -1,5 +1,6 @@ package com.yomahub.liteflow.parser.redis.mode.polling; +import cn.hutool.core.util.BooleanUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.crypto.digest.DigestUtil; import com.yomahub.liteflow.flow.FlowBus; @@ -70,7 +71,7 @@ public class ScriptPollingTask implements Runnable { if (StrUtil.equals(newSHA, "nil")) { //新SHA值为nil, 即未获取到该script,表示该script已被删除 NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptFieldValue); - FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId()); + FlowBus.unloadScriptNode(nodeSimpleVO.getNodeId()); LOG.info("starting reload flow config... delete key={}", scriptFieldValue); //添加到待删除的list 后续统一从SHAMap中移除 @@ -80,17 +81,11 @@ public class ScriptPollingTask implements Runnable { //SHA值发生变化,表示该script的值已被修改,重新拉取变化的script String scriptData = scriptClient.hget(scriptKey, scriptFieldValue); - NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptFieldValue); - nodeSimpleVO.setScript(scriptData); - if (nodeSimpleVO.getEnable()) { - RedisParserHelper.changeScriptNode(scriptFieldValue, scriptData); - LOG.info("starting reload flow config... update key={} new value={},", scriptFieldValue, scriptData); + boolean changeSuccess = RedisParserHelper.changeScriptNode(scriptFieldValue, scriptData); - //修改SHAMap + if (BooleanUtil.isTrue(changeSuccess)){ scriptSHAMap.put(scriptFieldValue, newSHA); - } else { - FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId()); - LOG.info("starting reload flow config... delete key={}", scriptFieldValue); + }else{ needDelete.add(scriptFieldValue); } } @@ -114,13 +109,13 @@ public class ScriptPollingTask implements Runnable { if (!scriptSHAMap.containsKey(scriptFieldValue)) { //将新script添加到LiteFlowChainELBuilder和SHAMap String scriptData = scriptClient.hget(scriptKey, scriptFieldValue); - NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptFieldValue); - if (nodeSimpleVO.getEnable()) { - RedisParserHelper.changeScriptNode(scriptFieldValue, scriptData); + + boolean isAddSuccess = RedisParserHelper.changeScriptNode(scriptFieldValue, scriptData); + + if (BooleanUtil.isTrue(isAddSuccess)){ LOG.info("starting reload flow config... create key={} new value={},", scriptFieldValue, scriptData); scriptSHAMap.put(scriptFieldValue, DigestUtil.sha1Hex(scriptData)); - } else { - FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId()); + }else{ LOG.info("starting reload flow config... delete key={}", scriptFieldValue); needDelete.add(scriptFieldValue); } diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/subscribe/RedisParserSubscribeMode.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/subscribe/RedisParserSubscribeMode.java index 2deb35beb..0e490b240 100644 --- a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/subscribe/RedisParserSubscribeMode.java +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/subscribe/RedisParserSubscribeMode.java @@ -2,6 +2,7 @@ package com.yomahub.liteflow.parser.redis.mode.subscribe; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.lang.Pair; +import cn.hutool.core.util.BooleanUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.yomahub.liteflow.builder.LiteFlowNodeBuilder; @@ -19,6 +20,7 @@ import com.yomahub.liteflow.util.RuleParsePluginUtil; import org.redisson.Redisson; import org.redisson.api.map.event.EntryCreatedListener; import org.redisson.api.map.event.EntryRemovedListener; +import org.redisson.api.map.event.EntryUpdatedListener; import org.redisson.config.Config; import java.util.ArrayList; @@ -149,25 +151,23 @@ public class RedisParserSubscribeMode implements RedisParserHelper { public void listenRedis() { //监听 chain String chainKey = redisParserVO.getChainKey(); - EntryCreatedListener chainModifyFunc = event -> { - LOG.info("starting modify flow config... create key={} value={},", event.getKey(), event.getValue()); - String chainName = event.getKey(); - String value = event.getValue(); - Pair pair = RuleParsePluginUtil.parseIdKey(chainName); - String id = pair.getValue(); - // 如果是启用,就正常更新 - if (pair.getKey()) { - LiteFlowChainELBuilder.createChain().setChainId(id).setEL(value).build(); - } - // 如果是禁用,就删除 - else { - FlowBus.removeChain(id); - } - }; + //添加新 chain - chainClient.addListener(chainKey, chainModifyFunc); + chainClient.addListener(chainKey, (EntryCreatedListener) event -> { + LOG.info("starting modify flow config... create key={} value={},", event.getKey(), event.getValue()); + String chainId = event.getKey(); + String value = event.getValue(); + RedisParserHelper.changeChain(chainId, value); + }); + //修改 chain - chainClient.addListener(chainKey, chainModifyFunc); + chainClient.addListener(chainKey, (EntryUpdatedListener) event -> { + LOG.info("starting modify flow config... create key={} value={},", event.getKey(), event.getValue()); + String chainId = event.getKey(); + String value = event.getValue(); + RedisParserHelper.changeChain(chainId, value); + }); + //删除 chain chainClient.addListener(chainKey, (EntryRemovedListener) event -> { LOG.info("starting reload flow config... delete key={}", event.getKey()); @@ -176,36 +176,24 @@ public class RedisParserSubscribeMode implements RedisParserHelper { }); //监听 script - EntryCreatedListener scriptModifyFunc = event -> { - LOG.info("starting modify flow config... create key={} value={},", event.getKey(), event.getValue()); - NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(event.getKey()); - nodeSimpleVO.setScript(event.getValue()); - // 启用就正常更新 - if (nodeSimpleVO.getEnable()) { - LiteFlowNodeBuilder.createScriptNode() - .setId(nodeSimpleVO.getNodeId()) - .setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType())) - .setName(nodeSimpleVO.getName()) - .setScript(nodeSimpleVO.getScript()) - .setLanguage(nodeSimpleVO.getLanguage()) - .build(); - } - // 禁用就删除 - else { - FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId()); - } - }; if (ObjectUtil.isNotNull(scriptClient) && ObjectUtil.isNotNull(redisParserVO.getScriptDataBase())) { String scriptKey = redisParserVO.getScriptKey(); + //添加 script - scriptClient.addListener(scriptKey, scriptModifyFunc); + scriptClient.addListener(scriptKey, (EntryCreatedListener) event -> { + LOG.info("starting reload flow config... create key={} value={},", event.getKey(), event.getValue()); + RedisParserHelper.changeScriptNode(event.getKey(), event.getValue()); + }); //修改 script - scriptClient.addListener(scriptKey, scriptModifyFunc); + scriptClient.addListener(scriptKey, (EntryUpdatedListener) event -> { + LOG.info("starting reload flow config... update key={} new value={},", event.getKey(), event.getValue()); + RedisParserHelper.changeScriptNode(event.getKey(), event.getValue()); + }); //删除 script scriptClient.addListener(scriptKey, (EntryRemovedListener) event -> { LOG.info("starting reload flow config... delete key={}", event.getKey()); NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(event.getKey()); - FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId()); + FlowBus.unloadScriptNode(nodeSimpleVO.getNodeId()); }); } } diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/polling/impl/ScriptReadPollTask.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/polling/impl/ScriptReadPollTask.java index 77c6ce74d..38bbd621d 100644 --- a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/polling/impl/ScriptReadPollTask.java +++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/polling/impl/ScriptReadPollTask.java @@ -38,7 +38,7 @@ public class ScriptReadPollTask extends AbstractSqlReadPollTask { NodeConvertHelper.NodeSimpleVO scriptVO = NodeConvertHelper.convert(id); // 删除script - FlowBus.getNodeMap().remove(scriptVO.getNodeId()); + FlowBus.unloadScriptNode(scriptVO.getNodeId()); } } diff --git a/liteflow-rule-plugin/liteflow-rule-zk/src/main/java/com/yomahub/liteflow/parser/zk/util/ZkParserHelper.java b/liteflow-rule-plugin/liteflow-rule-zk/src/main/java/com/yomahub/liteflow/parser/zk/util/ZkParserHelper.java index 7bace4d99..2f78d734e 100644 --- a/liteflow-rule-plugin/liteflow-rule-zk/src/main/java/com/yomahub/liteflow/parser/zk/util/ZkParserHelper.java +++ b/liteflow-rule-plugin/liteflow-rule-zk/src/main/java/com/yomahub/liteflow/parser/zk/util/ZkParserHelper.java @@ -15,6 +15,7 @@ import com.yomahub.liteflow.parser.zk.vo.ZkParserVO; import com.yomahub.liteflow.util.RuleParsePluginUtil; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.CuratorCache; import org.apache.curator.framework.recipes.cache.CuratorCacheListener; import org.apache.curator.retry.RetryNTimes; @@ -129,8 +130,9 @@ public class ZkParserHelper { CuratorCache cache1 = CuratorCache.build(client, zkParserVO.getChainPath()); cache1.start(); cache1.listenable().addListener((type, oldData, data) -> { - String path = data.getPath(); - String value = new String(data.getData()); + ChildData currChildData = data == null? oldData : data; + String path = currChildData.getPath(); + String value = new String(currChildData.getData()); if (StrUtil.isBlank(value)) { return; } @@ -161,8 +163,9 @@ public class ZkParserHelper { CuratorCache cache2 = CuratorCache.build(client, zkParserVO.getScriptPath()); cache2.start(); cache2.listenable().addListener((type, oldData, data) -> { - String path = data.getPath(); - String value = new String(data.getData()); + ChildData currChildData = data == null? oldData : data; + String path = currChildData.getPath(); + String value = new String(currChildData.getData()); if (StrUtil.isBlank(value)) { return; } @@ -184,13 +187,13 @@ public class ZkParserHelper { } // 禁用就删除 else { - FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId()); + FlowBus.unloadScriptNode(nodeSimpleVO.getNodeId()); } } else if (CuratorCacheListener.Type.NODE_DELETED.equals(type)) { LOG.info("starting reload flow config... delete path={}", path); String scriptNodeValue = FileNameUtil.getName(path); NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptNodeValue); - FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId()); + FlowBus.unloadScriptNode(nodeSimpleVO.getNodeId()); } }); }