From c71efffa6a15a30b51f70298d0bdb6a7f45a5958 Mon Sep 17 00:00:00 2001 From: zendwang Date: Sun, 6 Nov 2022 22:19:21 +0800 Subject: [PATCH 1/5] =?UTF-8?q?enhancement=20=20=E6=94=AF=E6=8C=81etcd?= =?UTF-8?q?=E5=88=86=E7=A6=BBchain=E4=BB=A5=E5=8F=8A=E8=84=9A=E6=9C=AC?= =?UTF-8?q?=E7=9A=84=E5=AD=98=E5=82=A8=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../liteflow/parser/etcd/EtcdClient.java | 57 ++++++ .../liteflow/parser/etcd/EtcdXmlELParser.java | 29 +-- .../parser/etcd/util/EtcdParserHelper.java | 180 ++++++++++++++++-- .../liteflow/parser/etcd/vo/EtcdParserVO.java | 60 +++++- .../pom.xml | 6 + .../etcd/EtcdWithXmlELSpringbootTest.java | 31 +-- .../etcd/application-xml-cluster.properties | 6 +- .../resources/etcd/application-xml.properties | 6 +- 8 files changed, 317 insertions(+), 58 deletions(-) diff --git a/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/EtcdClient.java b/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/EtcdClient.java index 3b862752f..84e2db5eb 100644 --- a/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/EtcdClient.java +++ b/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/EtcdClient.java @@ -5,16 +5,20 @@ import io.etcd.jetcd.ByteSequence; import io.etcd.jetcd.Client; import io.etcd.jetcd.KeyValue; import io.etcd.jetcd.Watch; +import io.etcd.jetcd.options.GetOption; +import io.etcd.jetcd.options.WatchOption; import io.etcd.jetcd.watch.WatchEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.stream.Collectors; /** * Etcd 客户端封装类. @@ -79,6 +83,42 @@ public class EtcdClient { return prevKv; } + /** + * get node sub nodes. + * + * @param prefix node prefix. + * @param separator separator char + * @return sub nodes + * @throws ExecutionException the exception + * @throws InterruptedException the exception + */ + public List getChildrenKeys(final String prefix, final String separator) throws ExecutionException, InterruptedException { + ByteSequence prefixByteSequence = ByteSequence.from(prefix, StandardCharsets.UTF_8); + GetOption getOption = GetOption.newBuilder() + .withPrefix(prefixByteSequence) + .withSortField(GetOption.SortTarget.KEY) + .withSortOrder(GetOption.SortOrder.ASCEND) + .build(); + + List keyValues = client.getKVClient() + .get(prefixByteSequence, getOption) + .get() + .getKvs(); + + return keyValues.stream() + .map(e -> getSubNodeKeyName(prefix, e.getKey().toString(StandardCharsets.UTF_8), separator)) + .distinct() + .filter(e -> Objects.nonNull(e)) + .collect(Collectors.toList()); + } + + private String getSubNodeKeyName(final String prefix, final String fullPath, final String separator) { + if (prefix.length() > fullPath.length()) { + return null; + } + String pathWithoutPrefix = fullPath.substring(prefix.length()); + return pathWithoutPrefix.contains(separator) ? pathWithoutPrefix.substring(1) : pathWithoutPrefix; + } /** * subscribe data change. * @@ -94,6 +134,23 @@ public class EtcdClient { watchCache.put(key, watch); } + /** + * subscribe sub node change. + * + * @param key param node name. + * @param updateHandler sub node handler of update + * @param deleteHandler sub node delete of delete + */ + public void watchChildChange(final String key, + final BiConsumer updateHandler, + final Consumer deleteHandler) { + Watch.Listener listener = watch(updateHandler, deleteHandler); + WatchOption option = WatchOption.newBuilder() + .withPrefix(ByteSequence.from(key, StandardCharsets.UTF_8)) + .build(); + Watch.Watcher watch = client.getWatchClient().watch(ByteSequence.from(key, StandardCharsets.UTF_8), option, listener); + watchCache.put(key, watch); + } private Watch.Listener watch(final BiConsumer updateHandler, final Consumer deleteHandler) { return Watch.listener(response -> { diff --git a/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/EtcdXmlELParser.java b/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/EtcdXmlELParser.java index e6ddf2f63..0d74805dc 100644 --- a/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/EtcdXmlELParser.java +++ b/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/EtcdXmlELParser.java @@ -39,11 +39,11 @@ public class EtcdXmlELParser extends ClassXmlFlowELParser { throw new EtcdException("rule-source-ext-data is empty"); } - if (StrUtil.isBlank(etcdParserVO.getNodePath())){ - etcdParserVO.setNodePath("/lite-flow/flow"); + if (StrUtil.isBlank(etcdParserVO.getChainPath())){ + throw new EtcdException("You must configure the chainPath property"); } - if (StrUtil.isBlank(etcdParserVO.getConnectStr())){ - throw new EtcdException("Etcd connect string is empty"); + if (StrUtil.isBlank(etcdParserVO.getEndpoints())){ + throw new EtcdException("etcd endpoints is empty"); } etcdParserHelper = new EtcdParserHelper(etcdParserVO); @@ -54,17 +54,20 @@ public class EtcdXmlELParser extends ClassXmlFlowELParser { @Override public String parseCustom() { - Consumer parseConsumer = t -> { - try { - parse(t); - } catch (Exception e) { - throw new RuntimeException(e); - } - }; + try { String content = etcdParserHelper.getContent(); - etcdParserHelper.checkContent(content); - etcdParserHelper.listen(parseConsumer); + + Consumer listenerConsumer = t -> { + try { + parse(t); + } catch (Exception e) { + throw new RuntimeException(e); + } + }; + + etcdParserHelper.listen(listenerConsumer); + return content; } catch (Exception e){ throw new EtcdException(e.getMessage()); diff --git a/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/util/EtcdParserHelper.java b/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/util/EtcdParserHelper.java index 560ade1f1..89e4a9798 100644 --- a/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/util/EtcdParserHelper.java +++ b/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/util/EtcdParserHelper.java @@ -1,17 +1,23 @@ package com.yomahub.liteflow.parser.etcd.util; +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.util.CharsetUtil; +import cn.hutool.core.util.ReUtil; import cn.hutool.core.util.StrUtil; -import com.yomahub.liteflow.exception.ParseException; -import com.yomahub.liteflow.parser.el.XmlFlowELParser; import com.yomahub.liteflow.parser.etcd.EtcdClient; import com.yomahub.liteflow.parser.etcd.exception.EtcdException; import com.yomahub.liteflow.parser.etcd.vo.EtcdParserVO; import com.yomahub.liteflow.spi.holder.ContextAwareHolder; +import io.etcd.jetcd.ByteSequence; import io.etcd.jetcd.Client; +import io.etcd.jetcd.ClientBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; import java.util.function.Consumer; /** @@ -22,22 +28,38 @@ public class EtcdParserHelper { private static final Logger LOG = LoggerFactory.getLogger(EtcdParserHelper.class); + private final String CHAIN_XML_PATTERN = "{}"; + + private final String NODE_XML_PATTERN = "{}"; + + private final String NODE_ITEM_XML_PATTERN = ""; + + private final String XML_PATTERN = "{}{}"; + + private static final String SEPARATOR = "/"; + private final EtcdParserVO etcdParserVO; - private EtcdClient etcdClient; + private EtcdClient client; public EtcdParserHelper(EtcdParserVO etcdParserVO) { this.etcdParserVO = etcdParserVO; try{ try{ - this.etcdClient = ContextAwareHolder.loadContextAware().getBean(EtcdClient.class); + this.client = ContextAwareHolder.loadContextAware().getBean(EtcdClient.class); }catch (Exception ignored){} - if (this.etcdClient == null) { - Client client = Client.builder() - .endpoints(etcdParserVO.getConnectStr().split(",")) - .build(); - this.etcdClient = new EtcdClient(client); + if (this.client == null) { + ClientBuilder clientBuilder = Client.builder() + .endpoints(etcdParserVO.getEndpoints().split(",")); + if (StrUtil.isNotBlank(etcdParserVO.getNamespace())) { + clientBuilder.namespace(ByteSequence.from(etcdParserVO.getNamespace(), CharsetUtil.CHARSET_UTF_8)); + } + if (StrUtil.isAllNotBlank(etcdParserVO.getUser(), etcdParserVO.getPassword())) { + clientBuilder.user(ByteSequence.from(etcdParserVO.getUser(), CharsetUtil.CHARSET_UTF_8)); + clientBuilder.password(ByteSequence.from(etcdParserVO.getPassword(), CharsetUtil.CHARSET_UTF_8)); + } + this.client = new EtcdClient(clientBuilder.build()); } }catch (Exception e){ throw new EtcdException(e.getMessage()); @@ -46,29 +68,149 @@ public class EtcdParserHelper { public String getContent(){ try{ - return this.etcdClient.get(etcdParserVO.getNodePath()); + //检查zk上有没有chainPath节点 +// if (client.get(etcdParserVO.getChainPath()) == null) { +// throw new EtcdException(StrUtil.format("etcd node[{}] is not exist", etcdParserVO.getChainPath())); +// } + + //检查chainPath路径下有没有子节点 + List chainNameList = client.getChildrenKeys(etcdParserVO.getChainPath(), SEPARATOR); + if (CollectionUtil.isEmpty(chainNameList)){ + throw new EtcdException(StrUtil.format("There are no chains in path [{}]", etcdParserVO.getChainPath())); + } + + //获取chainPath路径下的所有子节点内容List + List chainItemContentList = new ArrayList<>(); + for (String chainName : chainNameList){ + String chainData = client.get(StrUtil.format("{}/{}", etcdParserVO.getChainPath(), chainName)); + if (StrUtil.isNotBlank(chainData)) { + chainItemContentList.add(StrUtil.format(CHAIN_XML_PATTERN, chainName, chainData)); + } + } + //合并成所有chain的xml内容 + String chainAllContent = CollUtil.join(chainItemContentList, StrUtil.EMPTY); + + //检查是否有脚本内容,如果有,进行脚本内容的获取 + String scriptAllContent = StrUtil.EMPTY; + if (hasScript()){ + List scriptNodeValueList = client.getChildrenKeys(etcdParserVO.getScriptPath(), SEPARATOR); + + List scriptItemContentList = new ArrayList<>(); + for (String scriptNodeValue: scriptNodeValueList){ + NodeSimpleVO nodeSimpleVO = convert(scriptNodeValue); + if (Objects.isNull(nodeSimpleVO)){ + throw new EtcdException(StrUtil.format("The name of the etcd node is invalid:{}", scriptNodeValue)); + } + String scriptData = client.get(StrUtil.format("{}/{}", etcdParserVO.getScriptPath(), scriptNodeValue)); + + scriptItemContentList.add( + StrUtil.format(NODE_ITEM_XML_PATTERN, + nodeSimpleVO.getNodeId(), + nodeSimpleVO.getName(), + nodeSimpleVO.getType(), + scriptData) + ); + } + + scriptAllContent = StrUtil.format(NODE_XML_PATTERN, CollUtil.join(scriptItemContentList, StrUtil.EMPTY)); + } + + return StrUtil.format(XML_PATTERN, scriptAllContent, chainAllContent); }catch (Exception e){ throw new EtcdException(e.getMessage()); } } - /** - * 检查 content 是否合法 - */ - public void checkContent(String content) { - if (StrUtil.isBlank(content)) { - String error = MessageFormat.format("the node[{0}] value is empty", etcdParserVO.getNodePath()); - throw new ParseException(error); + public boolean hasScript(){ + //没有配置scriptPath + if (StrUtil.isBlank(etcdParserVO.getScriptPath())){ + return false; + } + + try{ + //配置了,但是不存在这个节点 +// if (client.get(etcdParserVO.getScriptPath()) == null){ +// return false; +// } + + //存在这个节点,但是子节点不存在 + List chainNameList = client.getChildrenKeys(etcdParserVO.getScriptPath(), SEPARATOR); + if (CollUtil.isEmpty(chainNameList)){ + return false; + } + + return true; + }catch (Exception e){ + return false; } } + /** * 监听 etcd 节点 */ public void listen(Consumer parseConsumer) { - this.etcdClient.watchDataChange(this.etcdParserVO.getNodePath(), (updatePath, updateValue) -> { + this.client.watchChildChange(this.etcdParserVO.getChainPath(), (updatePath, updateValue) -> { LOG.info("starting load flow config...."); parseConsumer.accept(updateValue); }, null); + this.client.watchChildChange(this.etcdParserVO.getScriptPath(), (updatePath, updateValue) -> { + LOG.info("starting load flow config...."); + parseConsumer.accept(updateValue); + }, null); + } + + public NodeSimpleVO convert(String str){ + //不需要去理解这串正则,就是一个匹配冒号的 + //一定得是a:b,或是a:b:c...这种完整类型的字符串的 + List matchItemList = ReUtil.findAllGroup0("(?<=[^:]:)[^:]+|[^:]+(?=:[^:])", str); + if (CollUtil.isEmpty(matchItemList)){ + return null; + } + + NodeSimpleVO nodeSimpleVO = new NodeSimpleVO(); + if (matchItemList.size() > 1){ + nodeSimpleVO.setNodeId(matchItemList.get(0)); + nodeSimpleVO.setType(matchItemList.get(1)); + } + + if (matchItemList.size() > 2){ + nodeSimpleVO.setName(matchItemList.get(2)); + } + + return nodeSimpleVO; + } + + private static class NodeSimpleVO{ + + private String nodeId; + + private String type; + + private String name=""; + + public String getNodeId() { + return nodeId; + } + + public void setNodeId(String nodeId) { + this.nodeId = nodeId; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } } } diff --git a/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/vo/EtcdParserVO.java b/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/vo/EtcdParserVO.java index c7e5b291a..4bb32a9d6 100644 --- a/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/vo/EtcdParserVO.java +++ b/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/vo/EtcdParserVO.java @@ -7,23 +7,63 @@ package com.yomahub.liteflow.parser.etcd.vo; */ public class EtcdParserVO { - private String connectStr; + private String endpoints; - private String nodePath; + private String user; - public String getConnectStr() { - return connectStr; + private String password; + + private String namespace; + + private String chainPath; + + private String scriptPath; + + public String getEndpoints() { + return endpoints; } - public void setConnectStr(String connectStr) { - this.connectStr = connectStr; + public void setEndpoints(String endpoints) { + this.endpoints = endpoints; } - public String getNodePath() { - return nodePath; + public String getUser() { + return user; } - public void setNodePath(String nodePath) { - this.nodePath = nodePath; + public void setUser(String user) { + this.user = user; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public String getNamespace() { + return namespace; + } + + public void setNamespace(String namespace) { + this.namespace = namespace; + } + + public String getChainPath() { + return chainPath; + } + + public void setChainPath(String chainPath) { + this.chainPath = chainPath; + } + + public String getScriptPath() { + return scriptPath; + } + + public void setScriptPath(String scriptPath) { + this.scriptPath = scriptPath; } } diff --git a/liteflow-testcase-el/liteflow-testcase-el-etcd-springboot/pom.xml b/liteflow-testcase-el/liteflow-testcase-el-etcd-springboot/pom.xml index 292cf4003..917a7ab1a 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-etcd-springboot/pom.xml +++ b/liteflow-testcase-el/liteflow-testcase-el-etcd-springboot/pom.xml @@ -26,6 +26,12 @@ test + + com.yomahub + liteflow-script-groovy + ${revision} + + org.springframework.boot spring-boot-starter-test diff --git a/liteflow-testcase-el/liteflow-testcase-el-etcd-springboot/src/test/java/com/yomahub/liteflow/test/etcd/EtcdWithXmlELSpringbootTest.java b/liteflow-testcase-el/liteflow-testcase-el-etcd-springboot/src/test/java/com/yomahub/liteflow/test/etcd/EtcdWithXmlELSpringbootTest.java index 5a1cfb098..05adea606 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-etcd-springboot/src/test/java/com/yomahub/liteflow/test/etcd/EtcdWithXmlELSpringbootTest.java +++ b/liteflow-testcase-el/liteflow-testcase-el-etcd-springboot/src/test/java/com/yomahub/liteflow/test/etcd/EtcdWithXmlELSpringbootTest.java @@ -8,6 +8,7 @@ import com.yomahub.liteflow.flow.LiteflowResponse; import com.yomahub.liteflow.parser.etcd.EtcdClient; import com.yomahub.liteflow.parser.etcd.EtcdXmlELParser; import com.yomahub.liteflow.parser.etcd.util.EtcdParserHelper; +import com.yomahub.liteflow.slot.DefaultContext; import com.yomahub.liteflow.spi.holder.ContextAwareHolder; import com.yomahub.liteflow.test.BaseTest; import org.junit.*; @@ -34,14 +35,14 @@ import static org.mockito.Mockito.*; * springboot环境下的etcd 规则解析器 测试 */ @RunWith(SpringRunner.class) -@TestPropertySource(value = "classpath:/etcd/application-xml-cluster.properties") +@TestPropertySource(value = "classpath:/etcd/application-xml.properties") @SpringBootTest(classes = EtcdWithXmlELSpringbootTest.class) @EnableAutoConfiguration @ComponentScan({"com.yomahub.liteflow.test.etcd.cmp"}) public class EtcdWithXmlELSpringbootTest extends BaseTest { - @MockBean - private EtcdClient etcdClient; + //@MockBean + //private EtcdClient etcdClient; @Resource private FlowExecutor flowExecutor; @@ -58,29 +59,31 @@ public class EtcdWithXmlELSpringbootTest extends BaseTest { @Test public void testEtcdNodeWithXml1() throws Exception { - String flowXml = "THEN(a, b, c);"; - when(etcdClient.get(anyString())).thenReturn(flowXml); + //String flowXml = "THEN(a, b, c);"; + //when(etcdClient.get(anyString())).thenReturn(flowXml); LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg"); + DefaultContext context = response.getFirstContextBean(); Assert.assertTrue(response.isSuccess()); - Assert.assertEquals("a==>b==>c", response.getExecuteStepStr()); + Assert.assertEquals("a==>b==>c==>s1[脚本s1]", response.getExecuteStepStr()); + Assert.assertEquals("hello", context.getData("test")); } @Test public void testEtcdNodeWithXml2() throws Exception { - String flowXml = "THEN(a, b, c);"; - String changedFlowXml = "THEN(a, c);"; - when(etcdClient.get(anyString())).thenReturn(flowXml).thenReturn(changedFlowXml); - +// String flowXml = "THEN(a, b, c);"; +// String changedFlowXml = "THEN(a, c);"; +// when(etcdClient.get(anyString())).thenReturn(flowXml).thenReturn(changedFlowXml); +// LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg"); Assert.assertTrue(response.isSuccess()); - Assert.assertEquals("a==>b==>c", response.getExecuteStepStr()); + Assert.assertEquals("a==>b==>c==>s1[脚本s1]", response.getExecuteStepStr()); // 手动触发一次 模拟节点数据变更 - FlowBus.refreshFlowMetaData(FlowParserTypeEnum.TYPE_EL_XML,changedFlowXml); - + //FlowBus.refreshFlowMetaData(FlowParserTypeEnum.TYPE_EL_XML,changedFlowXml); + Thread.sleep(9000); LiteflowResponse response2 = flowExecutor.execute2Resp("chain1", "arg"); Assert.assertTrue(response2.isSuccess()); - Assert.assertEquals("a==>c", response2.getExecuteStepStr()); + Assert.assertEquals("a==>b==>s1[脚本s1]", response2.getExecuteStepStr()); } } diff --git a/liteflow-testcase-el/liteflow-testcase-el-etcd-springboot/src/test/resources/etcd/application-xml-cluster.properties b/liteflow-testcase-el/liteflow-testcase-el-etcd-springboot/src/test/resources/etcd/application-xml-cluster.properties index bf67b167c..a71c2a7f9 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-etcd-springboot/src/test/resources/etcd/application-xml-cluster.properties +++ b/liteflow-testcase-el/liteflow-testcase-el-etcd-springboot/src/test/resources/etcd/application-xml-cluster.properties @@ -1,2 +1,6 @@ -liteflow.rule-source-ext-data={"connectStr":"http://localhost:2379,http://localhost:3379,http://localhost:4379"} +liteflow.rule-source-ext-data={\ + "endpoints":"http://127.0.0.1:2379,http://127.0.0.1:3379,http://127.0.0.1:4379",\ + "chainPath": "/liteflow/chain",\ + "scriptPath": "/liteflow/script"\ + } liteflow.parse-on-start=false \ No newline at end of file diff --git a/liteflow-testcase-el/liteflow-testcase-el-etcd-springboot/src/test/resources/etcd/application-xml.properties b/liteflow-testcase-el/liteflow-testcase-el-etcd-springboot/src/test/resources/etcd/application-xml.properties index 3938e12e9..9f880ba1b 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-etcd-springboot/src/test/resources/etcd/application-xml.properties +++ b/liteflow-testcase-el/liteflow-testcase-el-etcd-springboot/src/test/resources/etcd/application-xml.properties @@ -1 +1,5 @@ -liteflow.rule-source-ext-data={"connectStr":"http://localhost:2379"} \ No newline at end of file +liteflow.rule-source-ext-data={\ + "endpoints":"http://127.0.0.1:2379",\ + "chainPath": "/liteflow/chain",\ + "scriptPath": "/liteflow/script"\ + } \ No newline at end of file From b05aba0e1d099922e9a7017034b7ff5a6ce8fdd7 Mon Sep 17 00:00:00 2001 From: zendwang Date: Mon, 7 Nov 2022 22:51:47 +0800 Subject: [PATCH 2/5] =?UTF-8?q?enhancement=20=20=E6=94=AF=E6=8C=81etcd?= =?UTF-8?q?=E5=88=86=E7=A6=BBchain=E4=BB=A5=E5=8F=8A=E8=84=9A=E6=9C=AC?= =?UTF-8?q?=E7=9A=84=E5=AD=98=E5=82=A8=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../parser/etcd/util/EtcdParserHelper.java | 28 ++++++++----------- .../etcd/EtcdWithXmlELSpringbootTest.java | 13 ++++++--- 2 files changed, 21 insertions(+), 20 deletions(-) diff --git a/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/util/EtcdParserHelper.java b/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/util/EtcdParserHelper.java index 89e4a9798..45b378fc1 100644 --- a/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/util/EtcdParserHelper.java +++ b/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/util/EtcdParserHelper.java @@ -68,11 +68,6 @@ public class EtcdParserHelper { public String getContent(){ try{ - //检查zk上有没有chainPath节点 -// if (client.get(etcdParserVO.getChainPath()) == null) { -// throw new EtcdException(StrUtil.format("etcd node[{}] is not exist", etcdParserVO.getChainPath())); -// } - //检查chainPath路径下有没有子节点 List chainNameList = client.getChildrenKeys(etcdParserVO.getChainPath(), SEPARATOR); if (CollectionUtil.isEmpty(chainNameList)){ @@ -128,11 +123,6 @@ public class EtcdParserHelper { } try{ - //配置了,但是不存在这个节点 -// if (client.get(etcdParserVO.getScriptPath()) == null){ -// return false; -// } - //存在这个节点,但是子节点不存在 List chainNameList = client.getChildrenKeys(etcdParserVO.getScriptPath(), SEPARATOR); if (CollUtil.isEmpty(chainNameList)){ @@ -151,13 +141,19 @@ public class EtcdParserHelper { */ public void listen(Consumer parseConsumer) { this.client.watchChildChange(this.etcdParserVO.getChainPath(), (updatePath, updateValue) -> { - LOG.info("starting load flow config...."); - parseConsumer.accept(updateValue); - }, null); + LOG.info("update path={} value={},starting reload flow config...", updatePath, updateValue); + parseConsumer.accept(getContent()); + }, (deletePath) -> { + LOG.info("delete path={},starting reload flow config...", deletePath); + parseConsumer.accept(getContent()); + }); this.client.watchChildChange(this.etcdParserVO.getScriptPath(), (updatePath, updateValue) -> { - LOG.info("starting load flow config...."); - parseConsumer.accept(updateValue); - }, null); + LOG.info("update path={} value={},starting reload flow config...", updatePath, updateValue); + parseConsumer.accept(getContent()); + }, (deletePath) -> { + LOG.info("delete path={},starting reload flow config....", deletePath); + parseConsumer.accept(getContent()); + }); } public NodeSimpleVO convert(String str){ diff --git a/liteflow-testcase-el/liteflow-testcase-el-etcd-springboot/src/test/java/com/yomahub/liteflow/test/etcd/EtcdWithXmlELSpringbootTest.java b/liteflow-testcase-el/liteflow-testcase-el-etcd-springboot/src/test/java/com/yomahub/liteflow/test/etcd/EtcdWithXmlELSpringbootTest.java index 05adea606..082e36f10 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-etcd-springboot/src/test/java/com/yomahub/liteflow/test/etcd/EtcdWithXmlELSpringbootTest.java +++ b/liteflow-testcase-el/liteflow-testcase-el-etcd-springboot/src/test/java/com/yomahub/liteflow/test/etcd/EtcdWithXmlELSpringbootTest.java @@ -1,5 +1,6 @@ package com.yomahub.liteflow.test.etcd; +import cn.hutool.core.lang.Console; import cn.hutool.core.util.ReflectUtil; import com.yomahub.liteflow.core.FlowExecutor; import com.yomahub.liteflow.enums.FlowParserTypeEnum; @@ -65,7 +66,7 @@ public class EtcdWithXmlELSpringbootTest extends BaseTest { LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg"); DefaultContext context = response.getFirstContextBean(); Assert.assertTrue(response.isSuccess()); - Assert.assertEquals("a==>b==>c==>s1[脚本s1]", response.getExecuteStepStr()); + Assert.assertEquals("a==>b==>s1[脚本s1]", response.getExecuteStepStr()); Assert.assertEquals("hello", context.getData("test")); } @@ -77,13 +78,17 @@ public class EtcdWithXmlELSpringbootTest extends BaseTest { // LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg"); Assert.assertTrue(response.isSuccess()); - Assert.assertEquals("a==>b==>c==>s1[脚本s1]", response.getExecuteStepStr()); + Assert.assertEquals("a==>b==>s1[脚本s1]", response.getExecuteStepStr()); + int i=0; + while (i <= 100000) { + i++; + } // 手动触发一次 模拟节点数据变更 //FlowBus.refreshFlowMetaData(FlowParserTypeEnum.TYPE_EL_XML,changedFlowXml); - Thread.sleep(9000); + LiteflowResponse response2 = flowExecutor.execute2Resp("chain1", "arg"); Assert.assertTrue(response2.isSuccess()); - Assert.assertEquals("a==>b==>s1[脚本s1]", response2.getExecuteStepStr()); + Assert.assertEquals("a==>b==>c==>s1[脚本s1]", response.getExecuteStepStr()); } } From 9467f8ba6b0eaf858eaf3b374c82322edabb6240 Mon Sep 17 00:00:00 2001 From: zendwang Date: Tue, 8 Nov 2022 20:10:28 +0800 Subject: [PATCH 3/5] =?UTF-8?q?enhancement=20=E6=94=AF=E6=8C=81etcd?= =?UTF-8?q?=E5=88=86=E7=A6=BBchain=E4=BB=A5=E5=8F=8A=E8=84=9A=E6=9C=AC?= =?UTF-8?q?=E7=9A=84=E5=AD=98=E5=82=A8=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../parser/etcd/util/EtcdParserHelper.java | 4 +- .../etcd/EtcdWithXmlELSpringbootTest.java | 75 +++++++++++-------- .../resources/etcd/application-xml.properties | 3 +- 3 files changed, 47 insertions(+), 35 deletions(-) diff --git a/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/util/EtcdParserHelper.java b/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/util/EtcdParserHelper.java index 45b378fc1..4cb4aaebc 100644 --- a/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/util/EtcdParserHelper.java +++ b/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/util/EtcdParserHelper.java @@ -142,7 +142,9 @@ public class EtcdParserHelper { public void listen(Consumer parseConsumer) { this.client.watchChildChange(this.etcdParserVO.getChainPath(), (updatePath, updateValue) -> { LOG.info("update path={} value={},starting reload flow config...", updatePath, updateValue); - parseConsumer.accept(getContent()); + String content = getContent(); + LOG.info("update path={} value={},content={}", content); +// parseConsumer.accept(content); }, (deletePath) -> { LOG.info("delete path={},starting reload flow config...", deletePath); parseConsumer.accept(getContent()); diff --git a/liteflow-testcase-el/liteflow-testcase-el-etcd-springboot/src/test/java/com/yomahub/liteflow/test/etcd/EtcdWithXmlELSpringbootTest.java b/liteflow-testcase-el/liteflow-testcase-el-etcd-springboot/src/test/java/com/yomahub/liteflow/test/etcd/EtcdWithXmlELSpringbootTest.java index 082e36f10..283edd5fe 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-etcd-springboot/src/test/java/com/yomahub/liteflow/test/etcd/EtcdWithXmlELSpringbootTest.java +++ b/liteflow-testcase-el/liteflow-testcase-el-etcd-springboot/src/test/java/com/yomahub/liteflow/test/etcd/EtcdWithXmlELSpringbootTest.java @@ -1,53 +1,49 @@ package com.yomahub.liteflow.test.etcd; -import cn.hutool.core.lang.Console; -import cn.hutool.core.util.ReflectUtil; +import com.google.common.collect.Lists; import com.yomahub.liteflow.core.FlowExecutor; -import com.yomahub.liteflow.enums.FlowParserTypeEnum; import com.yomahub.liteflow.flow.FlowBus; import com.yomahub.liteflow.flow.LiteflowResponse; import com.yomahub.liteflow.parser.etcd.EtcdClient; -import com.yomahub.liteflow.parser.etcd.EtcdXmlELParser; -import com.yomahub.liteflow.parser.etcd.util.EtcdParserHelper; import com.yomahub.liteflow.slot.DefaultContext; -import com.yomahub.liteflow.spi.holder.ContextAwareHolder; import com.yomahub.liteflow.test.BaseTest; import org.junit.*; import org.junit.runner.RunWith; -import org.mockito.Answers; -import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.mock.mockito.MockBean; -import org.springframework.boot.test.mock.mockito.MockitoTestExecutionListener; import org.springframework.context.annotation.ComponentScan; -import org.springframework.test.context.TestExecutionListeners; import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit4.SpringRunner; import javax.annotation.Resource; -import java.util.function.Consumer; - -import static org.mockito.ArgumentMatchers.any; +import java.util.List; import static org.mockito.Mockito.*; /** * springboot环境下的etcd 规则解析器 测试 */ @RunWith(SpringRunner.class) -@TestPropertySource(value = "classpath:/etcd/application-xml.properties") +@TestPropertySource(value = "classpath:/etcd/application-xml-cluster.properties") @SpringBootTest(classes = EtcdWithXmlELSpringbootTest.class) @EnableAutoConfiguration @ComponentScan({"com.yomahub.liteflow.test.etcd.cmp"}) public class EtcdWithXmlELSpringbootTest extends BaseTest { - //@MockBean - //private EtcdClient etcdClient; + @MockBean + private EtcdClient etcdClient; @Resource private FlowExecutor flowExecutor; + private static final String SEPARATOR = "/"; + + private static final String CHAIN_PATH = "/liteflow/chain"; + + private static final String SCRIPT_PATH = "/liteflow/script"; + + @Before public void setUp(){ MockitoAnnotations.initMocks(this); @@ -60,35 +56,48 @@ public class EtcdWithXmlELSpringbootTest extends BaseTest { @Test public void testEtcdNodeWithXml1() throws Exception { - //String flowXml = "THEN(a, b, c);"; - //when(etcdClient.get(anyString())).thenReturn(flowXml); + List chainNameList = Lists.newArrayList("chain1"); + List scriptNodeValueList = Lists.newArrayList("s1:script:脚本s1"); + when(etcdClient.getChildrenKeys(anyString(), anyString())).thenReturn(chainNameList).thenReturn(scriptNodeValueList); + + String chain1Data = "THEN(a, b, c, s1);"; + String scriptNodeValue = "defaultContext.setData(\"test\",\"hello\");"; + when(etcdClient.get(anyString())).thenReturn(chain1Data).thenReturn(scriptNodeValue); LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg"); DefaultContext context = response.getFirstContextBean(); Assert.assertTrue(response.isSuccess()); - Assert.assertEquals("a==>b==>s1[脚本s1]", response.getExecuteStepStr()); + Assert.assertEquals("a==>b==>c==>s1[脚本s1]", response.getExecuteStepStr()); Assert.assertEquals("hello", context.getData("test")); } @Test public void testEtcdNodeWithXml2() throws Exception { -// String flowXml = "THEN(a, b, c);"; -// String changedFlowXml = "THEN(a, c);"; -// when(etcdClient.get(anyString())).thenReturn(flowXml).thenReturn(changedFlowXml); -// - LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg"); - Assert.assertTrue(response.isSuccess()); - Assert.assertEquals("a==>b==>s1[脚本s1]", response.getExecuteStepStr()); + List chainNameList = Lists.newArrayList("chain1"); + List scriptNodeValueList = Lists.newArrayList("s1:script:脚本s1"); + when(etcdClient.getChildrenKeys(CHAIN_PATH, SEPARATOR)).thenReturn(chainNameList); + when(etcdClient.getChildrenKeys(SCRIPT_PATH, SEPARATOR)).thenReturn(scriptNodeValueList); - int i=0; - while (i <= 100000) { - i++; - } - // 手动触发一次 模拟节点数据变更 - //FlowBus.refreshFlowMetaData(FlowParserTypeEnum.TYPE_EL_XML,changedFlowXml); + String chain1Data = "THEN(a, b, c, s1);"; + String chain1ChangedData = "THEN(a, b, s1);"; + String scriptNodeValue = "defaultContext.setData(\"test\",\"hello\");"; + String scriptNodeChangedValue = "defaultContext.setData(\"test\",\"hello world\");"; + when(etcdClient.get(CHAIN_PATH + SEPARATOR + "chain1")).thenReturn(chain1Data).thenReturn(chain1ChangedData); + when(etcdClient.get(SCRIPT_PATH + SEPARATOR + "s1:script:脚本s1")).thenReturn(scriptNodeValue).thenReturn(scriptNodeChangedValue); + + LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg"); + DefaultContext context = response.getFirstContextBean(); + Assert.assertTrue(response.isSuccess()); + Assert.assertEquals("a==>b==>c==>s1[脚本s1]", response.getExecuteStepStr()); + Assert.assertEquals("hello", context.getData("test")); + + flowExecutor.reloadRule(); LiteflowResponse response2 = flowExecutor.execute2Resp("chain1", "arg"); + DefaultContext context2 = response2.getFirstContextBean(); Assert.assertTrue(response2.isSuccess()); - Assert.assertEquals("a==>b==>c==>s1[脚本s1]", response.getExecuteStepStr()); + Assert.assertEquals("a==>b==>s1[脚本s1]", response2.getExecuteStepStr()); + Assert.assertEquals("hello world", context2.getData("test")); + } } diff --git a/liteflow-testcase-el/liteflow-testcase-el-etcd-springboot/src/test/resources/etcd/application-xml.properties b/liteflow-testcase-el/liteflow-testcase-el-etcd-springboot/src/test/resources/etcd/application-xml.properties index 9f880ba1b..60f528aa4 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-etcd-springboot/src/test/resources/etcd/application-xml.properties +++ b/liteflow-testcase-el/liteflow-testcase-el-etcd-springboot/src/test/resources/etcd/application-xml.properties @@ -2,4 +2,5 @@ liteflow.rule-source-ext-data={\ "endpoints":"http://127.0.0.1:2379",\ "chainPath": "/liteflow/chain",\ "scriptPath": "/liteflow/script"\ - } \ No newline at end of file + } +liteflow.parse-on-start=false \ No newline at end of file From bd55d91f00f925c2d5d9bb5b9c7aa4b02c2927cf Mon Sep 17 00:00:00 2001 From: zendwang Date: Tue, 8 Nov 2022 20:17:43 +0800 Subject: [PATCH 4/5] =?UTF-8?q?enhancement=20=E6=94=AF=E6=8C=81etcd?= =?UTF-8?q?=E5=88=86=E7=A6=BBchain=E4=BB=A5=E5=8F=8A=E8=84=9A=E6=9C=AC?= =?UTF-8?q?=E7=9A=84=E5=AD=98=E5=82=A8=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../liteflow/parser/etcd/util/EtcdParserHelper.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/util/EtcdParserHelper.java b/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/util/EtcdParserHelper.java index 4cb4aaebc..1f8590c91 100644 --- a/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/util/EtcdParserHelper.java +++ b/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/util/EtcdParserHelper.java @@ -141,19 +141,17 @@ public class EtcdParserHelper { */ public void listen(Consumer parseConsumer) { this.client.watchChildChange(this.etcdParserVO.getChainPath(), (updatePath, updateValue) -> { - LOG.info("update path={} value={},starting reload flow config...", updatePath, updateValue); - String content = getContent(); - LOG.info("update path={} value={},content={}", content); -// parseConsumer.accept(content); + LOG.info("starting reload flow config... update path={} value={},", updatePath, updateValue); + parseConsumer.accept(getContent()); }, (deletePath) -> { - LOG.info("delete path={},starting reload flow config...", deletePath); + LOG.info("starting reload flow config... delete path={}", deletePath); parseConsumer.accept(getContent()); }); this.client.watchChildChange(this.etcdParserVO.getScriptPath(), (updatePath, updateValue) -> { - LOG.info("update path={} value={},starting reload flow config...", updatePath, updateValue); + LOG.info("starting reload flow config... update path={} value={}", updatePath, updateValue); parseConsumer.accept(getContent()); }, (deletePath) -> { - LOG.info("delete path={},starting reload flow config....", deletePath); + LOG.info("starting reload flow config... delete path={}", deletePath); parseConsumer.accept(getContent()); }); } From 7de33403bcb5415ea644cc68eab0f41d2b357fac Mon Sep 17 00:00:00 2001 From: zendwang Date: Wed, 9 Nov 2022 19:56:43 +0800 Subject: [PATCH 5/5] =?UTF-8?q?enhancement=20=E6=94=AF=E6=8C=81etcd?= =?UTF-8?q?=E5=88=86=E7=A6=BBchain=E4=BB=A5=E5=8F=8A=E8=84=9A=E6=9C=AC?= =?UTF-8?q?=E7=9A=84=E5=AD=98=E5=82=A8=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../liteflow/parser/etcd/EtcdClient.java | 38 +++++++++++++------ .../parser/etcd/util/EtcdParserHelper.java | 21 ++++++++-- 2 files changed, 43 insertions(+), 16 deletions(-) diff --git a/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/EtcdClient.java b/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/EtcdClient.java index 84e2db5eb..54c151e1f 100644 --- a/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/EtcdClient.java +++ b/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/EtcdClient.java @@ -11,7 +11,6 @@ import io.etcd.jetcd.watch.WatchEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; @@ -20,6 +19,8 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.Collectors; +import static java.nio.charset.StandardCharsets.UTF_8; + /** * Etcd 客户端封装类. * @author zendwang @@ -53,7 +54,7 @@ public class EtcdClient { public String get(final String key) { List keyValues = null; try { - keyValues = client.getKVClient().get(ByteSequence.from(key, StandardCharsets.UTF_8)).get().getKvs(); + keyValues = client.getKVClient().get(bytesOf(key)).get().getKvs(); } catch (InterruptedException | ExecutionException e) { LOG.error(e.getMessage(), e); } @@ -62,7 +63,7 @@ public class EtcdClient { return null; } - return keyValues.iterator().next().getValue().toString(StandardCharsets.UTF_8); + return keyValues.iterator().next().getValue().toString(UTF_8); } /** @@ -73,8 +74,8 @@ public class EtcdClient { */ public KeyValue put(final String key, final String value) { KeyValue prevKv = null; - ByteSequence keyByteSequence = ByteSequence.from(key, StandardCharsets.UTF_8); - ByteSequence valueByteSequence = ByteSequence.from(value, StandardCharsets.UTF_8); + ByteSequence keyByteSequence = bytesOf(key); + ByteSequence valueByteSequence = bytesOf(value); try { prevKv = client.getKVClient().put(keyByteSequence, valueByteSequence).get().getPrevKv(); } catch (InterruptedException | ExecutionException e) { @@ -83,6 +84,8 @@ public class EtcdClient { return prevKv; } + + /** * get node sub nodes. * @@ -93,7 +96,7 @@ public class EtcdClient { * @throws InterruptedException the exception */ public List getChildrenKeys(final String prefix, final String separator) throws ExecutionException, InterruptedException { - ByteSequence prefixByteSequence = ByteSequence.from(prefix, StandardCharsets.UTF_8); + ByteSequence prefixByteSequence = bytesOf(prefix); GetOption getOption = GetOption.newBuilder() .withPrefix(prefixByteSequence) .withSortField(GetOption.SortTarget.KEY) @@ -106,7 +109,7 @@ public class EtcdClient { .getKvs(); return keyValues.stream() - .map(e -> getSubNodeKeyName(prefix, e.getKey().toString(StandardCharsets.UTF_8), separator)) + .map(e -> getSubNodeKeyName(prefix, e.getKey().toString(UTF_8), separator)) .distinct() .filter(e -> Objects.nonNull(e)) .collect(Collectors.toList()); @@ -130,7 +133,7 @@ public class EtcdClient { final BiConsumer updateHandler, final Consumer deleteHandler) { Watch.Listener listener = watch(updateHandler, deleteHandler); - Watch.Watcher watch = client.getWatchClient().watch(ByteSequence.from(key, StandardCharsets.UTF_8), listener); + Watch.Watcher watch = client.getWatchClient().watch(bytesOf(key), listener); watchCache.put(key, watch); } @@ -146,17 +149,28 @@ public class EtcdClient { final Consumer deleteHandler) { Watch.Listener listener = watch(updateHandler, deleteHandler); WatchOption option = WatchOption.newBuilder() - .withPrefix(ByteSequence.from(key, StandardCharsets.UTF_8)) + .withPrefix(bytesOf(key)) .build(); - Watch.Watcher watch = client.getWatchClient().watch(ByteSequence.from(key, StandardCharsets.UTF_8), option, listener); + Watch.Watcher watch = client.getWatchClient().watch(bytesOf(key), option, listener); watchCache.put(key, watch); } + + + /** + * bytesOf string. + * @param val val. + * @return bytes val. + */ + public ByteSequence bytesOf(final String val) { + return ByteSequence.from(val, UTF_8); + } + private Watch.Listener watch(final BiConsumer updateHandler, final Consumer deleteHandler) { return Watch.listener(response -> { for (WatchEvent event : response.getEvents()) { - String path = event.getKeyValue().getKey().toString(StandardCharsets.UTF_8); - String value = event.getKeyValue().getValue().toString(StandardCharsets.UTF_8); + String path = event.getKeyValue().getKey().toString(UTF_8); + String value = event.getKeyValue().getValue().toString(UTF_8); switch (event.getEventType()) { case PUT: updateHandler.accept(path, value); diff --git a/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/util/EtcdParserHelper.java b/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/util/EtcdParserHelper.java index 1f8590c91..523e880b2 100644 --- a/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/util/EtcdParserHelper.java +++ b/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/util/EtcdParserHelper.java @@ -5,6 +5,10 @@ import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.util.CharsetUtil; import cn.hutool.core.util.ReUtil; import cn.hutool.core.util.StrUtil; +import com.yomahub.liteflow.builder.LiteFlowNodeBuilder; +import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder; +import com.yomahub.liteflow.enums.NodeTypeEnum; +import com.yomahub.liteflow.flow.FlowBus; import com.yomahub.liteflow.parser.etcd.EtcdClient; import com.yomahub.liteflow.parser.etcd.exception.EtcdException; import com.yomahub.liteflow.parser.etcd.vo.EtcdParserVO; @@ -142,17 +146,26 @@ public class EtcdParserHelper { public void listen(Consumer parseConsumer) { this.client.watchChildChange(this.etcdParserVO.getChainPath(), (updatePath, updateValue) -> { LOG.info("starting reload flow config... update path={} value={},", updatePath, updateValue); - parseConsumer.accept(getContent()); + String chainName = updatePath.replace(this.etcdParserVO.getChainPath() + SEPARATOR, ""); + LiteFlowChainELBuilder.createChain().setChainName(chainName).setEL(updateValue).build(); }, (deletePath) -> { LOG.info("starting reload flow config... delete path={}", deletePath); - parseConsumer.accept(getContent()); + String chainName = deletePath.replace(this.etcdParserVO.getChainPath() + SEPARATOR, ""); + FlowBus.removeChain(chainName); }); this.client.watchChildChange(this.etcdParserVO.getScriptPath(), (updatePath, updateValue) -> { LOG.info("starting reload flow config... update path={} value={}", updatePath, updateValue); - parseConsumer.accept(getContent()); + String scriptNodeValue = updatePath.replace(this.etcdParserVO.getScriptPath() + SEPARATOR, "");; + NodeSimpleVO nodeSimpleVO = convert(scriptNodeValue); + LiteFlowNodeBuilder.createScriptNode().setId(nodeSimpleVO.getNodeId()) + .setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.type)) + .setName(nodeSimpleVO.getName()) + .setScript(updateValue).build(); }, (deletePath) -> { LOG.info("starting reload flow config... delete path={}", deletePath); - parseConsumer.accept(getContent()); + String scriptNodeValue = deletePath.replace(this.etcdParserVO.getScriptPath() + SEPARATOR, "");; + NodeSimpleVO nodeSimpleVO = convert(scriptNodeValue); + FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId()); }); }