From e8f8ed86e842297450dbe08cab5b80d94b6cd9cd Mon Sep 17 00:00:00 2001 From: tonnyguo Date: Mon, 29 Mar 2021 20:03:17 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0zk=E8=A7=A3=E6=9E=90=E5=99=A8?= =?UTF-8?q?=EF=BC=8C=E6=9B=B4=E6=96=B0=E9=80=82=E9=85=8D=E8=A7=A3=E6=9E=90?= =?UTF-8?q?=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../yomahub/liteflow/core/FlowExecutor.java | 157 ++++++++++++------ .../parser/ZookeeperJsonFlowParser.java | 68 ++++++++ .../parser/ZookeeperYmlFlowParser.java | 68 ++++++++ .../src/main/resources/application.properties | 3 +- 4 files changed, 243 insertions(+), 53 deletions(-) create mode 100644 liteflow-core/src/main/java/com/yomahub/liteflow/parser/ZookeeperJsonFlowParser.java create mode 100644 liteflow-core/src/main/java/com/yomahub/liteflow/parser/ZookeeperYmlFlowParser.java diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java index 9949db876..6818e5b78 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java @@ -33,8 +33,6 @@ import com.yomahub.liteflow.parser.ZookeeperXmlFlowParser; import java.text.MessageFormat; import java.util.List; -import java.util.regex.Matcher; -import java.util.regex.Pattern; /** * 流程规则主要执行器类 @@ -50,49 +48,49 @@ public class FlowExecutor { private static final String LOCAL_JSON_CONFIG_REGEX = "^[\\w_\\-\\@\\/]+\\.json$"; private static final String LOCAL_YML_CONFIG_REGEX = "^[\\w_\\-\\@\\/]+\\.yml$"; + private static final String FORMATE_XML_CONFIG_REGEX = "xml:.+"; + private static final String FORMATE_JSON_CONFIG_REGEX = "json:.+"; + private static final String FORMATE_YML_CONFIG_REGEX = "yml:.+"; + private static final String PREFIX_FORMATE_CONFIG_REGEX = "xml:|json:|yml:"; + private static final String CLASS_CONFIG_REGEX = "^\\w+(\\.\\w+)*$"; private LiteflowConfig liteflowConfig; private String zkNode; - //FlowExecutor的初始化化方式,主要用于parse规则文件 + /** + * FlowExecutor的初始化化方式,主要用于parse规则文件 + */ public void init() { if (ObjectUtil.isNull(liteflowConfig) || StrUtil.isBlank(liteflowConfig.getRuleSource())) { throw new ConfigErrorException("config error, please check liteflow config property"); } - List rulePath = Lists.newArrayList(liteflowConfig.getRuleSource().split(",|;")); FlowParser parser = null; for (String path : rulePath) { try { - if (isLocalConfig(path)) { - parser = new LocalXmlFlowParser(); - } else if (isLocalJsonConfig(path)) { - parser = new LocalJsonFlowParser(); - } else if (isLocalYmlConfig(path)) { - parser = new LocalYmlFlowParser(); - } else if (isZKConfig(path)) { //判断是否是zk配置 - if (StrUtil.isNotBlank(zkNode)) { - parser = new ZookeeperXmlFlowParser(zkNode); - } else { - parser = new ZookeeperXmlFlowParser(); - } - } else if (isClassConfig(path)) { - Class c = Class.forName(path); - if(XmlFlowParser.class.isAssignableFrom(c.getClass())) { - parser = (XmlFlowParser) c.newInstance(); - } else if(JsonFlowParser.class.isAssignableFrom(c.getClass())) { - parser = (JsonFlowParser) c.newInstance(); - } else if(YmlFlowParser.class.isAssignableFrom(c.getClass())) { - parser = (YmlFlowParser) c.newInstance(); - } else { - String errorMsg = MessageFormat.format("can't recognize self class-flow-parser: {0}", path); - throw new FlowExecutorNotInitException(errorMsg); - } - } - parser.parseMain(path); + String pattern = matchFormatConfig(path); + path = ReUtil.replaceAll(path, PREFIX_FORMATE_CONFIG_REGEX, ""); + switch (pattern) { + case "xml" : + parser = matchFormatParser(path, "xml"); + break; + case "json" : + parser = matchFormatParser(path, "json"); + break; + case "yml" : + parser = matchFormatParser(path, "yml"); + break; + default: + LOG.error("can't surport the format {}", path); + } + if(null != parser) { + parser.parseMain(path); + } else { + throw new ConfigErrorException("parse error, please check liteflow config property"); + } } catch (Exception e) { String errorMsg = MessageFormat.format("init flow executor cause error,cannot parse rule file{0}", path); LOG.error(errorMsg, e); @@ -101,37 +99,94 @@ public class FlowExecutor { } } + /** + * 匹配路径配置,生成对应的解析器 + * @param path 配置路径 + * @param pattern 格式 + * @return + */ + private FlowParser matchFormatParser(String path, String pattern) throws ClassNotFoundException, IllegalAccessException, InstantiationException { + boolean isLocalFile = isLocalConfig(path); + if(isLocalFile) { + switch (pattern) { + case "xml": + return new LocalXmlFlowParser(); + case "json": + return new LocalJsonFlowParser(); + case "yml": + return new LocalYmlFlowParser(); + default: + } + } else if(isClassConfig(path)){ + Class c = Class.forName(path); + switch (pattern) { + case "xml": + return (XmlFlowParser) c.newInstance(); + case "json": + return (JsonFlowParser) c.newInstance(); + case "yml": + return (YmlFlowParser) c.newInstance(); + default: + } + } else if(isZKConfig(path)) { + switch (pattern) { + case "xml": + return StrUtil.isNotBlank(zkNode) ? new ZookeeperXmlFlowParser(zkNode) : new ZookeeperXmlFlowParser(); + case "json": + return StrUtil.isNotBlank(zkNode) ? new ZookeeperJsonFlowParser(zkNode) : new ZookeeperJsonFlowParser(); + case "yml": + return StrUtil.isNotBlank(zkNode) ? new ZookeeperYmlFlowParser(zkNode) : new ZookeeperYmlFlowParser(); + default: + } + } + return null; + } + + /** + * 判定是否为本地文件 + * @param path + * @return + */ + private boolean isLocalConfig(String path) { + return ReUtil.isMatch(LOCAL_XML_CONFIG_REGEX, path) + || ReUtil.isMatch(LOCAL_JSON_CONFIG_REGEX, path) + || ReUtil.isMatch(LOCAL_YML_CONFIG_REGEX, path) ; + } + + /** + * 判定是否为自定义class配置 + * @param path + * @return + */ + private boolean isClassConfig(String path) { + return ReUtil.isMatch(CLASS_CONFIG_REGEX, path); + } + + /** + * 判定是否为zk配置 + * @param path + * @return + */ private boolean isZKConfig(String path) { return ReUtil.isMatch(ZK_CONFIG_REGEX, path); } - private String matchLocalConfig(String path) { - if(ReUtil.isMatch(LOCAL_XML_CONFIG_REGEX, path)) { + /** + * 匹配文本格式,支持xml,json和yml + * @param path + * @return + */ + private String matchFormatConfig(String path) { + if(ReUtil.isMatch(LOCAL_XML_CONFIG_REGEX, path) || ReUtil.isMatch(FORMATE_XML_CONFIG_REGEX, path)) { return "xml"; - } else if(ReUtil.isMatch(LOCAL_JSON_CONFIG_REGEX, path)) { + } else if(ReUtil.isMatch(LOCAL_JSON_CONFIG_REGEX, path) || ReUtil.isMatch(FORMATE_JSON_CONFIG_REGEX, path)) { return "json"; - } else if(ReUtil.isMatch(LOCAL_YML_CONFIG_REGEX, path)) { - + } else if(ReUtil.isMatch(LOCAL_YML_CONFIG_REGEX, path) || ReUtil.isMatch(FORMATE_YML_CONFIG_REGEX, path)) { + return "yml"; } return ""; } - private boolean isLocalConfig(String path) { - return ReUtil.isMatch(LOCAL_XML_CONFIG_REGEX, path); - } - - private boolean isLocalJsonConfig(String path) { - return ReUtil.isMatch(LOCAL_JSON_CONFIG_REGEX, path); - } - - private boolean isLocalYmlConfig(String path) { - return ReUtil.isMatch(LOCAL_YML_CONFIG_REGEX, path); - } - - private boolean isClassConfig(String path) { - return ReUtil.isMatch(CLASS_CONFIG_REGEX, path); - } - public void reloadRule() { init(); } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/ZookeeperJsonFlowParser.java b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/ZookeeperJsonFlowParser.java new file mode 100644 index 000000000..241a82aa7 --- /dev/null +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/ZookeeperJsonFlowParser.java @@ -0,0 +1,68 @@ +package com.yomahub.liteflow.parser; + +import cn.hutool.core.util.StrUtil; +import com.yomahub.liteflow.exception.ParseException; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.cache.NodeCache; +import org.apache.curator.framework.recipes.cache.NodeCacheListener; +import org.apache.curator.retry.RetryNTimes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.MessageFormat; + +/** + * 基于zk方式的json形式的解析器 + * @Author: guodongqing + * @Date: 2021/3/29 7:42 下午 + */ +public class ZookeeperJsonFlowParser extends JsonFlowParser{ + + private static final Logger LOG = LoggerFactory.getLogger(ZookeeperJsonFlowParser.class); + + private String nodePath = "/lite-flow/flow"; + + public ZookeeperJsonFlowParser() { + + } + + public ZookeeperJsonFlowParser(String node) { + nodePath = node; + } + + @Override + public void parseMain(String path) throws Exception { + CuratorFramework client = CuratorFrameworkFactory.newClient( + path, + new RetryNTimes(10, 5000) + ); + client.start(); + + if (client.checkExists().forPath(nodePath) == null) { + client.create().creatingParentsIfNeeded().forPath(nodePath, "".getBytes()); + } + + String content = new String(client.getData().forPath(nodePath)); + + + if (StrUtil.isBlank(content)) { + String error = MessageFormat.format("the node[{0}] value is empty", nodePath); + throw new ParseException(error); + } + parse(content); + + + final NodeCache cache = new NodeCache(client,nodePath); + cache.start(); + + cache.getListenable().addListener(new NodeCacheListener() { + @Override + public void nodeChanged() throws Exception { + String content = new String(cache.getCurrentData().getData()); + LOG.info("stating load flow config...."); + parse(content); + } + }); + } +} diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/ZookeeperYmlFlowParser.java b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/ZookeeperYmlFlowParser.java new file mode 100644 index 000000000..113e77806 --- /dev/null +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/ZookeeperYmlFlowParser.java @@ -0,0 +1,68 @@ +package com.yomahub.liteflow.parser; + +import cn.hutool.core.util.StrUtil; +import com.yomahub.liteflow.exception.ParseException; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.cache.NodeCache; +import org.apache.curator.framework.recipes.cache.NodeCacheListener; +import org.apache.curator.retry.RetryNTimes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.MessageFormat; + +/** + * 基于zk方式的yml形式的解析器 + * @Author: guodongqing + * @Date: 2021/3/29 7:42 下午 + */ +public class ZookeeperYmlFlowParser extends YmlFlowParser{ + + private static final Logger LOG = LoggerFactory.getLogger(ZookeeperYmlFlowParser.class); + + private String nodePath = "/lite-flow/flow"; + + public ZookeeperYmlFlowParser() { + + } + + public ZookeeperYmlFlowParser(String node) { + nodePath = node; + } + + @Override + public void parseMain(String path) throws Exception { + CuratorFramework client = CuratorFrameworkFactory.newClient( + path, + new RetryNTimes(10, 5000) + ); + client.start(); + + if (client.checkExists().forPath(nodePath) == null) { + client.create().creatingParentsIfNeeded().forPath(nodePath, "".getBytes()); + } + + String content = new String(client.getData().forPath(nodePath)); + + + if (StrUtil.isBlank(content)) { + String error = MessageFormat.format("the node[{0}] value is empty", nodePath); + throw new ParseException(error); + } + parse(content); + + + final NodeCache cache = new NodeCache(client,nodePath); + cache.start(); + + cache.getListenable().addListener(new NodeCacheListener() { + @Override + public void nodeChanged() throws Exception { + String content = new String(cache.getCurrentData().getData()); + LOG.info("stating load flow config...."); + parse(content); + } + }); + } +} diff --git a/liteflow-test-springboot/src/main/resources/application.properties b/liteflow-test-springboot/src/main/resources/application.properties index 6834ed727..935a66a81 100644 --- a/liteflow-test-springboot/src/main/resources/application.properties +++ b/liteflow-test-springboot/src/main/resources/application.properties @@ -1,7 +1,6 @@ #liteflow.rule-source=config/flow.xml #liteflow.rule-source=config/flow.yml -#liteflow.rule-source=config/flow.json -liteflow.rule-source=config/flow.json|xml#com.yomahub.liteflow.parser.ClassXmlFlowParser|json#127.0.0.1 +liteflow.rule-source=config/flow.json #liteflow.slot-size=2048 liteflow.when-max-wait-seconds=20 liteflow.monitor.enable-log=true