添加zk解析器,更新适配解析器

This commit is contained in:
tonnyguo
2021-03-29 20:03:17 +08:00
parent 73e787eb5b
commit e8f8ed86e8
4 changed files with 243 additions and 53 deletions

View File

@@ -33,8 +33,6 @@ import com.yomahub.liteflow.parser.ZookeeperXmlFlowParser;
import java.text.MessageFormat;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* 流程规则主要执行器类
@@ -50,49 +48,49 @@ public class FlowExecutor {
private static final String LOCAL_JSON_CONFIG_REGEX = "^[\\w_\\-\\@\\/]+\\.json$";
private static final String LOCAL_YML_CONFIG_REGEX = "^[\\w_\\-\\@\\/]+\\.yml$";
private static final String FORMATE_XML_CONFIG_REGEX = "xml:.+";
private static final String FORMATE_JSON_CONFIG_REGEX = "json:.+";
private static final String FORMATE_YML_CONFIG_REGEX = "yml:.+";
private static final String PREFIX_FORMATE_CONFIG_REGEX = "xml:|json:|yml:";
private static final String CLASS_CONFIG_REGEX = "^\\w+(\\.\\w+)*$";
private LiteflowConfig liteflowConfig;
private String zkNode;
//FlowExecutor的初始化化方式主要用于parse规则文件
/**
* FlowExecutor的初始化化方式主要用于parse规则文件
*/
public void init() {
if (ObjectUtil.isNull(liteflowConfig) || StrUtil.isBlank(liteflowConfig.getRuleSource())) {
throw new ConfigErrorException("config error, please check liteflow config property");
}
List<String> rulePath = Lists.newArrayList(liteflowConfig.getRuleSource().split(",|;"));
FlowParser parser = null;
for (String path : rulePath) {
try {
if (isLocalConfig(path)) {
parser = new LocalXmlFlowParser();
} else if (isLocalJsonConfig(path)) {
parser = new LocalJsonFlowParser();
} else if (isLocalYmlConfig(path)) {
parser = new LocalYmlFlowParser();
} else if (isZKConfig(path)) { //判断是否是zk配置
if (StrUtil.isNotBlank(zkNode)) {
parser = new ZookeeperXmlFlowParser(zkNode);
} else {
parser = new ZookeeperXmlFlowParser();
}
} else if (isClassConfig(path)) {
Class c = Class.forName(path);
if(XmlFlowParser.class.isAssignableFrom(c.getClass())) {
parser = (XmlFlowParser) c.newInstance();
} else if(JsonFlowParser.class.isAssignableFrom(c.getClass())) {
parser = (JsonFlowParser) c.newInstance();
} else if(YmlFlowParser.class.isAssignableFrom(c.getClass())) {
parser = (YmlFlowParser) c.newInstance();
} else {
String errorMsg = MessageFormat.format("can't recognize self class-flow-parser: {0}", path);
throw new FlowExecutorNotInitException(errorMsg);
}
}
parser.parseMain(path);
String pattern = matchFormatConfig(path);
path = ReUtil.replaceAll(path, PREFIX_FORMATE_CONFIG_REGEX, "");
switch (pattern) {
case "xml" :
parser = matchFormatParser(path, "xml");
break;
case "json" :
parser = matchFormatParser(path, "json");
break;
case "yml" :
parser = matchFormatParser(path, "yml");
break;
default:
LOG.error("can't surport the format {}", path);
}
if(null != parser) {
parser.parseMain(path);
} else {
throw new ConfigErrorException("parse error, please check liteflow config property");
}
} catch (Exception e) {
String errorMsg = MessageFormat.format("init flow executor cause error,cannot parse rule file{0}", path);
LOG.error(errorMsg, e);
@@ -101,37 +99,94 @@ public class FlowExecutor {
}
}
/**
* 匹配路径配置,生成对应的解析器
* @param path 配置路径
* @param pattern 格式
* @return
*/
private FlowParser matchFormatParser(String path, String pattern) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
boolean isLocalFile = isLocalConfig(path);
if(isLocalFile) {
switch (pattern) {
case "xml":
return new LocalXmlFlowParser();
case "json":
return new LocalJsonFlowParser();
case "yml":
return new LocalYmlFlowParser();
default:
}
} else if(isClassConfig(path)){
Class c = Class.forName(path);
switch (pattern) {
case "xml":
return (XmlFlowParser) c.newInstance();
case "json":
return (JsonFlowParser) c.newInstance();
case "yml":
return (YmlFlowParser) c.newInstance();
default:
}
} else if(isZKConfig(path)) {
switch (pattern) {
case "xml":
return StrUtil.isNotBlank(zkNode) ? new ZookeeperXmlFlowParser(zkNode) : new ZookeeperXmlFlowParser();
case "json":
return StrUtil.isNotBlank(zkNode) ? new ZookeeperJsonFlowParser(zkNode) : new ZookeeperJsonFlowParser();
case "yml":
return StrUtil.isNotBlank(zkNode) ? new ZookeeperYmlFlowParser(zkNode) : new ZookeeperYmlFlowParser();
default:
}
}
return null;
}
/**
* 判定是否为本地文件
* @param path
* @return
*/
private boolean isLocalConfig(String path) {
return ReUtil.isMatch(LOCAL_XML_CONFIG_REGEX, path)
|| ReUtil.isMatch(LOCAL_JSON_CONFIG_REGEX, path)
|| ReUtil.isMatch(LOCAL_YML_CONFIG_REGEX, path) ;
}
/**
* 判定是否为自定义class配置
* @param path
* @return
*/
private boolean isClassConfig(String path) {
return ReUtil.isMatch(CLASS_CONFIG_REGEX, path);
}
/**
* 判定是否为zk配置
* @param path
* @return
*/
private boolean isZKConfig(String path) {
return ReUtil.isMatch(ZK_CONFIG_REGEX, path);
}
private String matchLocalConfig(String path) {
if(ReUtil.isMatch(LOCAL_XML_CONFIG_REGEX, path)) {
/**
* 匹配文本格式支持xmljson和yml
* @param path
* @return
*/
private String matchFormatConfig(String path) {
if(ReUtil.isMatch(LOCAL_XML_CONFIG_REGEX, path) || ReUtil.isMatch(FORMATE_XML_CONFIG_REGEX, path)) {
return "xml";
} else if(ReUtil.isMatch(LOCAL_JSON_CONFIG_REGEX, path)) {
} else if(ReUtil.isMatch(LOCAL_JSON_CONFIG_REGEX, path) || ReUtil.isMatch(FORMATE_JSON_CONFIG_REGEX, path)) {
return "json";
} else if(ReUtil.isMatch(LOCAL_YML_CONFIG_REGEX, path)) {
} else if(ReUtil.isMatch(LOCAL_YML_CONFIG_REGEX, path) || ReUtil.isMatch(FORMATE_YML_CONFIG_REGEX, path)) {
return "yml";
}
return "";
}
private boolean isLocalConfig(String path) {
return ReUtil.isMatch(LOCAL_XML_CONFIG_REGEX, path);
}
private boolean isLocalJsonConfig(String path) {
return ReUtil.isMatch(LOCAL_JSON_CONFIG_REGEX, path);
}
private boolean isLocalYmlConfig(String path) {
return ReUtil.isMatch(LOCAL_YML_CONFIG_REGEX, path);
}
private boolean isClassConfig(String path) {
return ReUtil.isMatch(CLASS_CONFIG_REGEX, path);
}
public void reloadRule() {
init();
}

View File

@@ -0,0 +1,68 @@
package com.yomahub.liteflow.parser;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.exception.ParseException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.RetryNTimes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.MessageFormat;
/**
* 基于zk方式的json形式的解析器
* @Author: guodongqing
* @Date: 2021/3/29 7:42 下午
*/
public class ZookeeperJsonFlowParser extends JsonFlowParser{
private static final Logger LOG = LoggerFactory.getLogger(ZookeeperJsonFlowParser.class);
private String nodePath = "/lite-flow/flow";
public ZookeeperJsonFlowParser() {
}
public ZookeeperJsonFlowParser(String node) {
nodePath = node;
}
@Override
public void parseMain(String path) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.newClient(
path,
new RetryNTimes(10, 5000)
);
client.start();
if (client.checkExists().forPath(nodePath) == null) {
client.create().creatingParentsIfNeeded().forPath(nodePath, "".getBytes());
}
String content = new String(client.getData().forPath(nodePath));
if (StrUtil.isBlank(content)) {
String error = MessageFormat.format("the node[{0}] value is empty", nodePath);
throw new ParseException(error);
}
parse(content);
final NodeCache cache = new NodeCache(client,nodePath);
cache.start();
cache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
String content = new String(cache.getCurrentData().getData());
LOG.info("stating load flow config....");
parse(content);
}
});
}
}

