mirror of
https://gitee.com/dromara/liteFlow.git
synced 2026-05-14 12:12:08 +08:00
enhancement #I61D1N 解析增加 enable 逻辑,完成 etcd 改造
This commit is contained in:
@@ -1,10 +1,9 @@
|
||||
package com.yomahub.liteflow.parser.etcd.util;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.collection.CollectionUtil;
|
||||
import cn.hutool.core.io.file.FileNameUtil;
|
||||
import cn.hutool.core.lang.Pair;
|
||||
import cn.hutool.core.util.CharsetUtil;
|
||||
import cn.hutool.core.util.ReUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.yomahub.liteflow.builder.LiteFlowNodeBuilder;
|
||||
import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder;
|
||||
@@ -13,7 +12,9 @@ import com.yomahub.liteflow.flow.FlowBus;
|
||||
import com.yomahub.liteflow.parser.etcd.EtcdClient;
|
||||
import com.yomahub.liteflow.parser.etcd.exception.EtcdException;
|
||||
import com.yomahub.liteflow.parser.etcd.vo.EtcdParserVO;
|
||||
import com.yomahub.liteflow.parser.helper.NodeConvertHelper;
|
||||
import com.yomahub.liteflow.spi.holder.ContextAwareHolder;
|
||||
import com.yomahub.liteflow.util.RuleParsePluginUtil;
|
||||
import io.etcd.jetcd.ByteSequence;
|
||||
import io.etcd.jetcd.Client;
|
||||
import io.etcd.jetcd.ClientBuilder;
|
||||
@@ -31,200 +32,162 @@ import java.util.stream.Collectors;
|
||||
*/
|
||||
public class EtcdParserHelper {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(EtcdParserHelper.class);
|
||||
private static final Logger LOG = LoggerFactory.getLogger(EtcdParserHelper.class);
|
||||
|
||||
private final String CHAIN_XML_PATTERN = "<chain name=\"{}\">{}</chain>";
|
||||
private final String CHAIN_XML_PATTERN = "<chain name=\"{}\">{}</chain>";
|
||||
|
||||
private final String NODE_XML_PATTERN = "<nodes>{}</nodes>";
|
||||
private final String NODE_XML_PATTERN = "<nodes>{}</nodes>";
|
||||
|
||||
private final String NODE_ITEM_XML_PATTERN = "<node id=\"{}\" name=\"{}\" type=\"{}\"><![CDATA[{}]]></node>";
|
||||
private final String NODE_ITEM_XML_PATTERN = "<node id=\"{}\" name=\"{}\" type=\"{}\"><![CDATA[{}]]></node>";
|
||||
|
||||
private final String XML_PATTERN = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><flow>{}{}</flow>";
|
||||
private final String XML_PATTERN = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><flow>{}{}</flow>";
|
||||
|
||||
private static final String SEPARATOR = "/";
|
||||
private static final String SEPARATOR = "/";
|
||||
|
||||
private final EtcdParserVO etcdParserVO;
|
||||
private final EtcdParserVO etcdParserVO;
|
||||
|
||||
private EtcdClient client;
|
||||
private EtcdClient client;
|
||||
|
||||
public EtcdParserHelper(EtcdParserVO etcdParserVO) {
|
||||
this.etcdParserVO = etcdParserVO;
|
||||
public EtcdParserHelper(EtcdParserVO etcdParserVO) {
|
||||
this.etcdParserVO = etcdParserVO;
|
||||
|
||||
try {
|
||||
try {
|
||||
this.client = ContextAwareHolder.loadContextAware().getBean(EtcdClient.class);
|
||||
}
|
||||
catch (Exception ignored) {
|
||||
}
|
||||
if (this.client == null) {
|
||||
ClientBuilder clientBuilder = Client.builder().endpoints(etcdParserVO.getEndpoints().split(","));
|
||||
if (StrUtil.isNotBlank(etcdParserVO.getNamespace())) {
|
||||
clientBuilder.namespace(ByteSequence.from(etcdParserVO.getNamespace(), CharsetUtil.CHARSET_UTF_8));
|
||||
}
|
||||
if (StrUtil.isAllNotBlank(etcdParserVO.getUser(), etcdParserVO.getPassword())) {
|
||||
clientBuilder.user(ByteSequence.from(etcdParserVO.getUser(), CharsetUtil.CHARSET_UTF_8));
|
||||
clientBuilder.password(ByteSequence.from(etcdParserVO.getPassword(), CharsetUtil.CHARSET_UTF_8));
|
||||
}
|
||||
this.client = new EtcdClient(clientBuilder.build());
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new EtcdException(e.getMessage());
|
||||
}
|
||||
}
|
||||
try {
|
||||
try {
|
||||
this.client = ContextAwareHolder.loadContextAware().getBean(EtcdClient.class);
|
||||
} catch (Exception ignored) {
|
||||
}
|
||||
if (this.client == null) {
|
||||
ClientBuilder clientBuilder = Client.builder().endpoints(etcdParserVO.getEndpoints().split(","));
|
||||
if (StrUtil.isNotBlank(etcdParserVO.getNamespace())) {
|
||||
clientBuilder.namespace(ByteSequence.from(etcdParserVO.getNamespace(), CharsetUtil.CHARSET_UTF_8));
|
||||
}
|
||||
if (StrUtil.isAllNotBlank(etcdParserVO.getUser(), etcdParserVO.getPassword())) {
|
||||
clientBuilder.user(ByteSequence.from(etcdParserVO.getUser(), CharsetUtil.CHARSET_UTF_8));
|
||||
clientBuilder.password(ByteSequence.from(etcdParserVO.getPassword(), CharsetUtil.CHARSET_UTF_8));
|
||||
}
|
||||
this.client = new EtcdClient(clientBuilder.build());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new EtcdException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public String getContent() {
|
||||
try {
|
||||
// 检查chainPath路径下有没有子节点
|
||||
List<String> chainNameList = client.getChildrenKeys(etcdParserVO.getChainPath(), SEPARATOR);
|
||||
public String getContent() {
|
||||
try {
|
||||
// 检查chainPath路径下有没有子节点
|
||||
List<String> chainNameList = client.getChildrenKeys(etcdParserVO.getChainPath(), SEPARATOR);
|
||||
|
||||
// 获取chainPath路径下的所有子节点内容List
|
||||
List<String> chainItemContentList = new ArrayList<>();
|
||||
for (String chainName : chainNameList) {
|
||||
String chainData = client.get(StrUtil.format("{}/{}", etcdParserVO.getChainPath(), chainName));
|
||||
if (StrUtil.isNotBlank(chainData)) {
|
||||
chainItemContentList.add(StrUtil.format(CHAIN_XML_PATTERN, chainName, chainData));
|
||||
}
|
||||
}
|
||||
// 合并成所有chain的xml内容
|
||||
String chainAllContent = CollUtil.join(chainItemContentList, StrUtil.EMPTY);
|
||||
// 获取chainPath路径下的所有子节点内容List
|
||||
List<String> chainItemContentList = new ArrayList<>();
|
||||
for (String chainName : chainNameList) {
|
||||
RuleParsePluginUtil.ChainDto chainDto = RuleParsePluginUtil.parseChainKey(chainName);
|
||||
String chainData = client.get(StrUtil.format("{}/{}", etcdParserVO.getChainPath(), chainName));
|
||||
if (StrUtil.isNotBlank(chainData) && chainDto.isEnable()) {
|
||||
chainItemContentList.add(chainDto.toElXml(chainData));
|
||||
}
|
||||
}
|
||||
// 合并成所有chain的xml内容
|
||||
String chainAllContent = CollUtil.join(chainItemContentList, StrUtil.EMPTY);
|
||||
|
||||
// 检查是否有脚本内容,如果有,进行脚本内容的获取
|
||||
String scriptAllContent = StrUtil.EMPTY;
|
||||
if (hasScript()) {
|
||||
List<String> scriptNodeValueList = client.getChildrenKeys(etcdParserVO.getScriptPath(), SEPARATOR)
|
||||
.stream()
|
||||
.filter(StrUtil::isNotBlank)
|
||||
.collect(Collectors.toList());
|
||||
// 检查是否有脚本内容,如果有,进行脚本内容的获取
|
||||
String scriptAllContent = StrUtil.EMPTY;
|
||||
if (hasScript()) {
|
||||
List<String> scriptNodeValueList = client.getChildrenKeys(etcdParserVO.getScriptPath(), SEPARATOR)
|
||||
.stream()
|
||||
.filter(StrUtil::isNotBlank)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
List<String> scriptItemContentList = new ArrayList<>();
|
||||
for (String scriptNodeValue : scriptNodeValueList) {
|
||||
NodeSimpleVO nodeSimpleVO = convert(scriptNodeValue);
|
||||
if (Objects.isNull(nodeSimpleVO)) {
|
||||
throw new EtcdException(
|
||||
StrUtil.format("The name of the etcd node is invalid:{}", scriptNodeValue));
|
||||
}
|
||||
String scriptData = client
|
||||
.get(StrUtil.format("{}/{}", etcdParserVO.getScriptPath(), scriptNodeValue));
|
||||
List<String> scriptItemContentList = new ArrayList<>();
|
||||
for (String scriptNodeValue : scriptNodeValueList) {
|
||||
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptNodeValue);
|
||||
if (Objects.isNull(nodeSimpleVO)) {
|
||||
throw new EtcdException(
|
||||
StrUtil.format("The name of the etcd node is invalid:{}", scriptNodeValue));
|
||||
}
|
||||
String scriptData = client
|
||||
.get(StrUtil.format("{}/{}", etcdParserVO.getScriptPath(), scriptNodeValue));
|
||||
|
||||
scriptItemContentList.add(StrUtil.format(NODE_ITEM_XML_PATTERN, nodeSimpleVO.getNodeId(),
|
||||
nodeSimpleVO.getName(), nodeSimpleVO.getType(), scriptData));
|
||||
}
|
||||
nodeSimpleVO.setScript(scriptData);
|
||||
scriptItemContentList.add(RuleParsePluginUtil.toScriptXml(nodeSimpleVO));
|
||||
}
|
||||
|
||||
scriptAllContent = StrUtil.format(NODE_XML_PATTERN,
|
||||
CollUtil.join(scriptItemContentList, StrUtil.EMPTY));
|
||||
}
|
||||
|
||||
return StrUtil.format(XML_PATTERN, scriptAllContent, chainAllContent);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new EtcdException(e.getMessage());
|
||||
}
|
||||
}
|
||||
scriptAllContent = StrUtil.format(NODE_XML_PATTERN,
|
||||
CollUtil.join(scriptItemContentList, StrUtil.EMPTY));
|
||||
}
|
||||
|
||||
public boolean hasScript() {
|
||||
// 没有配置scriptPath
|
||||
if (StrUtil.isBlank(etcdParserVO.getScriptPath())) {
|
||||
return false;
|
||||
}
|
||||
return StrUtil.format(XML_PATTERN, scriptAllContent, chainAllContent);
|
||||
} catch (Exception e) {
|
||||
throw new EtcdException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
// 存在这个节点,但是子节点不存在
|
||||
List<String> chainNameList = client.getChildrenKeys(etcdParserVO.getScriptPath(), SEPARATOR);
|
||||
return !CollUtil.isEmpty(chainNameList);
|
||||
}
|
||||
catch (Exception e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
public boolean hasScript() {
|
||||
// 没有配置scriptPath
|
||||
if (StrUtil.isBlank(etcdParserVO.getScriptPath())) {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* 监听 etcd 节点
|
||||
*/
|
||||
public void listen() {
|
||||
this.client.watchChildChange(this.etcdParserVO.getChainPath(), (updatePath, updateValue) -> {
|
||||
LOG.info("starting reload flow config... update path={} value={},", updatePath, updateValue);
|
||||
String chainName = FileNameUtil.getName(updatePath);
|
||||
LiteFlowChainELBuilder.createChain().setChainId(chainName).setEL(updateValue).build();
|
||||
}, (deletePath) -> {
|
||||
LOG.info("starting reload flow config... delete path={}", deletePath);
|
||||
String chainName = FileNameUtil.getName(deletePath);
|
||||
FlowBus.removeChain(chainName);
|
||||
});
|
||||
try {
|
||||
// 存在这个节点,但是子节点不存在
|
||||
List<String> chainNameList = client.getChildrenKeys(etcdParserVO.getScriptPath(), SEPARATOR);
|
||||
return !CollUtil.isEmpty(chainNameList);
|
||||
} catch (Exception e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
if (StrUtil.isNotBlank(this.etcdParserVO.getScriptPath())) {
|
||||
this.client.watchChildChange(this.etcdParserVO.getScriptPath(), (updatePath, updateValue) -> {
|
||||
LOG.info("starting reload flow config... update path={} value={}", updatePath, updateValue);
|
||||
String scriptNodeValue = FileNameUtil.getName(updatePath);
|
||||
NodeSimpleVO nodeSimpleVO = convert(scriptNodeValue);
|
||||
LiteFlowNodeBuilder.createScriptNode()
|
||||
.setId(nodeSimpleVO.getNodeId())
|
||||
.setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.type))
|
||||
.setName(nodeSimpleVO.getName())
|
||||
.setScript(updateValue)
|
||||
.build();
|
||||
}, (deletePath) -> {
|
||||
LOG.info("starting reload flow config... delete path={}", deletePath);
|
||||
String scriptNodeValue = FileNameUtil.getName(deletePath);
|
||||
NodeSimpleVO nodeSimpleVO = convert(scriptNodeValue);
|
||||
FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId());
|
||||
});
|
||||
}
|
||||
}
|
||||
/**
|
||||
* 监听 etcd 节点
|
||||
*/
|
||||
public void listen() {
|
||||
this.client.watchChildChange(this.etcdParserVO.getChainPath(), (updatePath, updateValue) -> {
|
||||
LOG.info("starting reload flow config... update path={} value={},", updatePath, updateValue);
|
||||
String changeKey = FileNameUtil.getName(updatePath);
|
||||
Pair<Boolean/*启停*/, String/*id*/> pair = RuleParsePluginUtil.parseIdKey(changeKey);
|
||||
Boolean enable = pair.getKey();
|
||||
String id = pair.getValue();
|
||||
// 如果是启用,就正常更新
|
||||
if (pair.getKey()) {
|
||||
LiteFlowChainELBuilder.createChain().setChainId(id).setEL(updateValue).build();
|
||||
}
|
||||
// 如果是禁用,就删除
|
||||
else {
|
||||
FlowBus.removeChain(id);
|
||||
}
|
||||
}, (deletePath) -> {
|
||||
LOG.info("starting reload flow config... delete path={}", deletePath);
|
||||
String chainName = FileNameUtil.getName(deletePath);
|
||||
Pair<Boolean/*启停*/, String/*id*/> pair = RuleParsePluginUtil.parseIdKey(chainName);
|
||||
FlowBus.removeChain(pair.getValue());
|
||||
});
|
||||
|
||||
public NodeSimpleVO convert(String str) {
|
||||
// 不需要去理解这串正则,就是一个匹配冒号的
|
||||
// 一定得是a:b,或是a:b:c...这种完整类型的字符串的
|
||||
List<String> matchItemList = ReUtil.findAllGroup0("(?<=[^:]:)[^:]+|[^:]+(?=:[^:])", str);
|
||||
if (CollUtil.isEmpty(matchItemList)) {
|
||||
return null;
|
||||
}
|
||||
if (StrUtil.isNotBlank(this.etcdParserVO.getScriptPath())) {
|
||||
this.client.watchChildChange(this.etcdParserVO.getScriptPath(), (updatePath, updateValue) -> {
|
||||
LOG.info("starting reload flow config... update path={} value={}", updatePath, updateValue);
|
||||
String scriptNodeValue = FileNameUtil.getName(updatePath);
|
||||
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptNodeValue);
|
||||
// 启用就正常更新
|
||||
if (nodeSimpleVO.getEnable()) {
|
||||
LiteFlowNodeBuilder.createScriptNode()
|
||||
.setId(nodeSimpleVO.getNodeId())
|
||||
.setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType()))
|
||||
.setName(nodeSimpleVO.getName())
|
||||
.setScript(nodeSimpleVO.getScript())
|
||||
.build();
|
||||
}
|
||||
// 禁用就删除
|
||||
else {
|
||||
FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId());
|
||||
}
|
||||
}, (deletePath) -> {
|
||||
LOG.info("starting reload flow config... delete path={}", deletePath);
|
||||
String scriptNodeValue = FileNameUtil.getName(deletePath);
|
||||
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptNodeValue);
|
||||
FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId());
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
NodeSimpleVO nodeSimpleVO = new NodeSimpleVO();
|
||||
if (matchItemList.size() > 1) {
|
||||
nodeSimpleVO.setNodeId(matchItemList.get(0));
|
||||
nodeSimpleVO.setType(matchItemList.get(1));
|
||||
}
|
||||
|
||||
if (matchItemList.size() > 2) {
|
||||
nodeSimpleVO.setName(matchItemList.get(2));
|
||||
}
|
||||
|
||||
return nodeSimpleVO;
|
||||
}
|
||||
|
||||
private static class NodeSimpleVO {
|
||||
|
||||
private String nodeId;
|
||||
|
||||
private String type;
|
||||
|
||||
private String name = "";
|
||||
|
||||
public String getNodeId() {
|
||||
return nodeId;
|
||||
}
|
||||
|
||||
public void setNodeId(String nodeId) {
|
||||
this.nodeId = nodeId;
|
||||
}
|
||||
|
||||
public String getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public void setType(String type) {
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
public void setName(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package com.yomahub.liteflow.test.etcd;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.yomahub.liteflow.core.FlowExecutor;
|
||||
import com.yomahub.liteflow.exception.ChainNotFoundException;
|
||||
import com.yomahub.liteflow.flow.FlowBus;
|
||||
import com.yomahub.liteflow.flow.LiteflowResponse;
|
||||
import com.yomahub.liteflow.parser.etcd.EtcdClient;
|
||||
@@ -55,20 +56,29 @@ public class EtcdWithXmlELSpringbootTest extends BaseTest {
|
||||
|
||||
@Test
|
||||
public void testEtcdNodeWithXml1() throws Exception {
|
||||
List<String> chainNameList = Lists.newArrayList("chain1");
|
||||
List<String> scriptNodeValueList = Lists.newArrayList("s1:script:脚本s1");
|
||||
List<String> chainNameList = Lists.newArrayList("chain1","chain2:false");
|
||||
List<String> scriptNodeValueList = Lists.newArrayList("s1:script:脚本s1", "s2:script:脚本s1:groovy:false");
|
||||
when(etcdClient.getChildrenKeys(anyString(), anyString())).thenReturn(chainNameList)
|
||||
.thenReturn(scriptNodeValueList);
|
||||
|
||||
String chain1Data = "THEN(a, b, c, s1);";
|
||||
String scriptNodeValue = "defaultContext.setData(\"test\",\"hello\");";
|
||||
when(etcdClient.get(anyString())).thenReturn(chain1Data).thenReturn(scriptNodeValue);
|
||||
when(etcdClient.get("chain1")).thenReturn("THEN(a, b, c, s1);");
|
||||
when(etcdClient.get("chain2:false")).thenReturn("THEN(a, b, c, s1);");
|
||||
when(etcdClient.get("s1:script:脚本s1")).thenReturn("defaultContext.setData(\"test\",\"hello\");");
|
||||
when(etcdClient.get("s2:script:脚本s1:groovy:false")).thenReturn("defaultContext.setData(\"test\",\"hello\");");
|
||||
|
||||
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
|
||||
DefaultContext context = response.getFirstContextBean();
|
||||
Assertions.assertTrue(response.isSuccess());
|
||||
Assertions.assertEquals("a==>b==>c==>s1[脚本s1]", response.getExecuteStepStr());
|
||||
Assertions.assertEquals("hello", context.getData("test"));
|
||||
|
||||
// 测试 chain 停用
|
||||
Assertions.assertThrows(ChainNotFoundException.class, () -> {
|
||||
throw flowExecutor.execute2Resp("chain2", "arg").getCause();
|
||||
});
|
||||
|
||||
// 测试 script 停用
|
||||
Assertions.assertTrue(!FlowBus.getNodeMap().containsKey("s2"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
Reference in New Issue
Block a user