diff --git a/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/EtcdClient.java b/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/EtcdClient.java index 84e2db5eb..54c151e1f 100644 --- a/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/EtcdClient.java +++ b/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/EtcdClient.java @@ -11,7 +11,6 @@ import io.etcd.jetcd.watch.WatchEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; @@ -20,6 +19,8 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.Collectors; +import static java.nio.charset.StandardCharsets.UTF_8; + /** * Etcd 客户端封装类. * @author zendwang @@ -53,7 +54,7 @@ public class EtcdClient { public String get(final String key) { List keyValues = null; try { - keyValues = client.getKVClient().get(ByteSequence.from(key, StandardCharsets.UTF_8)).get().getKvs(); + keyValues = client.getKVClient().get(bytesOf(key)).get().getKvs(); } catch (InterruptedException | ExecutionException e) { LOG.error(e.getMessage(), e); } @@ -62,7 +63,7 @@ public class EtcdClient { return null; } - return keyValues.iterator().next().getValue().toString(StandardCharsets.UTF_8); + return keyValues.iterator().next().getValue().toString(UTF_8); } /** @@ -73,8 +74,8 @@ public class EtcdClient { */ public KeyValue put(final String key, final String value) { KeyValue prevKv = null; - ByteSequence keyByteSequence = ByteSequence.from(key, StandardCharsets.UTF_8); - ByteSequence valueByteSequence = ByteSequence.from(value, StandardCharsets.UTF_8); + ByteSequence keyByteSequence = bytesOf(key); + ByteSequence valueByteSequence = bytesOf(value); try { prevKv = client.getKVClient().put(keyByteSequence, valueByteSequence).get().getPrevKv(); } catch (InterruptedException | ExecutionException e) { @@ -83,6 +84,8 @@ public class EtcdClient { return prevKv; } + + /** * get node sub nodes. * @@ -93,7 +96,7 @@ public class EtcdClient { * @throws InterruptedException the exception */ public List getChildrenKeys(final String prefix, final String separator) throws ExecutionException, InterruptedException { - ByteSequence prefixByteSequence = ByteSequence.from(prefix, StandardCharsets.UTF_8); + ByteSequence prefixByteSequence = bytesOf(prefix); GetOption getOption = GetOption.newBuilder() .withPrefix(prefixByteSequence) .withSortField(GetOption.SortTarget.KEY) @@ -106,7 +109,7 @@ public class EtcdClient { .getKvs(); return keyValues.stream() - .map(e -> getSubNodeKeyName(prefix, e.getKey().toString(StandardCharsets.UTF_8), separator)) + .map(e -> getSubNodeKeyName(prefix, e.getKey().toString(UTF_8), separator)) .distinct() .filter(e -> Objects.nonNull(e)) .collect(Collectors.toList()); @@ -130,7 +133,7 @@ public class EtcdClient { final BiConsumer updateHandler, final Consumer deleteHandler) { Watch.Listener listener = watch(updateHandler, deleteHandler); - Watch.Watcher watch = client.getWatchClient().watch(ByteSequence.from(key, StandardCharsets.UTF_8), listener); + Watch.Watcher watch = client.getWatchClient().watch(bytesOf(key), listener); watchCache.put(key, watch); } @@ -146,17 +149,28 @@ public class EtcdClient { final Consumer deleteHandler) { Watch.Listener listener = watch(updateHandler, deleteHandler); WatchOption option = WatchOption.newBuilder() - .withPrefix(ByteSequence.from(key, StandardCharsets.UTF_8)) + .withPrefix(bytesOf(key)) .build(); - Watch.Watcher watch = client.getWatchClient().watch(ByteSequence.from(key, StandardCharsets.UTF_8), option, listener); + Watch.Watcher watch = client.getWatchClient().watch(bytesOf(key), option, listener); watchCache.put(key, watch); } + + + /** + * bytesOf string. + * @param val val. + * @return bytes val. + */ + public ByteSequence bytesOf(final String val) { + return ByteSequence.from(val, UTF_8); + } + private Watch.Listener watch(final BiConsumer updateHandler, final Consumer deleteHandler) { return Watch.listener(response -> { for (WatchEvent event : response.getEvents()) { - String path = event.getKeyValue().getKey().toString(StandardCharsets.UTF_8); - String value = event.getKeyValue().getValue().toString(StandardCharsets.UTF_8); + String path = event.getKeyValue().getKey().toString(UTF_8); + String value = event.getKeyValue().getValue().toString(UTF_8); switch (event.getEventType()) { case PUT: updateHandler.accept(path, value); diff --git a/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/util/EtcdParserHelper.java b/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/util/EtcdParserHelper.java index 1f8590c91..523e880b2 100644 --- a/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/util/EtcdParserHelper.java +++ b/liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/util/EtcdParserHelper.java @@ -5,6 +5,10 @@ import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.util.CharsetUtil; import cn.hutool.core.util.ReUtil; import cn.hutool.core.util.StrUtil; +import com.yomahub.liteflow.builder.LiteFlowNodeBuilder; +import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder; +import com.yomahub.liteflow.enums.NodeTypeEnum; +import com.yomahub.liteflow.flow.FlowBus; import com.yomahub.liteflow.parser.etcd.EtcdClient; import com.yomahub.liteflow.parser.etcd.exception.EtcdException; import com.yomahub.liteflow.parser.etcd.vo.EtcdParserVO; @@ -142,17 +146,26 @@ public class EtcdParserHelper { public void listen(Consumer parseConsumer) { this.client.watchChildChange(this.etcdParserVO.getChainPath(), (updatePath, updateValue) -> { LOG.info("starting reload flow config... update path={} value={},", updatePath, updateValue); - parseConsumer.accept(getContent()); + String chainName = updatePath.replace(this.etcdParserVO.getChainPath() + SEPARATOR, ""); + LiteFlowChainELBuilder.createChain().setChainName(chainName).setEL(updateValue).build(); }, (deletePath) -> { LOG.info("starting reload flow config... delete path={}", deletePath); - parseConsumer.accept(getContent()); + String chainName = deletePath.replace(this.etcdParserVO.getChainPath() + SEPARATOR, ""); + FlowBus.removeChain(chainName); }); this.client.watchChildChange(this.etcdParserVO.getScriptPath(), (updatePath, updateValue) -> { LOG.info("starting reload flow config... update path={} value={}", updatePath, updateValue); - parseConsumer.accept(getContent()); + String scriptNodeValue = updatePath.replace(this.etcdParserVO.getScriptPath() + SEPARATOR, "");; + NodeSimpleVO nodeSimpleVO = convert(scriptNodeValue); + LiteFlowNodeBuilder.createScriptNode().setId(nodeSimpleVO.getNodeId()) + .setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.type)) + .setName(nodeSimpleVO.getName()) + .setScript(updateValue).build(); }, (deletePath) -> { LOG.info("starting reload flow config... delete path={}", deletePath); - parseConsumer.accept(getContent()); + String scriptNodeValue = deletePath.replace(this.etcdParserVO.getScriptPath() + SEPARATOR, "");; + NodeSimpleVO nodeSimpleVO = convert(scriptNodeValue); + FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId()); }); }