View File

@@ -0,0 +1,68 @@
package com.yomahub.liteflow.parser;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.exception.ParseException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.RetryNTimes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.MessageFormat;
/**
* 基于zk方式的yml形式的解析器
* @Author: guodongqing
* @Date: 2021/3/29 7:42 下午
*/
public class ZookeeperYmlFlowParser extends YmlFlowParser{
private static final Logger LOG = LoggerFactory.getLogger(ZookeeperYmlFlowParser.class);
private String nodePath = "/lite-flow/flow";
public ZookeeperYmlFlowParser() {
}
public ZookeeperYmlFlowParser(String node) {
nodePath = node;
}
@Override
public void parseMain(String path) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.newClient(
path,
new RetryNTimes(10, 5000)
);
client.start();
if (client.checkExists().forPath(nodePath) == null) {
client.create().creatingParentsIfNeeded().forPath(nodePath, "".getBytes());
}
String content = new String(client.getData().forPath(nodePath));
if (StrUtil.isBlank(content)) {
String error = MessageFormat.format("the node[{0}] value is empty", nodePath);
throw new ParseException(error);
}
parse(content);
final NodeCache cache = new NodeCache(client,nodePath);
cache.start();
cache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
String content = new String(cache.getCurrentData().getData());
LOG.info("stating load flow config....");
parse(content);
}
});
}
}

View File

@@ -1,7 +1,6 @@
#liteflow.rule-source=config/flow.xml
#liteflow.rule-source=config/flow.yml
#liteflow.rule-source=config/flow.json
liteflow.rule-source=config/flow.jsonxml#com.yomahub.liteflow.parser.ClassXmlFlowParser|json#127.0.0.1
liteflow.rule-source=config/flow.json
#liteflow.slot-size=2048
liteflow.when-max-wait-seconds=20
liteflow.monitor.enable-log=true