diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/JDBCHelper.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/JDBCHelper.java
index a9924270a..5fa5e22f7 100644
--- a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/JDBCHelper.java
+++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/JDBCHelper.java
@@ -29,7 +29,9 @@ public class JDBCHelper {
private static final String SCRIPT_SQL_PATTERN = "SELECT {},{},{},{} FROM {} WHERE {}=?";
private static final String CHAIN_XML_PATTERN = "{}";
- private static final String NODE_XML_PATTERN = "";
+ private static final String NODE_XML_PATTERN = "{}";
+
+ private static final String NODE_ITEM_XML_PATTERN = "";
private static final String XML_PATTERN = "{}{}";
private static final Integer FETCH_SIZE_MAX = 1000;
@@ -178,7 +180,7 @@ public class JDBCHelper {
throw new ELSQLException(StrUtil.format("The type value[{}] is not a script type", type));
}
- result.add(StrUtil.format(NODE_XML_PATTERN, id, name, type, data));
+ result.add(StrUtil.format(NODE_ITEM_XML_PATTERN, id, name, type, data));
}
} catch (Exception e) {
throw new ELSQLException(e.getMessage());
@@ -186,7 +188,7 @@ public class JDBCHelper {
// 关闭连接
close(conn, stmt, rs);
}
- return CollUtil.join(result, StrUtil.EMPTY);
+ return StrUtil.format(NODE_XML_PATTERN, CollUtil.join(result, StrUtil.EMPTY));
}
/**
diff --git a/liteflow-rule-plugin/liteflow-rule-zk/pom.xml b/liteflow-rule-plugin/liteflow-rule-zk/pom.xml
index 8aa6f15b1..60f864034 100644
--- a/liteflow-rule-plugin/liteflow-rule-zk/pom.xml
+++ b/liteflow-rule-plugin/liteflow-rule-zk/pom.xml
@@ -17,8 +17,8 @@
com.yomahub
liteflow-core
${revision}
- true
- provided
+
diff --git a/liteflow-rule-plugin/liteflow-rule-zk/src/main/java/com/yomahub/liteflow/parser/zk/ZkXmlELParser.java b/liteflow-rule-plugin/liteflow-rule-zk/src/main/java/com/yomahub/liteflow/parser/zk/ZkXmlELParser.java
index 28ffe6818..8d6eecfc3 100644
--- a/liteflow-rule-plugin/liteflow-rule-zk/src/main/java/com/yomahub/liteflow/parser/zk/ZkXmlELParser.java
+++ b/liteflow-rule-plugin/liteflow-rule-zk/src/main/java/com/yomahub/liteflow/parser/zk/ZkXmlELParser.java
@@ -25,13 +25,6 @@ public class ZkXmlELParser extends ClassXmlFlowELParser {
private final ZkParserHelper zkParserHelper;
public ZkXmlELParser() {
- Consumer parseConsumer = t -> {
- try {
- parse(t);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- };
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
try{
@@ -46,14 +39,14 @@ public class ZkXmlELParser extends ClassXmlFlowELParser {
throw new ZkException("rule-source-ext-data is empty");
}
- if (StrUtil.isBlank(zkParserVO.getNodePath())){
- zkParserVO.setNodePath("/lite-flow/flow");
+ if (StrUtil.isBlank(zkParserVO.getChainPath())){
+ throw new ZkException("You must configure the chainPath property");
}
if (StrUtil.isBlank(zkParserVO.getConnectStr())){
throw new ZkException("zk connect string is empty");
}
- zkParserHelper = new ZkParserHelper(zkParserVO, parseConsumer);
+ zkParserHelper = new ZkParserHelper(zkParserVO);
}catch (Exception e){
throw new ZkException(e.getMessage());
}
@@ -64,9 +57,14 @@ public class ZkXmlELParser extends ClassXmlFlowELParser {
try{
String content = zkParserHelper.getContent();
- zkParserHelper.checkContent(content);
-
- zkParserHelper.listenZkNode();
+ Consumer listenerConsumer = s -> {
+ try{
+ parse(s);
+ }catch (Exception e){
+ throw new ZkException(e.getMessage());
+ }
+ };
+ zkParserHelper.listenZkNode(listenerConsumer);
return content;
}catch (Exception e){
diff --git a/liteflow-rule-plugin/liteflow-rule-zk/src/main/java/com/yomahub/liteflow/parser/zk/util/ZkParserHelper.java b/liteflow-rule-plugin/liteflow-rule-zk/src/main/java/com/yomahub/liteflow/parser/zk/util/ZkParserHelper.java
index 7f62697c1..1f6e337b5 100644
--- a/liteflow-rule-plugin/liteflow-rule-zk/src/main/java/com/yomahub/liteflow/parser/zk/util/ZkParserHelper.java
+++ b/liteflow-rule-plugin/liteflow-rule-zk/src/main/java/com/yomahub/liteflow/parser/zk/util/ZkParserHelper.java
@@ -1,20 +1,24 @@
package com.yomahub.liteflow.parser.zk.util;
+import cn.hutool.core.collection.CollUtil;
+import cn.hutool.core.collection.CollectionUtil;
+import cn.hutool.core.util.ReUtil;
import cn.hutool.core.util.StrUtil;
-import com.yomahub.liteflow.exception.ParseException;
+import com.yomahub.liteflow.core.FlowExecutorHolder;
import com.yomahub.liteflow.parser.zk.exception.ZkException;
import com.yomahub.liteflow.parser.zk.vo.ZkParserVO;
+import com.yomahub.liteflow.util.JsonUtil;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
-import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.retry.RetryNTimes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
import java.util.function.Consumer;
public class ZkParserHelper {
@@ -22,13 +26,18 @@ public class ZkParserHelper {
private static final Logger LOG = LoggerFactory.getLogger(ZkParserHelper.class);
private final ZkParserVO zkParserVO;
- private final Consumer parseConsumer;
-
private final CuratorFramework client;
- public ZkParserHelper(ZkParserVO zkParserVO, Consumer parseConsumer) {
+ private final String CHAIN_XML_PATTERN = "{}";
+
+ private final String NODE_XML_PATTERN = "{}";
+
+ private final String NODE_ITEM_XML_PATTERN = "";
+
+ private final String XML_PATTERN = "{}{}";
+
+ public ZkParserHelper(ZkParserVO zkParserVO) {
this.zkParserVO = zkParserVO;
- this.parseConsumer = parseConsumer;
try{
CuratorFramework client = CuratorFrameworkFactory.newClient(
@@ -37,46 +46,157 @@ public class ZkParserHelper {
);
client.start();
- if (client.checkExists().forPath(zkParserVO.getNodePath()) == null) {
- client.create().creatingParentsIfNeeded().forPath(zkParserVO.getNodePath(), "".getBytes());
- }
this.client = client;
}catch (Exception e){
throw new ZkException(e.getMessage());
}
-
}
public String getContent(){
try{
- return new String(client.getData().forPath(zkParserVO.getNodePath()));
+ //检查zk上有没有chainPath节点
+ if (client.checkExists().forPath(zkParserVO.getChainPath()) == null) {
+ throw new ZkException(StrUtil.format("zk node[{}] is not exist", zkParserVO.getChainPath()));
+ }
+
+ //检查chainPath路径下有没有子节点
+ List chainNameList = client.getChildren().forPath(zkParserVO.getChainPath());
+ if (CollectionUtil.isEmpty(chainNameList)){
+ throw new ZkException(StrUtil.format("There are no chains in path [{}]", zkParserVO.getChainPath()));
+ }
+
+ //获取chainPath路径下的所有子节点内容List
+ List chainItemContentList = new ArrayList<>();
+ for (String chainName : chainNameList){
+ String chainData = new String(client.getData().forPath(StrUtil.format("{}/{}", zkParserVO.getChainPath(), chainName)));
+ chainItemContentList.add(StrUtil.format(CHAIN_XML_PATTERN, chainName, chainData));
+ }
+ //合并成所有chain的xml内容
+ String chainAllContent = CollUtil.join(chainItemContentList, StrUtil.EMPTY);
+
+ //检查是否有脚本内容,如果有,进行脚本内容的获取
+ String scriptAllContent = StrUtil.EMPTY;
+ if (hasScript()){
+ List scriptNodeValueList = client.getChildren().forPath(zkParserVO.getScriptPath());
+
+ List scriptItemContentList = new ArrayList<>();
+ for (String scriptNodeValue: scriptNodeValueList){
+ NodeSimpleVO nodeSimpleVO = convert(scriptNodeValue);
+ if (Objects.isNull(nodeSimpleVO)){
+ throw new ZkException(StrUtil.format("The name of the zk node is invalid:{}", scriptNodeValue));
+ }
+ String scriptData = new String(
+ client.getData().forPath(StrUtil.format("{}/{}", zkParserVO.getScriptPath(), scriptNodeValue))
+ );
+
+ scriptItemContentList.add(
+ StrUtil.format(NODE_ITEM_XML_PATTERN,
+ nodeSimpleVO.getNodeId(),
+ nodeSimpleVO.getName(),
+ nodeSimpleVO.getType(),
+ scriptData)
+ );
+ }
+
+ scriptAllContent = StrUtil.format(NODE_XML_PATTERN, CollUtil.join(scriptItemContentList, StrUtil.EMPTY));
+ }
+
+ return StrUtil.format(XML_PATTERN, scriptAllContent, chainAllContent);
}catch (Exception e){
throw new ZkException(e.getMessage());
}
}
- /**
- * 检查 content 是否合法
- */
- public void checkContent(String content) {
- if (StrUtil.isBlank(content)) {
- String error = MessageFormat.format("the node[{0}] value is empty", zkParserVO.getNodePath());
- throw new ParseException(error);
+ public boolean hasScript(){
+ //没有配置scriptPath
+ if (StrUtil.isBlank(zkParserVO.getScriptPath())){
+ return false;
+ }
+
+ try{
+ //配置了,但是不存在这个节点
+ if (client.checkExists().forPath(zkParserVO.getScriptPath()) == null){
+ return false;
+ }
+
+ //存在这个节点,但是子节点不存在
+ List chainNameList = client.getChildren().forPath(zkParserVO.getScriptPath());
+ if (CollUtil.isEmpty(chainNameList)){
+ return false;
+ }
+
+ return true;
+ }catch (Exception e){
+ return false;
}
}
/**
* 监听 zk 节点
*/
- public void listenZkNode() {
- CuratorCache cache = CuratorCache.build(client, zkParserVO.getNodePath());
+ public void listenZkNode(Consumer listenerConsumer) {
+ //监听chain
+ CuratorCache cache1 = CuratorCache.build(client, zkParserVO.getChainPath());
+ cache1.start();
+ cache1.listenable().addListener((type, oldData, data) -> listenerConsumer.accept(getContent()));
- cache.start();
+ //监听script
+ CuratorCache cache2 = CuratorCache.build(client, zkParserVO.getScriptPath());
+ cache2.start();
+ cache2.listenable().addListener((type, oldData, data) -> listenerConsumer.accept(getContent()));
+ }
- cache.listenable().addListener((type, oldData, data) -> {
- String content1 = new String(data.getData());
- LOG.info("stating load flow config....");
- parseConsumer.accept(content1);
- });
+ public NodeSimpleVO convert(String str){
+ //不需要去理解这串正则,就是一个匹配冒号的
+ //一定得是a:b,或是a:b:c...这种完整类型的字符串的
+ List matchItemList = ReUtil.findAllGroup0("(?<=[^:]:)[^:]+|[^:]+(?=:[^:])", str);
+ if (CollUtil.isEmpty(matchItemList)){
+ return null;
+ }
+
+ NodeSimpleVO nodeSimpleVO = new NodeSimpleVO();
+ if (matchItemList.size() > 1){
+ nodeSimpleVO.setNodeId(matchItemList.get(0));
+ nodeSimpleVO.setType(matchItemList.get(1));
+ }
+
+ if (matchItemList.size() > 2){
+ nodeSimpleVO.setName(matchItemList.get(2));
+ }
+
+ return nodeSimpleVO;
+ }
+
+ private static class NodeSimpleVO{
+
+ private String nodeId;
+
+ private String type;
+
+ private String name="";
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public void setNodeId(String nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
}
}
diff --git a/liteflow-rule-plugin/liteflow-rule-zk/src/main/java/com/yomahub/liteflow/parser/zk/vo/ZkParserVO.java b/liteflow-rule-plugin/liteflow-rule-zk/src/main/java/com/yomahub/liteflow/parser/zk/vo/ZkParserVO.java
index 315797a8b..5d99ea446 100644
--- a/liteflow-rule-plugin/liteflow-rule-zk/src/main/java/com/yomahub/liteflow/parser/zk/vo/ZkParserVO.java
+++ b/liteflow-rule-plugin/liteflow-rule-zk/src/main/java/com/yomahub/liteflow/parser/zk/vo/ZkParserVO.java
@@ -9,7 +9,9 @@ public class ZkParserVO {
private String connectStr;
- private String nodePath;
+ private String chainPath;
+
+ private String scriptPath;
public String getConnectStr() {
return connectStr;
@@ -19,11 +21,19 @@ public class ZkParserVO {
this.connectStr = connectStr;
}
- public String getNodePath() {
- return nodePath;
+ public String getChainPath() {
+ return chainPath;
}
- public void setNodePath(String nodePath) {
- this.nodePath = nodePath;
+ public void setChainPath(String chainPath) {
+ this.chainPath = chainPath;
+ }
+
+ public String getScriptPath() {
+ return scriptPath;
+ }
+
+ public void setScriptPath(String scriptPath) {
+ this.scriptPath = scriptPath;
}
}