enhancement 支持etcd分离chain以及脚本的存储结构

This commit is contained in:
zendwang
2022-11-09 19:56:43 +08:00
parent bd55d91f00
commit 7de33403bc
2 changed files with 43 additions and 16 deletions

View File

@@ -11,7 +11,6 @@ import io.etcd.jetcd.watch.WatchEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@@ -20,6 +19,8 @@ import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static java.nio.charset.StandardCharsets.UTF_8;
/**
* Etcd 客户端封装类.
* @author zendwang
@@ -53,7 +54,7 @@ public class EtcdClient {
public String get(final String key) {
List<KeyValue> keyValues = null;
try {
keyValues = client.getKVClient().get(ByteSequence.from(key, StandardCharsets.UTF_8)).get().getKvs();
keyValues = client.getKVClient().get(bytesOf(key)).get().getKvs();
} catch (InterruptedException | ExecutionException e) {
LOG.error(e.getMessage(), e);
}
@@ -62,7 +63,7 @@ public class EtcdClient {
return null;
}
return keyValues.iterator().next().getValue().toString(StandardCharsets.UTF_8);
return keyValues.iterator().next().getValue().toString(UTF_8);
}
/**
@@ -73,8 +74,8 @@ public class EtcdClient {
*/
public KeyValue put(final String key, final String value) {
KeyValue prevKv = null;
ByteSequence keyByteSequence = ByteSequence.from(key, StandardCharsets.UTF_8);
ByteSequence valueByteSequence = ByteSequence.from(value, StandardCharsets.UTF_8);
ByteSequence keyByteSequence = bytesOf(key);
ByteSequence valueByteSequence = bytesOf(value);
try {
prevKv = client.getKVClient().put(keyByteSequence, valueByteSequence).get().getPrevKv();
} catch (InterruptedException | ExecutionException e) {
@@ -83,6 +84,8 @@ public class EtcdClient {
return prevKv;
}
/**
* get node sub nodes.
*
@@ -93,7 +96,7 @@ public class EtcdClient {
* @throws InterruptedException the exception
*/
public List<String> getChildrenKeys(final String prefix, final String separator) throws ExecutionException, InterruptedException {
ByteSequence prefixByteSequence = ByteSequence.from(prefix, StandardCharsets.UTF_8);
ByteSequence prefixByteSequence = bytesOf(prefix);
GetOption getOption = GetOption.newBuilder()
.withPrefix(prefixByteSequence)
.withSortField(GetOption.SortTarget.KEY)
@@ -106,7 +109,7 @@ public class EtcdClient {
.getKvs();
return keyValues.stream()
.map(e -> getSubNodeKeyName(prefix, e.getKey().toString(StandardCharsets.UTF_8), separator))
.map(e -> getSubNodeKeyName(prefix, e.getKey().toString(UTF_8), separator))
.distinct()
.filter(e -> Objects.nonNull(e))
.collect(Collectors.toList());
@@ -130,7 +133,7 @@ public class EtcdClient {
final BiConsumer<String, String> updateHandler,
final Consumer<String> deleteHandler) {
Watch.Listener listener = watch(updateHandler, deleteHandler);
Watch.Watcher watch = client.getWatchClient().watch(ByteSequence.from(key, StandardCharsets.UTF_8), listener);
Watch.Watcher watch = client.getWatchClient().watch(bytesOf(key), listener);
watchCache.put(key, watch);
}
@@ -146,17 +149,28 @@ public class EtcdClient {
final Consumer<String> deleteHandler) {
Watch.Listener listener = watch(updateHandler, deleteHandler);
WatchOption option = WatchOption.newBuilder()
.withPrefix(ByteSequence.from(key, StandardCharsets.UTF_8))
.withPrefix(bytesOf(key))
.build();
Watch.Watcher watch = client.getWatchClient().watch(ByteSequence.from(key, StandardCharsets.UTF_8), option, listener);
Watch.Watcher watch = client.getWatchClient().watch(bytesOf(key), option, listener);
watchCache.put(key, watch);
}
/**
* bytesOf string.
* @param val val.
* @return bytes val.
*/
public ByteSequence bytesOf(final String val) {
return ByteSequence.from(val, UTF_8);
}
private Watch.Listener watch(final BiConsumer<String, String> updateHandler,
final Consumer<String> deleteHandler) {
return Watch.listener(response -> {
for (WatchEvent event : response.getEvents()) {
String path = event.getKeyValue().getKey().toString(StandardCharsets.UTF_8);
String value = event.getKeyValue().getValue().toString(StandardCharsets.UTF_8);
String path = event.getKeyValue().getKey().toString(UTF_8);
String value = event.getKeyValue().getValue().toString(UTF_8);
switch (event.getEventType()) {
case PUT:
updateHandler.accept(path, value);

View File

@@ -5,6 +5,10 @@ import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.CharsetUtil;
import cn.hutool.core.util.ReUtil;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.builder.LiteFlowNodeBuilder;
import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.parser.etcd.EtcdClient;
import com.yomahub.liteflow.parser.etcd.exception.EtcdException;
import com.yomahub.liteflow.parser.etcd.vo.EtcdParserVO;
@@ -142,17 +146,26 @@ public class EtcdParserHelper {
public void listen(Consumer<String> parseConsumer) {
this.client.watchChildChange(this.etcdParserVO.getChainPath(), (updatePath, updateValue) -> {
LOG.info("starting reload flow config... update path={} value={},", updatePath, updateValue);
parseConsumer.accept(getContent());
String chainName = updatePath.replace(this.etcdParserVO.getChainPath() + SEPARATOR, "");
LiteFlowChainELBuilder.createChain().setChainName(chainName).setEL(updateValue).build();
}, (deletePath) -> {
LOG.info("starting reload flow config... delete path={}", deletePath);
parseConsumer.accept(getContent());
String chainName = deletePath.replace(this.etcdParserVO.getChainPath() + SEPARATOR, "");
FlowBus.removeChain(chainName);
});
this.client.watchChildChange(this.etcdParserVO.getScriptPath(), (updatePath, updateValue) -> {
LOG.info("starting reload flow config... update path={} value={}", updatePath, updateValue);
parseConsumer.accept(getContent());
String scriptNodeValue = updatePath.replace(this.etcdParserVO.getScriptPath() + SEPARATOR, "");;
NodeSimpleVO nodeSimpleVO = convert(scriptNodeValue);
LiteFlowNodeBuilder.createScriptNode().setId(nodeSimpleVO.getNodeId())
.setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.type))
.setName(nodeSimpleVO.getName())
.setScript(updateValue).build();
}, (deletePath) -> {
LOG.info("starting reload flow config... delete path={}", deletePath);
parseConsumer.accept(getContent());
String scriptNodeValue = deletePath.replace(this.etcdParserVO.getScriptPath() + SEPARATOR, "");;
NodeSimpleVO nodeSimpleVO = convert(scriptNodeValue);
FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId());
});
}