From 3ec3cfd346a2582ae00ebe95089cd22c3859c70c Mon Sep 17 00:00:00 2001 From: "everywhere.z" Date: Sat, 5 Nov 2022 11:01:32 +0800 Subject: [PATCH] =?UTF-8?q?enhancement=20#I5ZLH6=20=E6=94=AF=E6=8C=81zk?= =?UTF-8?q?=E5=88=86=E7=A6=BBchain=E4=BB=A5=E5=8F=8A=E8=84=9A=E6=9C=AC?= =?UTF-8?q?=E7=9A=84=E5=AD=98=E5=82=A8=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../liteflow/parser/sql/util/JDBCHelper.java | 8 +- liteflow-rule-plugin/liteflow-rule-zk/pom.xml | 4 +- .../liteflow/parser/zk/ZkXmlELParser.java | 24 ++- .../parser/zk/util/ZkParserHelper.java | 176 +++++++++++++++--- .../liteflow/parser/zk/vo/ZkParserVO.java | 20 +- 5 files changed, 181 insertions(+), 51 deletions(-) diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/JDBCHelper.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/JDBCHelper.java index a9924270a..5fa5e22f7 100644 --- a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/JDBCHelper.java +++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/JDBCHelper.java @@ -29,7 +29,9 @@ public class JDBCHelper { private static final String SCRIPT_SQL_PATTERN = "SELECT {},{},{},{} FROM {} WHERE {}=?"; private static final String CHAIN_XML_PATTERN = "{}"; - private static final String NODE_XML_PATTERN = ""; + private static final String NODE_XML_PATTERN = "{}"; + + private static final String NODE_ITEM_XML_PATTERN = ""; private static final String XML_PATTERN = "{}{}"; private static final Integer FETCH_SIZE_MAX = 1000; @@ -178,7 +180,7 @@ public class JDBCHelper { throw new ELSQLException(StrUtil.format("The type value[{}] is not a script type", type)); } - result.add(StrUtil.format(NODE_XML_PATTERN, id, name, type, data)); + result.add(StrUtil.format(NODE_ITEM_XML_PATTERN, id, name, type, data)); } } catch (Exception e) { throw new ELSQLException(e.getMessage()); @@ -186,7 +188,7 @@ public class JDBCHelper { // 关闭连接 close(conn, stmt, rs); } - return CollUtil.join(result, StrUtil.EMPTY); + return StrUtil.format(NODE_XML_PATTERN, CollUtil.join(result, StrUtil.EMPTY)); } /** diff --git a/liteflow-rule-plugin/liteflow-rule-zk/pom.xml b/liteflow-rule-plugin/liteflow-rule-zk/pom.xml index 8aa6f15b1..60f864034 100644 --- a/liteflow-rule-plugin/liteflow-rule-zk/pom.xml +++ b/liteflow-rule-plugin/liteflow-rule-zk/pom.xml @@ -17,8 +17,8 @@ com.yomahub liteflow-core ${revision} - true - provided + diff --git a/liteflow-rule-plugin/liteflow-rule-zk/src/main/java/com/yomahub/liteflow/parser/zk/ZkXmlELParser.java b/liteflow-rule-plugin/liteflow-rule-zk/src/main/java/com/yomahub/liteflow/parser/zk/ZkXmlELParser.java index 28ffe6818..8d6eecfc3 100644 --- a/liteflow-rule-plugin/liteflow-rule-zk/src/main/java/com/yomahub/liteflow/parser/zk/ZkXmlELParser.java +++ b/liteflow-rule-plugin/liteflow-rule-zk/src/main/java/com/yomahub/liteflow/parser/zk/ZkXmlELParser.java @@ -25,13 +25,6 @@ public class ZkXmlELParser extends ClassXmlFlowELParser { private final ZkParserHelper zkParserHelper; public ZkXmlELParser() { - Consumer parseConsumer = t -> { - try { - parse(t); - } catch (Exception e) { - throw new RuntimeException(e); - } - }; LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); try{ @@ -46,14 +39,14 @@ public class ZkXmlELParser extends ClassXmlFlowELParser { throw new ZkException("rule-source-ext-data is empty"); } - if (StrUtil.isBlank(zkParserVO.getNodePath())){ - zkParserVO.setNodePath("/lite-flow/flow"); + if (StrUtil.isBlank(zkParserVO.getChainPath())){ + throw new ZkException("You must configure the chainPath property"); } if (StrUtil.isBlank(zkParserVO.getConnectStr())){ throw new ZkException("zk connect string is empty"); } - zkParserHelper = new ZkParserHelper(zkParserVO, parseConsumer); + zkParserHelper = new ZkParserHelper(zkParserVO); }catch (Exception e){ throw new ZkException(e.getMessage()); } @@ -64,9 +57,14 @@ public class ZkXmlELParser extends ClassXmlFlowELParser { try{ String content = zkParserHelper.getContent(); - zkParserHelper.checkContent(content); - - zkParserHelper.listenZkNode(); + Consumer listenerConsumer = s -> { + try{ + parse(s); + }catch (Exception e){ + throw new ZkException(e.getMessage()); + } + }; + zkParserHelper.listenZkNode(listenerConsumer); return content; }catch (Exception e){ diff --git a/liteflow-rule-plugin/liteflow-rule-zk/src/main/java/com/yomahub/liteflow/parser/zk/util/ZkParserHelper.java b/liteflow-rule-plugin/liteflow-rule-zk/src/main/java/com/yomahub/liteflow/parser/zk/util/ZkParserHelper.java index 7f62697c1..1f6e337b5 100644 --- a/liteflow-rule-plugin/liteflow-rule-zk/src/main/java/com/yomahub/liteflow/parser/zk/util/ZkParserHelper.java +++ b/liteflow-rule-plugin/liteflow-rule-zk/src/main/java/com/yomahub/liteflow/parser/zk/util/ZkParserHelper.java @@ -1,20 +1,24 @@ package com.yomahub.liteflow.parser.zk.util; +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.util.ReUtil; import cn.hutool.core.util.StrUtil; -import com.yomahub.liteflow.exception.ParseException; +import com.yomahub.liteflow.core.FlowExecutorHolder; import com.yomahub.liteflow.parser.zk.exception.ZkException; import com.yomahub.liteflow.parser.zk.vo.ZkParserVO; +import com.yomahub.liteflow.util.JsonUtil; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.CuratorCache; import org.apache.curator.framework.recipes.cache.CuratorCacheListener; -import org.apache.curator.framework.recipes.cache.NodeCache; import org.apache.curator.retry.RetryNTimes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; import java.util.function.Consumer; public class ZkParserHelper { @@ -22,13 +26,18 @@ public class ZkParserHelper { private static final Logger LOG = LoggerFactory.getLogger(ZkParserHelper.class); private final ZkParserVO zkParserVO; - private final Consumer parseConsumer; - private final CuratorFramework client; - public ZkParserHelper(ZkParserVO zkParserVO, Consumer parseConsumer) { + private final String CHAIN_XML_PATTERN = "{}"; + + private final String NODE_XML_PATTERN = "{}"; + + private final String NODE_ITEM_XML_PATTERN = ""; + + private final String XML_PATTERN = "{}{}"; + + public ZkParserHelper(ZkParserVO zkParserVO) { this.zkParserVO = zkParserVO; - this.parseConsumer = parseConsumer; try{ CuratorFramework client = CuratorFrameworkFactory.newClient( @@ -37,46 +46,157 @@ public class ZkParserHelper { ); client.start(); - if (client.checkExists().forPath(zkParserVO.getNodePath()) == null) { - client.create().creatingParentsIfNeeded().forPath(zkParserVO.getNodePath(), "".getBytes()); - } this.client = client; }catch (Exception e){ throw new ZkException(e.getMessage()); } - } public String getContent(){ try{ - return new String(client.getData().forPath(zkParserVO.getNodePath())); + //检查zk上有没有chainPath节点 + if (client.checkExists().forPath(zkParserVO.getChainPath()) == null) { + throw new ZkException(StrUtil.format("zk node[{}] is not exist", zkParserVO.getChainPath())); + } + + //检查chainPath路径下有没有子节点 + List chainNameList = client.getChildren().forPath(zkParserVO.getChainPath()); + if (CollectionUtil.isEmpty(chainNameList)){ + throw new ZkException(StrUtil.format("There are no chains in path [{}]", zkParserVO.getChainPath())); + } + + //获取chainPath路径下的所有子节点内容List + List chainItemContentList = new ArrayList<>(); + for (String chainName : chainNameList){ + String chainData = new String(client.getData().forPath(StrUtil.format("{}/{}", zkParserVO.getChainPath(), chainName))); + chainItemContentList.add(StrUtil.format(CHAIN_XML_PATTERN, chainName, chainData)); + } + //合并成所有chain的xml内容 + String chainAllContent = CollUtil.join(chainItemContentList, StrUtil.EMPTY); + + //检查是否有脚本内容,如果有,进行脚本内容的获取 + String scriptAllContent = StrUtil.EMPTY; + if (hasScript()){ + List scriptNodeValueList = client.getChildren().forPath(zkParserVO.getScriptPath()); + + List scriptItemContentList = new ArrayList<>(); + for (String scriptNodeValue: scriptNodeValueList){ + NodeSimpleVO nodeSimpleVO = convert(scriptNodeValue); + if (Objects.isNull(nodeSimpleVO)){ + throw new ZkException(StrUtil.format("The name of the zk node is invalid:{}", scriptNodeValue)); + } + String scriptData = new String( + client.getData().forPath(StrUtil.format("{}/{}", zkParserVO.getScriptPath(), scriptNodeValue)) + ); + + scriptItemContentList.add( + StrUtil.format(NODE_ITEM_XML_PATTERN, + nodeSimpleVO.getNodeId(), + nodeSimpleVO.getName(), + nodeSimpleVO.getType(), + scriptData) + ); + } + + scriptAllContent = StrUtil.format(NODE_XML_PATTERN, CollUtil.join(scriptItemContentList, StrUtil.EMPTY)); + } + + return StrUtil.format(XML_PATTERN, scriptAllContent, chainAllContent); }catch (Exception e){ throw new ZkException(e.getMessage()); } } - /** - * 检查 content 是否合法 - */ - public void checkContent(String content) { - if (StrUtil.isBlank(content)) { - String error = MessageFormat.format("the node[{0}] value is empty", zkParserVO.getNodePath()); - throw new ParseException(error); + public boolean hasScript(){ + //没有配置scriptPath + if (StrUtil.isBlank(zkParserVO.getScriptPath())){ + return false; + } + + try{ + //配置了,但是不存在这个节点 + if (client.checkExists().forPath(zkParserVO.getScriptPath()) == null){ + return false; + } + + //存在这个节点,但是子节点不存在 + List chainNameList = client.getChildren().forPath(zkParserVO.getScriptPath()); + if (CollUtil.isEmpty(chainNameList)){ + return false; + } + + return true; + }catch (Exception e){ + return false; } } /** * 监听 zk 节点 */ - public void listenZkNode() { - CuratorCache cache = CuratorCache.build(client, zkParserVO.getNodePath()); + public void listenZkNode(Consumer listenerConsumer) { + //监听chain + CuratorCache cache1 = CuratorCache.build(client, zkParserVO.getChainPath()); + cache1.start(); + cache1.listenable().addListener((type, oldData, data) -> listenerConsumer.accept(getContent())); - cache.start(); + //监听script + CuratorCache cache2 = CuratorCache.build(client, zkParserVO.getScriptPath()); + cache2.start(); + cache2.listenable().addListener((type, oldData, data) -> listenerConsumer.accept(getContent())); + } - cache.listenable().addListener((type, oldData, data) -> { - String content1 = new String(data.getData()); - LOG.info("stating load flow config...."); - parseConsumer.accept(content1); - }); + public NodeSimpleVO convert(String str){ + //不需要去理解这串正则,就是一个匹配冒号的 + //一定得是a:b,或是a:b:c...这种完整类型的字符串的 + List matchItemList = ReUtil.findAllGroup0("(?<=[^:]:)[^:]+|[^:]+(?=:[^:])", str); + if (CollUtil.isEmpty(matchItemList)){ + return null; + } + + NodeSimpleVO nodeSimpleVO = new NodeSimpleVO(); + if (matchItemList.size() > 1){ + nodeSimpleVO.setNodeId(matchItemList.get(0)); + nodeSimpleVO.setType(matchItemList.get(1)); + } + + if (matchItemList.size() > 2){ + nodeSimpleVO.setName(matchItemList.get(2)); + } + + return nodeSimpleVO; + } + + private static class NodeSimpleVO{ + + private String nodeId; + + private String type; + + private String name=""; + + public String getNodeId() { + return nodeId; + } + + public void setNodeId(String nodeId) { + this.nodeId = nodeId; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } } } diff --git a/liteflow-rule-plugin/liteflow-rule-zk/src/main/java/com/yomahub/liteflow/parser/zk/vo/ZkParserVO.java b/liteflow-rule-plugin/liteflow-rule-zk/src/main/java/com/yomahub/liteflow/parser/zk/vo/ZkParserVO.java index 315797a8b..5d99ea446 100644 --- a/liteflow-rule-plugin/liteflow-rule-zk/src/main/java/com/yomahub/liteflow/parser/zk/vo/ZkParserVO.java +++ b/liteflow-rule-plugin/liteflow-rule-zk/src/main/java/com/yomahub/liteflow/parser/zk/vo/ZkParserVO.java @@ -9,7 +9,9 @@ public class ZkParserVO { private String connectStr; - private String nodePath; + private String chainPath; + + private String scriptPath; public String getConnectStr() { return connectStr; @@ -19,11 +21,19 @@ public class ZkParserVO { this.connectStr = connectStr; } - public String getNodePath() { - return nodePath; + public String getChainPath() { + return chainPath; } - public void setNodePath(String nodePath) { - this.nodePath = nodePath; + public void setChainPath(String chainPath) { + this.chainPath = chainPath; + } + + public String getScriptPath() { + return scriptPath; + } + + public void setScriptPath(String scriptPath) { + this.scriptPath = scriptPath; } }