enhancement #I5E17C 提取公共方法减少重复代码

enhancement #I5E17C 提取公共方法减少重复代码

enhancement #I5E17C 提取公共方法减少重复代码

enhancement #I5E17C 提取公共方法减少重复代码

enhancement #I5E17C 提取公共方法减少重复代码

enhancement #I5E17C 提取公共方法减少重复代码
This commit is contained in:
tangkc
2022-06-26 14:15:06 +08:00
parent d14308fd6a
commit bef8628739
23 changed files with 912 additions and 1031 deletions

View File

@@ -21,6 +21,7 @@ import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.flow.element.Chain;
import com.yomahub.liteflow.flow.element.Node;
import com.yomahub.liteflow.parser.*;
import com.yomahub.liteflow.parser.base.FlowParser;
import com.yomahub.liteflow.parser.el.*;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
@@ -381,7 +382,7 @@ public class FlowExecutor {
}
private LiteflowResponse execute2Resp(String chainId, Object param, Class<?>[] contextBeanClazzArray,
Integer slotIndex, boolean isInnerChain) {
Integer slotIndex, boolean isInnerChain) {
LiteflowResponse response = new LiteflowResponse();
Slot slot = doExecute(chainId, param, contextBeanClazzArray, slotIndex, isInnerChain);
@@ -398,7 +399,7 @@ public class FlowExecutor {
}
private Slot doExecute(String chainId, Object param, Class<?>[] contextBeanClazzArray, Integer slotIndex,
boolean isInnerChain) {
boolean isInnerChain) {
if (FlowBus.needInit()) {
init();
}

View File

@@ -1,135 +0,0 @@
package com.yomahub.liteflow.parser;
import cn.hutool.core.annotation.AnnotationUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.ReflectUtil;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.annotation.LiteflowCmpDefine;
import com.yomahub.liteflow.annotation.LiteflowSwitchCmpDefine;
import com.yomahub.liteflow.builder.LiteFlowChainBuilder;
import com.yomahub.liteflow.builder.LiteFlowConditionBuilder;
import com.yomahub.liteflow.builder.LiteFlowNodeBuilder;
import com.yomahub.liteflow.builder.prop.ChainPropBean;
import com.yomahub.liteflow.builder.prop.NodePropBean;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.core.NodeSwitchComponent;
import com.yomahub.liteflow.enums.ConditionTypeEnum;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import com.yomahub.liteflow.exception.*;
/**
* 基类,用于存放通用方法
*
* @author tangkc
*/
public abstract class BaseFlowParser implements FlowParser {
/**
* 构建 node
*
* @param nodePropBean 构建 node 的中间属性
*/
public void buildNode(NodePropBean nodePropBean) {
String id = nodePropBean.getId();
String name = nodePropBean.getName();
String clazz = nodePropBean.getClazz();
String script = nodePropBean.getScript();
String type = nodePropBean.getType();
String file = nodePropBean.getFile();
//先尝试自动推断类型
if (StrUtil.isNotBlank(clazz)) {
try{
//先尝试从继承的类型中推断
Class<?> c = Class.forName(clazz);
Object o = ReflectUtil.newInstanceIfPossible(c);
if (o instanceof NodeSwitchComponent){
type = NodeTypeEnum.SWITCH.getCode();
}else if(o instanceof NodeComponent){
type = NodeTypeEnum.COMMON.getCode();
}
//再尝试声明式组件这部分的推断
if (type == null){
LiteflowCmpDefine liteflowCmpDefine = AnnotationUtil.getAnnotation(c, LiteflowCmpDefine.class);
if (liteflowCmpDefine != null){
type = NodeTypeEnum.COMMON.getCode();
}
}
if (type == null){
LiteflowSwitchCmpDefine liteflowSwitchCmpDefine = AnnotationUtil.getAnnotation(c, LiteflowSwitchCmpDefine.class);
if (liteflowSwitchCmpDefine != null){
type = NodeTypeEnum.SWITCH.getCode();
}
}
}catch (Exception e){
throw new NodeClassNotFoundException(StrUtil.format("cannot find the node[{}]", clazz));
}
}
//因为脚本节点是必须设置type的所以到这里type就全都有了所以进行二次检查
if (StrUtil.isBlank(type)){
throw new NodeTypeCanNotGuessException(StrUtil.format("cannot guess the type of node[{}]", clazz));
}
//检查nodeType是不是规定的类型
NodeTypeEnum nodeTypeEnum = NodeTypeEnum.getEnumByCode(type);
if (ObjectUtil.isNull(nodeTypeEnum)) {
throw new NodeTypeNotSupportException(StrUtil.format("type [{}] is not support", type));
}
//进行node的build过程
LiteFlowNodeBuilder.createNode()
.setId(id)
.setName(name)
.setClazz(clazz)
.setType(nodeTypeEnum)
.setScript(script)
.setFile(file)
.build();
}
/**
* 构建 chain
*
* @param chainPropBean 构建 chain 的中间属性
* @param chainBuilder chainBuilder
*/
public void buildChain(ChainPropBean chainPropBean, LiteFlowChainBuilder chainBuilder) {
String condValueStr = chainPropBean.getCondValueStr();
String group = chainPropBean.getGroup();
String errorResume = chainPropBean.getErrorResume();
String any = chainPropBean.getAny();
String threadExecutorClass = chainPropBean.getThreadExecutorClass();
ConditionTypeEnum conditionType = chainPropBean.getConditionType();
if (ObjectUtil.isNull(conditionType)) {
throw new NotSupportConditionException("ConditionType is not supported");
}
if (StrUtil.isBlank(condValueStr)) {
throw new EmptyConditionValueException("Condition value cannot be empty");
}
//如果是when类型的话有特殊化参数要设置只针对于when的
if (conditionType.equals(ConditionTypeEnum.TYPE_WHEN)) {
chainBuilder.setCondition(
LiteFlowConditionBuilder.createWhenCondition()
.setErrorResume(errorResume)
.setGroup(group)
.setAny(any)
.setThreadExecutorClass(threadExecutorClass)
.setValue(condValueStr)
.build()
).build();
} else {
chainBuilder.setCondition(
LiteFlowConditionBuilder.createCondition(conditionType)
.setValue(condValueStr)
.build()
).build();
}
}
}

View File

@@ -1,39 +1,20 @@
package com.yomahub.liteflow.parser;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.collection.ListUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import com.yomahub.liteflow.builder.LiteFlowChainBuilder;
import com.yomahub.liteflow.builder.prop.ChainPropBean;
import com.yomahub.liteflow.builder.prop.NodePropBean;
import com.yomahub.liteflow.enums.ConditionTypeEnum;
import com.yomahub.liteflow.exception.ChainDuplicateException;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.spi.holder.ContextCmpInitHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import com.yomahub.liteflow.parser.base.BaseJsonFlowParser;
import com.yomahub.liteflow.parser.helper.ParserHelper;
import static com.yomahub.liteflow.common.ChainConstant.ANY;
import static com.yomahub.liteflow.common.ChainConstant.CHAIN;
import static com.yomahub.liteflow.common.ChainConstant.CONDITION;
import static com.yomahub.liteflow.common.ChainConstant.ERROR_RESUME;
import static com.yomahub.liteflow.common.ChainConstant.FILE;
import static com.yomahub.liteflow.common.ChainConstant.FLOW;
import static com.yomahub.liteflow.common.ChainConstant.GROUP;
import static com.yomahub.liteflow.common.ChainConstant.ID;
import static com.yomahub.liteflow.common.ChainConstant.NAME;
import static com.yomahub.liteflow.common.ChainConstant.NODE;
import static com.yomahub.liteflow.common.ChainConstant.NODES;
import static com.yomahub.liteflow.common.ChainConstant.THREAD_EXECUTOR_CLASS;
import static com.yomahub.liteflow.common.ChainConstant.TYPE;
import static com.yomahub.liteflow.common.ChainConstant.VALUE;
import static com.yomahub.liteflow.common.ChainConstant._CLASS;
/**
* Json格式解析器
@@ -41,129 +22,13 @@ import static com.yomahub.liteflow.common.ChainConstant._CLASS;
* @author guodongqing
* @since 2.5.0
*/
public abstract class JsonFlowParser extends BaseFlowParser {
private final Logger LOG = LoggerFactory.getLogger(JsonFlowParser.class);
private final Set<String> CHAIN_NAME_SET = new CopyOnWriteArraySet<>();
public void parse(String content) throws Exception {
parse(ListUtil.toList(content));
}
@Override
public void parse(List<String> contentList) throws Exception {
if (CollectionUtil.isEmpty(contentList)) {
return;
}
List<JSONObject> jsonObjectList = ListUtil.toList();
for (String content : contentList) {
//把字符串原生转换为json对象如果不加第二个参数OrderedField会无序
JSONObject flowJsonObject = JSONObject.parseObject(content, Feature.OrderedField);
jsonObjectList.add(flowJsonObject);
}
parseJsonObject(jsonObjectList);
}
//json格式解析过程
public void parseJsonObject(List<JSONObject> flowJsonObjectList) throws Exception {
//先在元数据里放上chain
//先放有一个好处可以在parse的时候先映射到FlowBus的chainMap然后再去解析
//这样就不用去像之前的版本那样回归调用
//同时也解决了不能循环依赖的问题
flowJsonObjectList.forEach(jsonObject -> {
// 解析chain节点
JSONArray chainArray = jsonObject.getJSONObject(FLOW).getJSONArray(CHAIN);
//先在元数据里放上chain
chainArray.forEach(o -> {
JSONObject innerJsonObject = (JSONObject) o;
//校验加载的 chainName 是否有重复的
//TODO 这里是否有个问题当混合格式加载的时候2个同名的Chain在不同的文件里就不行了
String chainName = innerJsonObject.getString(NAME);
if (!CHAIN_NAME_SET.add(chainName)) {
throw new ChainDuplicateException(String.format("[chain name duplicate] chainName=%s", chainName));
}
FlowBus.addChain(innerJsonObject.getString(NAME));
});
});
// 清空
CHAIN_NAME_SET.clear();
for (JSONObject flowJsonObject : flowJsonObjectList) {
// 当存在<nodes>节点定义时解析node节点
if (flowJsonObject.getJSONObject(FLOW).containsKey(NODES)) {
JSONArray nodeArrayList = flowJsonObject.getJSONObject(FLOW).getJSONObject(NODES).getJSONArray(NODE);
String id, name, clazz, script, type, file;
for (int i = 0; i < nodeArrayList.size(); i++) {
JSONObject nodeObject = nodeArrayList.getJSONObject(i);
id = nodeObject.getString(ID);
name = nodeObject.getString(NAME);
clazz = nodeObject.getString(_CLASS);
type = nodeObject.getString(TYPE);
script = nodeObject.getString(VALUE);
file = nodeObject.getString(FILE);
// 构建 node
NodePropBean nodePropBean = new NodePropBean()
.setId(id)
.setName(name)
.setClazz(clazz)
.setScript(script)
.setType(type)
.setFile(file);
buildNode(nodePropBean);
}
}
//解析每一个chain
JSONArray chainArray = flowJsonObject.getJSONObject(FLOW).getJSONArray(CHAIN);
chainArray.forEach(o -> {
JSONObject jsonObject = (JSONObject) o;
parseOneChain(jsonObject);
});
}
}
public abstract class JsonFlowParser extends BaseJsonFlowParser {
/**
* 解析一个chain的过程
*/
private void parseOneChain(JSONObject chainObject) {
String condValueStr;
ConditionTypeEnum conditionType;
String group;
String errorResume;
String any;
String threadExecutorClass;
//构建chainBuilder
String chainName = chainObject.getString(NAME);
LiteFlowChainBuilder chainBuilder = LiteFlowChainBuilder.createChain().setChainName(chainName);
for (Object o : chainObject.getJSONArray(CONDITION)) {
JSONObject condObject = (JSONObject) o;
conditionType = ConditionTypeEnum.getEnumByCode(condObject.getString(TYPE));
condValueStr = condObject.getString(VALUE);
errorResume = condObject.getString(ERROR_RESUME);
group = condObject.getString(GROUP);
any = condObject.getString(ANY);
threadExecutorClass = condObject.getString(THREAD_EXECUTOR_CLASS);
ChainPropBean chainPropBean = new ChainPropBean()
.setCondValueStr(condValueStr)
.setGroup(group)
.setErrorResume(errorResume)
.setAny(any)
.setThreadExecutorClass(threadExecutorClass)
.setConditionType(conditionType);
// 构建 chain
buildChain(chainPropBean, chainBuilder);
}
public void parseOneChain(JSONObject chainObject) {
ParserHelper.parseOneChain(chainObject);
}
}

View File

@@ -1,37 +1,8 @@
package com.yomahub.liteflow.parser;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.builder.LiteFlowChainBuilder;
import com.yomahub.liteflow.builder.prop.ChainPropBean;
import com.yomahub.liteflow.builder.prop.NodePropBean;
import com.yomahub.liteflow.enums.ConditionTypeEnum;
import com.yomahub.liteflow.exception.ChainDuplicateException;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.spi.holder.ContextCmpInitHolder;
import org.dom4j.Document;
import org.dom4j.DocumentHelper;
import com.yomahub.liteflow.parser.base.BaseXmlFlowParser;
import com.yomahub.liteflow.parser.helper.ParserHelper;
import org.dom4j.Element;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import static com.yomahub.liteflow.common.ChainConstant.ANY;
import static com.yomahub.liteflow.common.ChainConstant.CHAIN;
import static com.yomahub.liteflow.common.ChainConstant.ERROR_RESUME;
import static com.yomahub.liteflow.common.ChainConstant.FILE;
import static com.yomahub.liteflow.common.ChainConstant.GROUP;
import static com.yomahub.liteflow.common.ChainConstant.ID;
import static com.yomahub.liteflow.common.ChainConstant.NAME;
import static com.yomahub.liteflow.common.ChainConstant.NODE;
import static com.yomahub.liteflow.common.ChainConstant.NODES;
import static com.yomahub.liteflow.common.ChainConstant.THREAD_EXECUTOR_CLASS;
import static com.yomahub.liteflow.common.ChainConstant.TYPE;
import static com.yomahub.liteflow.common.ChainConstant.VALUE;
import static com.yomahub.liteflow.common.ChainConstant._CLASS;
;
@@ -40,123 +11,13 @@ import static com.yomahub.liteflow.common.ChainConstant._CLASS;
*
* @author Bryan.Zhang
*/
public abstract class XmlFlowParser extends BaseFlowParser {
public abstract class XmlFlowParser extends BaseXmlFlowParser {
private final Logger LOG = LoggerFactory.getLogger(XmlFlowParser.class);
private final Set<String> CHAIN_NAME_SET = new HashSet<>();
public void parse(String content) throws Exception {
parse(ListUtil.toList(content));
}
@Override
public void parse(List<String> contentList) throws Exception {
if (CollectionUtil.isEmpty(contentList)) {
return;
}
List<Document> documentList = ListUtil.toList();
for (String content : contentList) {
Document document = DocumentHelper.parseText(content);
documentList.add(document);
}
parseDocument(documentList);
}
//xml形式的主要解析过程
public void parseDocument(List<Document> documentList) throws Exception {
//先在元数据里放上chain
//先放有一个好处可以在parse的时候先映射到FlowBus的chainMap然后再去解析
//这样就不用去像之前的版本那样回归调用
//同时也解决了不能循环依赖的问题
documentList.forEach(document -> {
// 解析chain节点
List<Element> chainList = document.getRootElement().elements(CHAIN);
//先在元数据里放上chain
chainList.forEach(e -> {
//校验加载的 chainName 是否有重复的
//TODO 这里是否有个问题当混合格式加载的时候2个同名的Chain在不同的文件里就不行了
String chainName = e.attributeValue(NAME);
if (!CHAIN_NAME_SET.add(chainName)) {
throw new ChainDuplicateException(String.format("[chain name duplicate] chainName=%s", chainName));
}
FlowBus.addChain(chainName);
});
});
// 清空
CHAIN_NAME_SET.clear();
for (Document document : documentList) {
Element rootElement = document.getRootElement();
Element nodesElement = rootElement.element(NODES);
// 当存在<nodes>节点定义时解析node节点
if (ObjectUtil.isNotNull(nodesElement)) {
List<Element> nodeList = nodesElement.elements(NODE);
String id, name, clazz, type, script, file;
for (Element e : nodeList) {
id = e.attributeValue(ID);
name = e.attributeValue(NAME);
clazz = e.attributeValue(_CLASS);
type = e.attributeValue(TYPE);
script = e.getTextTrim();
file = e.attributeValue(FILE);
// 构建 node
NodePropBean nodePropBean = new NodePropBean()
.setId(id)
.setName(name)
.setClazz(clazz)
.setScript(script)
.setType(type)
.setFile(file);
buildNode(nodePropBean);
}
}
//解析每一个chain
List<Element> chainList = rootElement.elements(CHAIN);
chainList.forEach(this::parseOneChain);
}
}
/**
* 解析一个chain的过程
*/
private void parseOneChain(Element e) {
String condValueStr;
String group;
String errorResume;
String any;
String threadExecutorClass;
ConditionTypeEnum conditionType;
//构建chainBuilder
String chainName = e.attributeValue(NAME);
LiteFlowChainBuilder chainBuilder = LiteFlowChainBuilder.createChain().setChainName(chainName);
for (Iterator<Element> it = e.elementIterator(); it.hasNext(); ) {
Element condE = it.next();
conditionType = ConditionTypeEnum.getEnumByCode(condE.getName());
condValueStr = condE.attributeValue(VALUE);
errorResume = condE.attributeValue(ERROR_RESUME);
group = condE.attributeValue(GROUP);
any = condE.attributeValue(ANY);
threadExecutorClass = condE.attributeValue(THREAD_EXECUTOR_CLASS);
ChainPropBean chainPropBean = new ChainPropBean()
.setCondValueStr(condValueStr)
.setGroup(group)
.setErrorResume(errorResume)
.setAny(any)
.setThreadExecutorClass(threadExecutorClass)
.setConditionType(conditionType);
// 构建 chain
buildChain(chainPropBean, chainBuilder);
}
}
/**
* 解析一个chain的过程
*/
public void parseOneChain(Element e) {
ParserHelper.parseOneChain(e);
}
}

View File

@@ -1,48 +1,21 @@
package com.yomahub.liteflow.parser;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.collection.ListUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
import java.util.List;
import java.util.Map;
import com.yomahub.liteflow.parser.base.BaseYmlFlowParser;
import com.yomahub.liteflow.parser.helper.ParserHelper;
/**
* Yml格式解析器转换为json格式进行解析
* @author guodongqing
* @since 2.5.0
*/
public abstract class YmlFlowParser extends JsonFlowParser{
public abstract class YmlFlowParser extends BaseYmlFlowParser {
private final Logger LOG = LoggerFactory.getLogger(YmlFlowParser.class);
@Override
public void parse(String content) throws Exception{
parse(ListUtil.toList(content));
/**
* 解析一个chain的过程
*/
public void parseOneChain(JSONObject chainObject) {
ParserHelper.parseOneChain(chainObject);
}
@Override
public void parse(List<String> contentList) throws Exception {
if (CollectionUtil.isEmpty(contentList)) {
return;
}
List<JSONObject> jsonObjectList = ListUtil.toList();
for (String content : contentList){
JSONObject ruleObject = convertToJson(content);
jsonObjectList.add(ruleObject);
}
super.parseJsonObject(jsonObjectList);
}
protected JSONObject convertToJson(String yamlString) {
Yaml yaml= new Yaml();
Map<String, Object> map = yaml.load(yamlString);
return JSON.parseObject(JSON.toJSONString(map));
}
}

View File

@@ -1,63 +1,23 @@
package com.yomahub.liteflow.parser;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.exception.ParseException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.retry.RetryNTimes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.MessageFormat;
import java.util.List;
import com.alibaba.fastjson.JSONObject;
import com.yomahub.liteflow.parser.base.BaseZookeeperJsonFlowParser;
import com.yomahub.liteflow.parser.helper.ParserHelper;
/**
* 基于zk方式的json形式的解析器
* @author guodongqing
* @since 2.5.0
*/
public class ZookeeperJsonFlowParser extends JsonFlowParser{
private static final Logger LOG = LoggerFactory.getLogger(ZookeeperJsonFlowParser.class);
private final String nodePath;
public class ZookeeperJsonFlowParser extends BaseZookeeperJsonFlowParser {
public ZookeeperJsonFlowParser(String node) {
nodePath = node;
super(node);
}
@Override
public void parseMain(List<String> pathList) throws Exception {
//zk不允许有多个path
String path = pathList.get(0);
CuratorFramework client = CuratorFrameworkFactory.newClient(
path,
new RetryNTimes(10, 5000)
);
client.start();
if (client.checkExists().forPath(nodePath) == null) {
client.create().creatingParentsIfNeeded().forPath(nodePath, "".getBytes());
}
String content = new String(client.getData().forPath(nodePath));
if (StrUtil.isBlank(content)) {
String error = MessageFormat.format("the node[{0}] value is empty", nodePath);
throw new ParseException(error);
}
parse(content);
final NodeCache cache = new NodeCache(client,nodePath);
cache.start();
cache.getListenable().addListener(() -> {
String content1 = new String(cache.getCurrentData().getData());
LOG.info("stating load flow config....");
parse(content1);
});
public void parseOneChain(JSONObject chainObject) {
ParserHelper.parseOneChain(chainObject);
}
}

View File

@@ -1,65 +1,21 @@
package com.yomahub.liteflow.parser;
import java.text.MessageFormat;
import java.util.List;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.flow.FlowBus;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.retry.RetryNTimes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.yomahub.liteflow.exception.ParseException;
import com.yomahub.liteflow.parser.base.BaseZookeeperXmlFlowParser;
import com.yomahub.liteflow.parser.helper.ParserHelper;
import org.dom4j.Element;
/**
* 基于zk方式的xml形式的解析器
* @author Bryan.Zhang
*/
public class ZookeeperXmlFlowParser extends XmlFlowParser{
private static final Logger LOG = LoggerFactory.getLogger(ZookeeperXmlFlowParser.class);
private final String nodePath;
public class ZookeeperXmlFlowParser extends BaseZookeeperXmlFlowParser {
public ZookeeperXmlFlowParser(String node) {
nodePath = node;
super(node);
}
@Override
public void parseMain(List<String> pathList) throws Exception {
//zk不允许有多个path
String path = pathList.get(0);
CuratorFramework client = CuratorFrameworkFactory.newClient(
path,
new RetryNTimes(10, 5000)
);
client.start();
if (client.checkExists().forPath(nodePath) == null) {
client.create().creatingParentsIfNeeded().forPath(nodePath, "".getBytes());
}
String content = new String(client.getData().forPath(nodePath));
if (StrUtil.isBlank(content)) {
String error = MessageFormat.format("the node[{0}] value is empty", nodePath);
throw new ParseException(error);
}
parse(content);
final NodeCache cache = new NodeCache(client,nodePath);
cache.start();
cache.getListenable().addListener(() -> {
String content1 = new String(cache.getCurrentData().getData());
LOG.info("stating load flow config....");
parse(content1);
});
public void parseOneChain(Element chain) {
ParserHelper.parseOneChain(chain);
}
}

View File

@@ -1,68 +1,22 @@
package com.yomahub.liteflow.parser;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import com.yomahub.liteflow.exception.ParseException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.retry.RetryNTimes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.MessageFormat;
import java.util.List;
import com.yomahub.liteflow.parser.base.BaseZookeeperYmlFlowParser;
import com.yomahub.liteflow.parser.helper.ParserHelper;
/**
* 基于zk方式的yml形式的解析器
*
* @author guodongqing
* @since 2.5.0
*/
public class ZookeeperYmlFlowParser extends YmlFlowParser{
public class ZookeeperYmlFlowParser extends BaseZookeeperYmlFlowParser {
public ZookeeperYmlFlowParser(String node) {
super(node);
}
private static final Logger LOG = LoggerFactory.getLogger(ZookeeperYmlFlowParser.class);
private final String nodePath;
public ZookeeperYmlFlowParser(String node) {
nodePath = node;
}
@Override
public void parseMain(List<String> pathList) throws Exception {
//zk不允许有多个path
String path = pathList.get(0);
CuratorFramework client = CuratorFrameworkFactory.newClient(
path,
new RetryNTimes(10, 5000)
);
client.start();
if (client.checkExists().forPath(nodePath) == null) {
client.create().creatingParentsIfNeeded().forPath(nodePath, "".getBytes());
}
String content = new String(client.getData().forPath(nodePath));
if (StrUtil.isBlank(content)) {
String error = MessageFormat.format("the node[{0}] value is empty", nodePath);
throw new ParseException(error);
}
JSONObject ruleObject = convertToJson(content);
parse(ruleObject.toJSONString());
final NodeCache cache = new NodeCache(client,nodePath);
cache.start();
cache.getListenable().addListener(() -> {
String content1 = new String(cache.getCurrentData().getData());
LOG.info("stating load flow config....");
JSONObject ruleObject1 = convertToJson(content1);
parse(ruleObject1.toJSONString());
});
}
@Override
public void parseOneChain(JSONObject chain) {
ParserHelper.parseOneChain(chain);
}
}

View File

@@ -0,0 +1,49 @@
package com.yomahub.liteflow.parser.base;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.collection.ListUtil;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import com.yomahub.liteflow.parser.helper.ParserHelper;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* 基类,用于存放 JsonFlowParser 通用方法
*
* @author tangkc
*/
public abstract class BaseJsonFlowParser implements FlowParser {
private final Set<String> CHAIN_NAME_SET = new CopyOnWriteArraySet<>();
public void parse(String content) throws Exception {
parse(ListUtil.toList(content));
}
@Override
public void parse(List<String> contentList) throws Exception {
if (CollectionUtil.isEmpty(contentList)) {
return;
}
List<JSONObject> jsonObjectList = ListUtil.toList();
for (String content : contentList) {
//把字符串原生转换为json对象如果不加第二个参数OrderedField会无序
JSONObject flowJsonObject = JSONObject.parseObject(content, Feature.OrderedField);
jsonObjectList.add(flowJsonObject);
}
ParserHelper.parseJsonObject(jsonObjectList, CHAIN_NAME_SET, this::parseOneChain);
}
/**
* 解析一个chain的过程
*
* @param chainObject chain 节点
*/
public abstract void parseOneChain(JSONObject chainObject);
}

View File

@@ -0,0 +1,50 @@
package com.yomahub.liteflow.parser.base;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.collection.ListUtil;
import com.yomahub.liteflow.parser.helper.ParserHelper;
import org.dom4j.Document;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
/**
* 基类,用于存放 XmlFlowParser 通用方法
*
* @author tangkc
*/
public abstract class BaseXmlFlowParser implements FlowParser {
private final Set<String> CHAIN_NAME_SET = new HashSet<>();
public void parse(String content) throws Exception {
parse(ListUtil.toList(content));
}
@Override
public void parse(List<String> contentList) throws Exception {
if (CollectionUtil.isEmpty(contentList)) {
return;
}
List<Document> documentList = ListUtil.toList();
for (String content : contentList) {
Document document = DocumentHelper.parseText(content);
documentList.add(document);
}
Consumer<Element> parseOneChainConsumer = this::parseOneChain;
ParserHelper.parseDocument(documentList, CHAIN_NAME_SET, parseOneChainConsumer);
}
/**
* 解析一个 chain 的过程
*
* @param chain chain
*/
public abstract void parseOneChain(Element chain);
}

View File

@@ -0,0 +1,57 @@
package com.yomahub.liteflow.parser.base;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.collection.ListUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yomahub.liteflow.parser.helper.ParserHelper;
import org.yaml.snakeyaml.Yaml;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
/**
* 基类,用于存放 YmlFlowParser 通用方法
*
* @author tangkc
*/
public abstract class BaseYmlFlowParser implements FlowParser {
private final Set<String> CHAIN_NAME_SET = new HashSet<>();
public void parse(String content) throws Exception{
parse(ListUtil.toList(content));
}
@Override
public void parse(List<String> contentList) throws Exception {
if (CollectionUtil.isEmpty(contentList)) {
return;
}
List<JSONObject> jsonObjectList = ListUtil.toList();
for (String content : contentList){
JSONObject ruleObject = convertToJson(content);
jsonObjectList.add(ruleObject);
}
Consumer<JSONObject> parseOneChainConsumer = this::parseOneChain;
ParserHelper.parseJsonObject(jsonObjectList, CHAIN_NAME_SET,parseOneChainConsumer);
}
protected JSONObject convertToJson(String yamlString) {
Yaml yaml= new Yaml();
Map<String, Object> map = yaml.load(yamlString);
return JSON.parseObject(JSON.toJSONString(map));
}
/**
* 解析一个 chain 的过程
*
* @param chain chain
*/
public abstract void parseOneChain(JSONObject chain);
}

View File

@@ -0,0 +1,57 @@
package com.yomahub.liteflow.parser.base;
import com.alibaba.fastjson.JSONObject;
import com.yomahub.liteflow.parser.ZookeeperJsonFlowParser;
import com.yomahub.liteflow.parser.helper.ZkParserHelper;
import org.apache.curator.framework.CuratorFramework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.function.Consumer;
/**
* 基类,用于存放 ZookeeperJsonFlowParser 通用方法
*
* @author tangkc
*/
public abstract class BaseZookeeperJsonFlowParser extends BaseJsonFlowParser {
private static final Logger LOG = LoggerFactory.getLogger(ZookeeperJsonFlowParser.class);
private final String nodePath;
private final ZkParserHelper zkParserHelper;
public BaseZookeeperJsonFlowParser(String node) {
nodePath = node;
Consumer<String> parseConsumer = t -> {
try {
parse(t);
} catch (Exception e) {
throw new RuntimeException(e);
}
};
zkParserHelper = new ZkParserHelper(nodePath, parseConsumer);
}
@Override
public void parseMain(List<String> pathList) throws Exception {
CuratorFramework client = zkParserHelper.getZkCuratorFramework(pathList);
String content = new String(client.getData().forPath(nodePath));
zkParserHelper.checkContent(content);
parse(content);
zkParserHelper.listenZkNode(client);
}
/**
* 解析一个chain的过程
*
* @param chainObject chain 节点
*/
public abstract void parseOneChain(JSONObject chainObject);
}

View File

@@ -0,0 +1,57 @@
package com.yomahub.liteflow.parser.base;
import com.yomahub.liteflow.parser.ZookeeperXmlFlowParser;
import com.yomahub.liteflow.parser.helper.ZkParserHelper;
import org.apache.curator.framework.CuratorFramework;
import org.dom4j.Element;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.function.Consumer;
/**
* 基类,用于存放 ZookeeperXmlFlowELParser 通用方法
*
* @author tangkc
*/
public abstract class BaseZookeeperXmlFlowParser extends BaseXmlFlowParser {
private static final Logger LOG = LoggerFactory.getLogger(ZookeeperXmlFlowParser.class);
private final String nodePath;
private final ZkParserHelper zkParserHelper;
public BaseZookeeperXmlFlowParser(String node) {
nodePath = node;
Consumer<String> parseConsumer = t -> {
try {
parse(t);
} catch (Exception e) {
throw new RuntimeException(e);
}
};
zkParserHelper = new ZkParserHelper(nodePath, parseConsumer);
}
@Override
public void parseMain(List<String> pathList) throws Exception {
CuratorFramework client = zkParserHelper.getZkCuratorFramework(pathList);
String content = new String(client.getData().forPath(nodePath));
zkParserHelper.checkContent(content);
parse(content);
zkParserHelper.listenZkNode(client);
}
/**
* 解析一个chain的过程
*
* @param chain 节点
*/
public abstract void parseOneChain(Element chain);
}

View File

@@ -0,0 +1,59 @@
package com.yomahub.liteflow.parser.base;
import com.alibaba.fastjson.JSONObject;
import com.yomahub.liteflow.parser.helper.ZkParserHelper;
import org.apache.curator.framework.CuratorFramework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.function.Consumer;
/**
* 基类,用于存放 ZookeeperYmlFlowELParser 通用方法
*
* @author tangkc
*/
public abstract class BaseZookeeperYmlFlowParser extends BaseYmlFlowParser{
private static final Logger LOG = LoggerFactory.getLogger(BaseZookeeperYmlFlowParser.class);
private final String nodePath;
private final ZkParserHelper zkParserHelper;
public BaseZookeeperYmlFlowParser(String node) {
nodePath = node;
Consumer<String> parseConsumer = t -> {
try {
parse(t);
} catch (Exception e) {
throw new RuntimeException(e);
}
};
zkParserHelper = new ZkParserHelper(nodePath, parseConsumer);
}
@Override
public void parseMain(List<String> pathList) throws Exception {
CuratorFramework client = zkParserHelper.getZkCuratorFramework(pathList);
String content = new String(client.getData().forPath(nodePath));
zkParserHelper.checkContent(content);
JSONObject ruleObject = convertToJson(content);
parse(ruleObject.toJSONString());
zkParserHelper.listenZkNode(client);
}
/**
* 解析一个 chain 的过程
*
* @param chain chain
*/
public abstract void parseOneChain(JSONObject chain);
}

View File

@@ -1,4 +1,6 @@
package com.yomahub.liteflow.parser;
package com.yomahub.liteflow.parser.base;
import org.dom4j.Element;
import java.util.*;

View File

@@ -1,125 +1,27 @@
package com.yomahub.liteflow.parser.el;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.collection.ListUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder;
import com.yomahub.liteflow.builder.prop.NodePropBean;
import com.yomahub.liteflow.exception.ChainDuplicateException;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.parser.BaseFlowParser;
import com.yomahub.liteflow.spi.holder.ContextCmpInitHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import com.yomahub.liteflow.parser.base.BaseJsonFlowParser;
import com.yomahub.liteflow.parser.helper.ParserHelper;
import static com.yomahub.liteflow.common.ChainConstant.*;
/**
* JSON形式的EL表达式解析抽象引擎
*
* @author Bryan.Zhang
* @since 2.8.0
*/
public abstract class JsonFlowELParser extends BaseFlowParser {
public abstract class JsonFlowELParser extends BaseJsonFlowParser {
private final Logger LOG = LoggerFactory.getLogger(this.getClass());
/**
* 解析一个chain的过程
*
* @param chainObject chain 节点
*/
public void parseOneChain(JSONObject chainObject) {
ParserHelper.parseOneChainEl(chainObject);
}
private final Set<String> CHAIN_NAME_SET = new CopyOnWriteArraySet<>();
public void parse(String content) throws Exception {
parse(ListUtil.toList(content));
}
@Override
public void parse(List<String> contentList) throws Exception {
if (CollectionUtil.isEmpty(contentList)) {
return;
}
List<JSONObject> jsonObjectList = ListUtil.toList();
for (String content : contentList) {
//把字符串原生转换为json对象如果不加第二个参数OrderedField会无序
JSONObject flowJsonObject = JSONObject.parseObject(content, Feature.OrderedField);
jsonObjectList.add(flowJsonObject);
}
parseJsonObject(jsonObjectList);
}
//json格式解析过程
public void parseJsonObject(List<JSONObject> flowJsonObjectList) throws Exception {
//先在元数据里放上chain
//先放有一个好处可以在parse的时候先映射到FlowBus的chainMap然后再去解析
//这样就不用去像之前的版本那样回归调用
//同时也解决了不能循环依赖的问题
flowJsonObjectList.forEach(jsonObject -> {
// 解析chain节点
JSONArray chainArray = jsonObject.getJSONObject(FLOW).getJSONArray(CHAIN);
//先在元数据里放上chain
chainArray.forEach(o -> {
JSONObject innerJsonObject = (JSONObject) o;
//校验加载的 chainName 是否有重复的
//TODO 这里是否有个问题当混合格式加载的时候2个同名的Chain在不同的文件里就不行了
String chainName = innerJsonObject.getString(NAME);
if (!CHAIN_NAME_SET.add(chainName)) {
throw new ChainDuplicateException(String.format("[chain name duplicate] chainName=%s", chainName));
}
FlowBus.addChain(innerJsonObject.getString(NAME));
});
});
// 清空
CHAIN_NAME_SET.clear();
for (JSONObject flowJsonObject : flowJsonObjectList) {
// 当存在<nodes>节点定义时解析node节点
if (flowJsonObject.getJSONObject(FLOW).containsKey(NODES)) {
JSONArray nodeArrayList = flowJsonObject.getJSONObject(FLOW).getJSONObject(NODES).getJSONArray(NODE);
String id, name, clazz, script, type, file;
for (int i = 0; i < nodeArrayList.size(); i++) {
JSONObject nodeObject = nodeArrayList.getJSONObject(i);
id = nodeObject.getString(ID);
name = nodeObject.getString(NAME);
clazz = nodeObject.getString(_CLASS);
type = nodeObject.getString(TYPE);
script = nodeObject.getString(VALUE);
file = nodeObject.getString(FILE);
// 构建 node
NodePropBean nodePropBean = new NodePropBean()
.setId(id)
.setName(name)
.setClazz(clazz)
.setScript(script)
.setType(type)
.setFile(file);
buildNode(nodePropBean);
}
}
//解析每一个chain
JSONArray chainArray = flowJsonObject.getJSONObject(FLOW).getJSONArray(CHAIN);
chainArray.forEach(o -> {
JSONObject jsonObject = (JSONObject) o;
parseOneChain(jsonObject);
});
}
}
/**
* 解析一个chain的过程
*/
private void parseOneChain(JSONObject chainObject) {
//构建chainBuilder
String chainName = chainObject.getString(NAME);
String el = chainObject.getString(VALUE);
LiteFlowChainELBuilder chainELBuilder = LiteFlowChainELBuilder.createChain().setChainName(chainName);
chainELBuilder.setEL(el).build();
}
}

View File

@@ -1,122 +1,22 @@
package com.yomahub.liteflow.parser.el;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder;
import com.yomahub.liteflow.builder.prop.NodePropBean;
import com.yomahub.liteflow.exception.ChainDuplicateException;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.parser.BaseFlowParser;
import com.yomahub.liteflow.parser.XmlFlowParser;
import com.yomahub.liteflow.spi.holder.ContextCmpInitHolder;
import org.dom4j.Document;
import org.dom4j.DocumentHelper;
import com.yomahub.liteflow.parser.base.BaseXmlFlowParser;
import com.yomahub.liteflow.parser.helper.ParserHelper;
import org.dom4j.Element;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import static com.yomahub.liteflow.common.ChainConstant.*;
/**
* Xml形式的EL表达式解析抽象引擎
*
* @author Bryan.Zhang
* @since 2.8.0
*/
public abstract class XmlFlowELParser extends BaseFlowParser {
public abstract class XmlFlowELParser extends BaseXmlFlowParser {
private final Logger LOG = LoggerFactory.getLogger(this.getClass());
/**
* 解析一个chain的过程
*/
public void parseOneChain(Element e) {
ParserHelper.parseOneChainEl(e);
}
private final Set<String> CHAIN_NAME_SET = new CopyOnWriteArraySet<>();
public void parse(String content) throws Exception {
parse(ListUtil.toList(content));
}
@Override
public void parse(List<String> contentList) throws Exception {
if (CollectionUtil.isEmpty(contentList)) {
return;
}
List<Document> documentList = ListUtil.toList();
for (String content : contentList) {
Document document = DocumentHelper.parseText(content);
documentList.add(document);
}
parseDocument(documentList);
}
//xml形式的主要解析过程
public void parseDocument(List<Document> documentList) throws Exception {
//先在元数据里放上chain
//先放有一个好处可以在parse的时候先映射到FlowBus的chainMap然后再去解析
//这样就不用去像之前的版本那样回归调用
//同时也解决了不能循环依赖的问题
documentList.forEach(document -> {
// 解析chain节点
List<Element> chainList = document.getRootElement().elements(CHAIN);
//先在元数据里放上chain
chainList.forEach(e -> {
//校验加载的 chainName 是否有重复的
//TODO 这里是否有个问题当混合格式加载的时候2个同名的Chain在不同的文件里就不行了
String chainName = e.attributeValue(NAME);
if (!CHAIN_NAME_SET.add(chainName)) {
throw new ChainDuplicateException(String.format("[chain name duplicate] chainName=%s", chainName));
}
FlowBus.addChain(chainName);
});
});
// 清空
CHAIN_NAME_SET.clear();
for (Document document : documentList) {
Element rootElement = document.getRootElement();
Element nodesElement = rootElement.element(NODES);
// 当存在<nodes>节点定义时解析node节点
if (ObjectUtil.isNotNull(nodesElement)) {
List<Element> nodeList = nodesElement.elements(NODE);
String id, name, clazz, type, script, file;
for (Element e : nodeList) {
id = e.attributeValue(ID);
name = e.attributeValue(NAME);
clazz = e.attributeValue(_CLASS);
type = e.attributeValue(TYPE);
script = e.getTextTrim();
file = e.attributeValue(FILE);
// 构建 node
NodePropBean nodePropBean = new NodePropBean()
.setId(id)
.setName(name)
.setClazz(clazz)
.setScript(script)
.setType(type)
.setFile(file);
buildNode(nodePropBean);
}
}
//解析每一个chain
List<Element> chainList = rootElement.elements(CHAIN);
chainList.forEach(this::parseOneChain);
}
}
/**
* 解析一个chain的过程
*/
private void parseOneChain(Element e) {
//构建chainBuilder
String chainName = e.attributeValue(NAME);
String el = e.getTextTrim();
LiteFlowChainELBuilder chainELBuilder = LiteFlowChainELBuilder.createChain().setChainName(chainName);
chainELBuilder.setEL(el).build();
}
}

View File

@@ -1,49 +1,22 @@
package com.yomahub.liteflow.parser.el;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.collection.ListUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yomahub.liteflow.parser.JsonFlowParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
import java.util.List;
import java.util.Map;
import com.yomahub.liteflow.parser.base.BaseYmlFlowParser;
import com.yomahub.liteflow.parser.helper.ParserHelper;
/**
* yml形式的EL表达式解析抽象引擎
*
* @author Bryan.Zhang
* @since 2.8.0
*/
public abstract class YmlFlowELParser extends JsonFlowELParser {
public abstract class YmlFlowELParser extends BaseYmlFlowParser {
private final Logger LOG = LoggerFactory.getLogger(YmlFlowELParser.class);
/**
* 解析一个chain的过程
*/
public void parseOneChain(JSONObject chainObject) {
ParserHelper.parseOneChainEl(chainObject);
}
@Override
public void parse(String content) throws Exception{
parse(ListUtil.toList(content));
}
@Override
public void parse(List<String> contentList) throws Exception {
if (CollectionUtil.isEmpty(contentList)) {
return;
}
List<JSONObject> jsonObjectList = ListUtil.toList();
for (String content : contentList){
JSONObject ruleObject = convertToJson(content);
jsonObjectList.add(ruleObject);
}
super.parseJsonObject(jsonObjectList);
}
protected JSONObject convertToJson(String yamlString) {
Yaml yaml= new Yaml();
Map<String, Object> map = yaml.load(yamlString);
return JSON.parseObject(JSON.toJSONString(map));
}
}

View File

@@ -1,64 +1,23 @@
package com.yomahub.liteflow.parser.el;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.exception.ParseException;
import com.yomahub.liteflow.parser.JsonFlowParser;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.retry.RetryNTimes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.MessageFormat;
import java.util.List;
import com.alibaba.fastjson.JSONObject;
import com.yomahub.liteflow.parser.base.BaseZookeeperJsonFlowParser;
import com.yomahub.liteflow.parser.helper.ParserHelper;
/**
* 基于zk方式的json形式的解析器
* @author guodongqing
* @since 2.5.0
*/
public class ZookeeperJsonFlowELParser extends JsonFlowELParser {
private static final Logger LOG = LoggerFactory.getLogger(ZookeeperJsonFlowELParser.class);
private final String nodePath;
public class ZookeeperJsonFlowELParser extends BaseZookeeperJsonFlowParser {
public ZookeeperJsonFlowELParser(String node) {
nodePath = node;
super(node);
}
@Override
public void parseMain(List<String> pathList) throws Exception {
//zk不允许有多个path
String path = pathList.get(0);
CuratorFramework client = CuratorFrameworkFactory.newClient(
path,
new RetryNTimes(10, 5000)
);
client.start();
if (client.checkExists().forPath(nodePath) == null) {
client.create().creatingParentsIfNeeded().forPath(nodePath, "".getBytes());
}
String content = new String(client.getData().forPath(nodePath));
if (StrUtil.isBlank(content)) {
String error = MessageFormat.format("the node[{0}] value is empty", nodePath);
throw new ParseException(error);
}
parse(content);
final NodeCache cache = new NodeCache(client,nodePath);
cache.start();
cache.getListenable().addListener(() -> {
String content1 = new String(cache.getCurrentData().getData());
LOG.info("stating load flow config....");
parse(content1);
});
public void parseOneChain(JSONObject chainObject) {
ParserHelper.parseOneChainEl(chainObject);
}
}

View File

@@ -1,64 +1,22 @@
package com.yomahub.liteflow.parser.el;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.exception.ParseException;
import com.yomahub.liteflow.parser.XmlFlowParser;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.retry.RetryNTimes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.MessageFormat;
import java.util.List;
import com.yomahub.liteflow.parser.base.BaseZookeeperXmlFlowParser;
import com.yomahub.liteflow.parser.helper.ParserHelper;
import org.dom4j.Element;
/**
* 基于zk方式的xml形式EL表达式解析器
* @author Bryan.Zhang
* @since 2.8.0
*/
public class ZookeeperXmlFlowELParser extends XmlFlowELParser {
private static final Logger LOG = LoggerFactory.getLogger(ZookeeperXmlFlowELParser.class);
private final String nodePath;
public class ZookeeperXmlFlowELParser extends BaseZookeeperXmlFlowParser {
public ZookeeperXmlFlowELParser(String node) {
nodePath = node;
super(node);
}
@Override
public void parseMain(List<String> pathList) throws Exception {
//zk不允许有多个path
String path = pathList.get(0);
CuratorFramework client = CuratorFrameworkFactory.newClient(
path,
new RetryNTimes(10, 5000)
);
client.start();
if (client.checkExists().forPath(nodePath) == null) {
client.create().creatingParentsIfNeeded().forPath(nodePath, "".getBytes());
}
String content = new String(client.getData().forPath(nodePath));
if (StrUtil.isBlank(content)) {
String error = MessageFormat.format("the node[{0}] value is empty", nodePath);
throw new ParseException(error);
}
parse(content);
final NodeCache cache = new NodeCache(client,nodePath);
cache.start();
cache.getListenable().addListener(() -> {
String content1 = new String(cache.getCurrentData().getData());
LOG.info("stating load flow config....");
parse(content1);
});
public void parseOneChain(Element chain) {
ParserHelper.parseOneChainEl(chain);
}
}

View File

@@ -1,68 +1,22 @@
package com.yomahub.liteflow.parser.el;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import com.yomahub.liteflow.exception.ParseException;
import com.yomahub.liteflow.parser.YmlFlowParser;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.retry.RetryNTimes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.MessageFormat;
import java.util.List;
import com.yomahub.liteflow.parser.base.BaseZookeeperYmlFlowParser;
import com.yomahub.liteflow.parser.helper.ParserHelper;
/**
* 基于zk方式的yml形式EL表达式解析器
*
* @author Bryan.Zhang
* @since 2.8.0
*/
public class ZookeeperYmlFlowELParser extends YmlFlowELParser {
public class ZookeeperYmlFlowELParser extends BaseZookeeperYmlFlowParser {
public ZookeeperYmlFlowELParser(String node) {
super(node);
}
private static final Logger LOG = LoggerFactory.getLogger(ZookeeperYmlFlowELParser.class);
private final String nodePath;
public ZookeeperYmlFlowELParser(String node) {
nodePath = node;
}
@Override
public void parseMain(List<String> pathList) throws Exception {
//zk不允许有多个path
String path = pathList.get(0);
CuratorFramework client = CuratorFrameworkFactory.newClient(
path,
new RetryNTimes(10, 5000)
);
client.start();
if (client.checkExists().forPath(nodePath) == null) {
client.create().creatingParentsIfNeeded().forPath(nodePath, "".getBytes());
}
String content = new String(client.getData().forPath(nodePath));
if (StrUtil.isBlank(content)) {
String error = MessageFormat.format("the node[{0}] value is empty", nodePath);
throw new ParseException(error);
}
JSONObject ruleObject = convertToJson(content);
parse(ruleObject.toJSONString());
final NodeCache cache = new NodeCache(client,nodePath);
cache.start();
cache.getListenable().addListener(() -> {
String content1 = new String(cache.getCurrentData().getData());
LOG.info("stating load flow config....");
JSONObject ruleObject1 = convertToJson(content1);
parse(ruleObject1.toJSONString());
});
}
@Override
public void parseOneChain(JSONObject chain) {
ParserHelper.parseOneChainEl(chain);
}
}

View File

@@ -0,0 +1,390 @@
package com.yomahub.liteflow.parser.helper;
import cn.hutool.core.annotation.AnnotationUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.ReflectUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.yomahub.liteflow.annotation.LiteflowCmpDefine;
import com.yomahub.liteflow.annotation.LiteflowSwitchCmpDefine;
import com.yomahub.liteflow.builder.LiteFlowChainBuilder;
import com.yomahub.liteflow.builder.LiteFlowConditionBuilder;
import com.yomahub.liteflow.builder.LiteFlowNodeBuilder;
import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder;
import com.yomahub.liteflow.builder.prop.ChainPropBean;
import com.yomahub.liteflow.builder.prop.NodePropBean;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.core.NodeSwitchComponent;
import com.yomahub.liteflow.enums.ConditionTypeEnum;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import com.yomahub.liteflow.exception.*;
import com.yomahub.liteflow.flow.FlowBus;
import org.dom4j.Document;
import org.dom4j.Element;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import static com.yomahub.liteflow.common.ChainConstant.*;
/**
* Parser 通用 Helper
*
* @author tangkc
*/
public class ParserHelper {
/**
* 私有化构造器
*/
private ParserHelper() {
}
/**
* 构建 node
*
* @param nodePropBean 构建 node 的中间属性
*/
public static void buildNode(NodePropBean nodePropBean) {
String id = nodePropBean.getId();
String name = nodePropBean.getName();
String clazz = nodePropBean.getClazz();
String script = nodePropBean.getScript();
String type = nodePropBean.getType();
String file = nodePropBean.getFile();
//先尝试自动推断类型
if (StrUtil.isNotBlank(clazz)) {
try {
//先尝试从继承的类型中推断
Class<?> c = Class.forName(clazz);
Object o = ReflectUtil.newInstanceIfPossible(c);
if (o instanceof NodeSwitchComponent) {
type = NodeTypeEnum.SWITCH.getCode();
} else if (o instanceof NodeComponent) {
type = NodeTypeEnum.COMMON.getCode();
}
//再尝试声明式组件这部分的推断
if (type == null) {
LiteflowCmpDefine liteflowCmpDefine = AnnotationUtil.getAnnotation(c, LiteflowCmpDefine.class);
if (liteflowCmpDefine != null) {
type = NodeTypeEnum.COMMON.getCode();
}
}
if (type == null) {
LiteflowSwitchCmpDefine liteflowSwitchCmpDefine = AnnotationUtil.getAnnotation(c, LiteflowSwitchCmpDefine.class);
if (liteflowSwitchCmpDefine != null) {
type = NodeTypeEnum.SWITCH.getCode();
}
}
} catch (Exception e) {
throw new NodeClassNotFoundException(StrUtil.format("cannot find the node[{}]", clazz));
}
}
//因为脚本节点是必须设置type的所以到这里type就全都有了所以进行二次检查
if (StrUtil.isBlank(type)) {
throw new NodeTypeCanNotGuessException(StrUtil.format("cannot guess the type of node[{}]", clazz));
}
//检查nodeType是不是规定的类型
NodeTypeEnum nodeTypeEnum = NodeTypeEnum.getEnumByCode(type);
if (ObjectUtil.isNull(nodeTypeEnum)) {
throw new NodeTypeNotSupportException(StrUtil.format("type [{}] is not support", type));
}
//进行node的build过程
LiteFlowNodeBuilder.createNode()
.setId(id)
.setName(name)
.setClazz(clazz)
.setType(nodeTypeEnum)
.setScript(script)
.setFile(file)
.build();
}
/**
* 构建 chain
*
* @param chainPropBean 构建 chain 的中间属性
* @param chainBuilder chainBuilder
*/
public static void buildChain(ChainPropBean chainPropBean, LiteFlowChainBuilder chainBuilder) {
String condValueStr = chainPropBean.getCondValueStr();
String group = chainPropBean.getGroup();
String errorResume = chainPropBean.getErrorResume();
String any = chainPropBean.getAny();
String threadExecutorClass = chainPropBean.getThreadExecutorClass();
ConditionTypeEnum conditionType = chainPropBean.getConditionType();
if (ObjectUtil.isNull(conditionType)) {
throw new NotSupportConditionException("ConditionType is not supported");
}
if (StrUtil.isBlank(condValueStr)) {
throw new EmptyConditionValueException("Condition value cannot be empty");
}
//如果是when类型的话有特殊化参数要设置只针对于when的
if (conditionType.equals(ConditionTypeEnum.TYPE_WHEN)) {
chainBuilder.setCondition(
LiteFlowConditionBuilder.createWhenCondition()
.setErrorResume(errorResume)
.setGroup(group)
.setAny(any)
.setThreadExecutorClass(threadExecutorClass)
.setValue(condValueStr)
.build()
).build();
} else {
chainBuilder.setCondition(
LiteFlowConditionBuilder.createCondition(conditionType)
.setValue(condValueStr)
.build()
).build();
}
}
/**
* xml 形式的主要解析过程
*
* @param documentList documentList
* @param chainNameSet 用于去重
* @param parseOneChainConsumer parseOneChain 函数
*/
public static void parseDocument(List<Document> documentList, Set<String> chainNameSet, Consumer<Element> parseOneChainConsumer) {
//先在元数据里放上chain
//先放有一个好处可以在parse的时候先映射到FlowBus的chainMap然后再去解析
//这样就不用去像之前的版本那样回归调用
//同时也解决了不能循环依赖的问题
documentList.forEach(document -> {
// 解析chain节点
List<Element> chainList = document.getRootElement().elements(CHAIN);
//先在元数据里放上chain
chainList.forEach(e -> {
//校验加载的 chainName 是否有重复的
//TODO 这里是否有个问题当混合格式加载的时候2个同名的Chain在不同的文件里就不行了
String chainName = e.attributeValue(NAME);
if (!chainNameSet.add(chainName)) {
throw new ChainDuplicateException(String.format("[chain name duplicate] chainName=%s", chainName));
}
FlowBus.addChain(chainName);
});
});
// 清空
chainNameSet.clear();
for (Document document : documentList) {
Element rootElement = document.getRootElement();
Element nodesElement = rootElement.element(NODES);
// 当存在<nodes>节点定义时解析node节点
if (ObjectUtil.isNotNull(nodesElement)) {
List<Element> nodeList = nodesElement.elements(NODE);
String id, name, clazz, type, script, file;
for (Element e : nodeList) {
id = e.attributeValue(ID);
name = e.attributeValue(NAME);
clazz = e.attributeValue(_CLASS);
type = e.attributeValue(TYPE);
script = e.getTextTrim();
file = e.attributeValue(FILE);
// 构建 node
NodePropBean nodePropBean = new NodePropBean()
.setId(id)
.setName(name)
.setClazz(clazz)
.setScript(script)
.setType(type)
.setFile(file);
ParserHelper.buildNode(nodePropBean);
}
}
//解析每一个chain
List<Element> chainList = rootElement.elements(CHAIN);
chainList.forEach(parseOneChainConsumer::accept);
}
}
/**
* json 形式的主要解析过程
*
* @param flowJsonObjectList flowJsonObjectList
* @param chainNameSet 用于去重
* @param parseOneChainConsumer parseOneChain 函数
*/
public static void parseJsonObject(List<JSONObject> flowJsonObjectList, Set<String> chainNameSet, Consumer<JSONObject> parseOneChainConsumer) {
//先在元数据里放上chain
//先放有一个好处可以在parse的时候先映射到FlowBus的chainMap然后再去解析
//这样就不用去像之前的版本那样回归调用
//同时也解决了不能循环依赖的问题
flowJsonObjectList.forEach(jsonObject -> {
// 解析chain节点
JSONArray chainArray = jsonObject.getJSONObject(FLOW).getJSONArray(CHAIN);
//先在元数据里放上chain
chainArray.forEach(o -> {
JSONObject innerJsonObject = (JSONObject) o;
//校验加载的 chainName 是否有重复的
// TODO 这里是否有个问题当混合格式加载的时候2个同名的Chain在不同的文件里就不行了
String chainName = innerJsonObject.getString(NAME);
if (!chainNameSet.add(chainName)) {
throw new ChainDuplicateException(String.format("[chain name duplicate] chainName=%s", chainName));
}
FlowBus.addChain(innerJsonObject.getString(NAME));
});
});
// 清空
chainNameSet.clear();
for (JSONObject flowJsonObject : flowJsonObjectList) {
// 当存在<nodes>节点定义时解析node节点
if (flowJsonObject.getJSONObject(FLOW).containsKey(NODES)) {
JSONArray nodeArrayList = flowJsonObject.getJSONObject(FLOW).getJSONObject(NODES).getJSONArray(NODE);
String id, name, clazz, script, type, file;
for (int i = 0; i < nodeArrayList.size(); i++) {
JSONObject nodeObject = nodeArrayList.getJSONObject(i);
id = nodeObject.getString(ID);
name = nodeObject.getString(NAME);
clazz = nodeObject.getString(_CLASS);
type = nodeObject.getString(TYPE);
script = nodeObject.getString(VALUE);
file = nodeObject.getString(FILE);
// 构建 node
NodePropBean nodePropBean = new NodePropBean()
.setId(id)
.setName(name)
.setClazz(clazz)
.setScript(script)
.setType(type)
.setFile(file);
ParserHelper.buildNode(nodePropBean);
}
}
//解析每一个chain
JSONArray chainArray = flowJsonObject.getJSONObject(FLOW).getJSONArray(CHAIN);
chainArray.forEach(o -> {
JSONObject jsonObject = (JSONObject) o;
parseOneChainConsumer.accept(jsonObject);
});
}
}
/**
* 解析一个chain的过程
*
* @param chainObject chain 节点
*/
public static void parseOneChain(JSONObject chainObject) {
String condValueStr;
ConditionTypeEnum conditionType;
String group;
String errorResume;
String any;
String threadExecutorClass;
//构建chainBuilder
String chainName = chainObject.getString(NAME);
LiteFlowChainBuilder chainBuilder = LiteFlowChainBuilder.createChain().setChainName(chainName);
for (Object o : chainObject.getJSONArray(CONDITION)) {
JSONObject condObject = (JSONObject) o;
conditionType = ConditionTypeEnum.getEnumByCode(condObject.getString(TYPE));
condValueStr = condObject.getString(VALUE);
errorResume = condObject.getString(ERROR_RESUME);
group = condObject.getString(GROUP);
any = condObject.getString(ANY);
threadExecutorClass = condObject.getString(THREAD_EXECUTOR_CLASS);
ChainPropBean chainPropBean = new ChainPropBean()
.setCondValueStr(condValueStr)
.setGroup(group)
.setErrorResume(errorResume)
.setAny(any)
.setThreadExecutorClass(threadExecutorClass)
.setConditionType(conditionType);
// 构建 chain
ParserHelper.buildChain(chainPropBean, chainBuilder);
}
}
/**
* 解析一个chain的过程
* <p>
* param e chain 节点
*/
public static void parseOneChain(Element e) {
String condValueStr;
String group;
String errorResume;
String any;
String threadExecutorClass;
ConditionTypeEnum conditionType;
//构建chainBuilder
String chainName = e.attributeValue(NAME);
LiteFlowChainBuilder chainBuilder = LiteFlowChainBuilder.createChain().setChainName(chainName);
for (Iterator<Element> it = e.elementIterator(); it.hasNext(); ) {
Element condE = it.next();
conditionType = ConditionTypeEnum.getEnumByCode(condE.getName());
condValueStr = condE.attributeValue(VALUE);
errorResume = condE.attributeValue(ERROR_RESUME);
group = condE.attributeValue(GROUP);
any = condE.attributeValue(ANY);
threadExecutorClass = condE.attributeValue(THREAD_EXECUTOR_CLASS);
ChainPropBean chainPropBean = new ChainPropBean()
.setCondValueStr(condValueStr)
.setGroup(group)
.setErrorResume(errorResume)
.setAny(any)
.setThreadExecutorClass(threadExecutorClass)
.setConditionType(conditionType);
// 构建 chain
ParserHelper.buildChain(chainPropBean, chainBuilder);
}
}
/**
* 解析一个chain的过程
*
* @param chainObject chain 节点
*/
public static void parseOneChainEl(JSONObject chainObject) {
//构建chainBuilder
String chainName = chainObject.getString(NAME);
String el = chainObject.getString(VALUE);
LiteFlowChainELBuilder chainELBuilder = LiteFlowChainELBuilder.createChain().setChainName(chainName);
chainELBuilder.setEL(el).build();
}
/**
* 解析一个chain的过程
*
* @param e chain 节点
*/
public static void parseOneChainEl(Element e) {
//构建chainBuilder
String chainName = e.attributeValue(NAME);
String el = e.getTextTrim();
LiteFlowChainELBuilder chainELBuilder = LiteFlowChainELBuilder.createChain().setChainName(chainName);
chainELBuilder.setEL(el).build();
}
}

View File

@@ -0,0 +1,79 @@
package com.yomahub.liteflow.parser.helper;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.exception.ParseException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.retry.RetryNTimes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.MessageFormat;
import java.util.List;
import java.util.function.Consumer;
public class ZkParserHelper {
private static final Logger LOG = LoggerFactory.getLogger(ZkParserHelper.class);
private final String nodePath;
private final Consumer<String> parseConsumer;
public ZkParserHelper(String node, Consumer<String> parseConsumer) {
this.nodePath = node;
this.parseConsumer = parseConsumer;
}
/**
* 获取zk客户端
*
* @param pathList zk路径
* @return
* @throws Exception
*/
public CuratorFramework getZkCuratorFramework(List<String> pathList) throws Exception {
//zk不允许有多个path
String path = pathList.get(0);
CuratorFramework client = CuratorFrameworkFactory.newClient(
path,
new RetryNTimes(10, 5000)
);
client.start();
if (client.checkExists().forPath(nodePath) == null) {
client.create().creatingParentsIfNeeded().forPath(nodePath, "".getBytes());
}
return client;
}
/**
* 检查 content 是否合法
*
* @param content 内容
*/
public void checkContent(String content) {
if (StrUtil.isBlank(content)) {
String error = MessageFormat.format("the node[{0}] value is empty", nodePath);
throw new ParseException(error);
}
}
/**
* 监听 zk 节点
*
* @param client zk 客户端
* @throws Exception
*/
public void listenZkNode(CuratorFramework client) throws Exception {
final NodeCache cache = new NodeCache(client, nodePath);
cache.start();
cache.getListenable().addListener(() -> {
String content1 = new String(cache.getCurrentData().getData());
LOG.info("stating load flow config....");
parseConsumer.accept(content1);
});
}
}