enhancement #I61D1N 解析增加 enable 逻辑,完成 redis 改造

This commit is contained in:
gaibu
2024-01-20 22:37:51 +08:00
parent 20c4aca0c3
commit d39de336d4
7 changed files with 142 additions and 84 deletions

View File

@@ -34,12 +34,8 @@ public class EtcdParserHelper {
private static final Logger LOG = LoggerFactory.getLogger(EtcdParserHelper.class);
private final String CHAIN_XML_PATTERN = "<chain name=\"{}\">{}</chain>";
private final String NODE_XML_PATTERN = "<nodes>{}</nodes>";
private final String NODE_ITEM_XML_PATTERN = "<node id=\"{}\" name=\"{}\" type=\"{}\"><![CDATA[{}]]></node>";
private final String XML_PATTERN = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><flow>{}{}</flow>";
private static final String SEPARATOR = "/";
@@ -82,7 +78,7 @@ public class EtcdParserHelper {
for (String chainName : chainNameList) {
RuleParsePluginUtil.ChainDto chainDto = RuleParsePluginUtil.parseChainKey(chainName);
String chainData = client.get(StrUtil.format("{}/{}", etcdParserVO.getChainPath(), chainName));
if (StrUtil.isNotBlank(chainData) && chainDto.isEnable()) {
if (StrUtil.isNotBlank(chainData)) {
chainItemContentList.add(chainDto.toElXml(chainData));
}
}

View File

@@ -1,5 +1,6 @@
package com.yomahub.liteflow.parser.redis.mode.polling;
import cn.hutool.core.lang.Pair;
import cn.hutool.core.util.StrUtil;
import cn.hutool.crypto.digest.DigestUtil;
import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder;
@@ -8,6 +9,7 @@ import com.yomahub.liteflow.log.LFLog;
import com.yomahub.liteflow.log.LFLoggerManager;
import com.yomahub.liteflow.parser.redis.mode.RClient;
import com.yomahub.liteflow.parser.redis.vo.RedisParserVO;
import com.yomahub.liteflow.util.RuleParsePluginUtil;
import java.util.*;
@@ -61,11 +63,19 @@ public class ChainPollingTask implements Runnable {
for (Map.Entry<String, String> entry : chainSHAMap.entrySet()) {
String chainId = entry.getKey();
String oldSHA = entry.getValue();
Pair<Boolean/*启停*/, String/*id*/> pair = RuleParsePluginUtil.parseIdKey(chainId);
// 如果是停用,就直接进删除
if (pair.getKey()){
FlowBus.removeChain(pair.getValue());
needDelete.add(chainId);
continue;
}
//在redis服务端通过Lua脚本计算SHA值
String newSHA = chainClient.evalSha(valueLua, chainKey, chainId);
if (StrUtil.equals(newSHA, "nil")) {
//新SHA值为nil, 即未获取到该chain,表示该chain已被删除
FlowBus.removeChain(chainId);
FlowBus.removeChain(pair.getValue());
LOG.info("starting reload flow config... delete key={}", chainId);
//添加到待删除的list 后续统一从SHAMap中移除
@@ -75,7 +85,7 @@ public class ChainPollingTask implements Runnable {
else if (!StrUtil.equals(newSHA, oldSHA)) {
//SHA值发生变化,表示该chain的值已被修改,重新拉取变化的chain
String chainData = chainClient.hget(chainKey, chainId);
LiteFlowChainELBuilder.createChain().setChainId(chainId).setEL(chainData).build();
LiteFlowChainELBuilder.createChain().setChainId(pair.getValue()).setEL(chainData).build();
LOG.info("starting reload flow config... update key={} new value={},", chainId, chainData);
//修改SHAMap
@@ -98,12 +108,17 @@ public class ChainPollingTask implements Runnable {
//在此处重新拉取所有chainId集合,补充添加新chain
Set<String> newChainSet = chainClient.hkeys(chainKey);
for (String chainId : newChainSet) {
Pair<Boolean/*启停*/, String/*id*/> pair = RuleParsePluginUtil.parseIdKey(chainId);
if (!chainSHAMap.containsKey(chainId)) {
//将新chainId添加到LiteFlowChainELBuilder和SHAMap
String chainData = chainClient.hget(chainKey, chainId);
LiteFlowChainELBuilder.createChain().setChainId(chainId).setEL(chainData).build();
LOG.info("starting reload flow config... create key={} new value={},", chainId, chainData);
chainSHAMap.put(chainId, DigestUtil.sha1Hex(chainData));
// 如果是启用,才装配
if (pair.getKey()){
LiteFlowChainELBuilder.createChain().setChainId(pair.getValue()).setEL(chainData).build();
LOG.info("starting reload flow config... create key={} new value={},", chainId, chainData);
chainSHAMap.put(chainId, DigestUtil.sha1Hex(chainData));
}
}
}
}

View File

@@ -1,7 +1,6 @@
package com.yomahub.liteflow.parser.redis.mode.polling;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.thread.NamedThreadFactory;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
@@ -13,11 +12,15 @@ import com.yomahub.liteflow.parser.redis.mode.RedisMode;
import com.yomahub.liteflow.parser.redis.mode.RedisParserHelper;
import com.yomahub.liteflow.parser.redis.vo.RedisParserVO;
import com.yomahub.liteflow.spi.holder.ContextAwareHolder;
import com.yomahub.liteflow.util.RuleParsePluginUtil;
import org.redisson.Redisson;
import org.redisson.config.Config;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Redis 轮询机制实现类
@@ -126,8 +129,9 @@ public class RedisParserPollingMode implements RedisParserHelper {
List<String> chainItemContentList = new ArrayList<>();
for (String chainName : chainNameSet) {
String chainData = chainClient.hget(chainKey, chainName);
RuleParsePluginUtil.ChainDto chainDto = RuleParsePluginUtil.parseChainKey(chainName);
if (StrUtil.isNotBlank(chainData)) {
chainItemContentList.add(StrUtil.format(CHAIN_XML_PATTERN, chainName, chainData));
chainItemContentList.add(chainDto.toElXml(chainData));
}else{
continue;
}
@@ -154,19 +158,10 @@ public class RedisParserPollingMode implements RedisParserHelper {
StrUtil.format("The name of the redis field [{}] in scriptKey [{}] is invalid",
scriptFieldValue, scriptKey));
}
String scriptData = scriptClient.hget(scriptKey, scriptFieldValue);
// 有语言类型
if (StrUtil.isNotBlank(nodeSimpleVO.getLanguage())) {
scriptItemContentList.add(StrUtil.format(NODE_ITEM_WITH_LANGUAGE_XML_PATTERN,
nodeSimpleVO.getNodeId(), nodeSimpleVO.getName(), nodeSimpleVO.getType(),
nodeSimpleVO.getLanguage(), scriptData));
}
// 没有语言类型
else {
scriptItemContentList.add(StrUtil.format(NODE_ITEM_XML_PATTERN, nodeSimpleVO.getNodeId(),
nodeSimpleVO.getName(), nodeSimpleVO.getType(), scriptData));
}
String scriptData = scriptClient.hget(scriptKey, scriptFieldValue);
nodeSimpleVO.setScript(scriptData);
scriptItemContentList.add(RuleParsePluginUtil.toScriptXml(nodeSimpleVO));
//计算scriptData的SHA值
String scriptSHA = DigestUtil.sha1Hex(scriptData);

View File

@@ -76,15 +76,23 @@ public class ScriptPollingTask implements Runnable {
//添加到待删除的list 后续统一从SHAMap中移除
//不在这里直接移除是为了避免先删除导致scriptSHAMap并没有完全遍历完 script删除不全
needDelete.add(scriptFieldValue);
}
else if (!StrUtil.equals(newSHA, oldSHA)) {
} else if (!StrUtil.equals(newSHA, oldSHA)) {
//SHA值发生变化,表示该script的值已被修改,重新拉取变化的script
String scriptData = scriptClient.hget(scriptKey, scriptFieldValue);
RedisParserHelper.changeScriptNode(scriptFieldValue, scriptData);
LOG.info("starting reload flow config... update key={} new value={},", scriptFieldValue, scriptData);
//修改SHAMap
scriptSHAMap.put(scriptFieldValue, newSHA);
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptFieldValue);
nodeSimpleVO.setScript(scriptData);
if (nodeSimpleVO.getEnable()) {
RedisParserHelper.changeScriptNode(scriptFieldValue, scriptData);
LOG.info("starting reload flow config... update key={} new value={},", scriptFieldValue, scriptData);
//修改SHAMap
scriptSHAMap.put(scriptFieldValue, newSHA);
} else {
FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId());
LOG.info("starting reload flow config... delete key={}", scriptFieldValue);
needDelete.add(scriptFieldValue);
}
}
//SHA值无变化,表示该script未改变
}
@@ -106,9 +114,16 @@ public class ScriptPollingTask implements Runnable {
if (!scriptSHAMap.containsKey(scriptFieldValue)) {
//将新script添加到LiteFlowChainELBuilder和SHAMap
String scriptData = scriptClient.hget(scriptKey, scriptFieldValue);
RedisParserHelper.changeScriptNode(scriptFieldValue, scriptData);
LOG.info("starting reload flow config... create key={} new value={},", scriptFieldValue, scriptData);
scriptSHAMap.put(scriptFieldValue, DigestUtil.sha1Hex(scriptData));
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptFieldValue);
if (nodeSimpleVO.getEnable()) {
RedisParserHelper.changeScriptNode(scriptFieldValue, scriptData);
LOG.info("starting reload flow config... create key={} new value={},", scriptFieldValue, scriptData);
scriptSHAMap.put(scriptFieldValue, DigestUtil.sha1Hex(scriptData));
} else {
FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId());
LOG.info("starting reload flow config... delete key={}", scriptFieldValue);
needDelete.add(scriptFieldValue);
}
}
}
}

View File

@@ -1,10 +1,12 @@
package com.yomahub.liteflow.parser.redis.mode.subscribe;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.lang.Pair;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.builder.LiteFlowNodeBuilder;
import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.parser.helper.NodeConvertHelper;
import com.yomahub.liteflow.parser.redis.exception.RedisException;
@@ -13,10 +15,10 @@ import com.yomahub.liteflow.parser.redis.mode.RedisMode;
import com.yomahub.liteflow.parser.redis.mode.RedisParserHelper;
import com.yomahub.liteflow.parser.redis.vo.RedisParserVO;
import com.yomahub.liteflow.spi.holder.ContextAwareHolder;
import com.yomahub.liteflow.util.RuleParsePluginUtil;
import org.redisson.Redisson;
import org.redisson.api.map.event.EntryCreatedListener;
import org.redisson.api.map.event.EntryRemovedListener;
import org.redisson.api.map.event.EntryUpdatedListener;
import org.redisson.config.Config;
import java.util.ArrayList;
@@ -28,7 +30,7 @@ import java.util.Map;
* 使用 Redisson客户端 RMapCache存储结构
*
* @author hxinyu
* @since 2.11.0
* @since 2.11.0
*/
public class RedisParserSubscribeMode implements RedisParserHelper {
@@ -46,14 +48,13 @@ public class RedisParserSubscribeMode implements RedisParserHelper {
try {
this.chainClient = ContextAwareHolder.loadContextAware().getBean("chainClient");
this.scriptClient = ContextAwareHolder.loadContextAware().getBean("scriptClient");
}
catch (Exception ignored) {
} catch (Exception ignored) {
}
if (ObjectUtil.isNull(chainClient)) {
RedisMode redisMode = redisParserVO.getRedisMode();
Config config;
//Redis单点模式
if (redisMode.equals(RedisMode.SINGLE)){
if (redisMode.equals(RedisMode.SINGLE)) {
config = getSingleRedissonConfig(redisParserVO, redisParserVO.getChainDataBase());
this.chainClient = new RClient(Redisson.create(config));
//如果有脚本数据
@@ -74,8 +75,7 @@ public class RedisParserSubscribeMode implements RedisParserHelper {
}
}
}
}
catch (Exception e) {
} catch (Exception e) {
throw new RedisException(e.getMessage());
}
@@ -91,8 +91,9 @@ public class RedisParserSubscribeMode implements RedisParserHelper {
for (Map.Entry<String, String> entry : chainMap.entrySet()) {
String chainId = entry.getKey();
String chainData = entry.getValue();
RuleParsePluginUtil.ChainDto chainDto = RuleParsePluginUtil.parseChainKey(chainId);
if (StrUtil.isNotBlank(chainData)) {
chainItemContentList.add(StrUtil.format(CHAIN_XML_PATTERN, chainId, chainData));
chainItemContentList.add(chainDto.toElXml(chainData));
}
}
// 合并成所有chain的xml内容
@@ -112,17 +113,9 @@ public class RedisParserSubscribeMode implements RedisParserHelper {
StrUtil.format("The name of the redis field [{}] in scriptKey [{}] is invalid",
scriptFieldValue, redisParserVO.getScriptKey()));
}
// 有语言类型
if (StrUtil.isNotBlank(nodeSimpleVO.getLanguage())) {
scriptItemContentList.add(StrUtil.format(NODE_ITEM_WITH_LANGUAGE_XML_PATTERN,
nodeSimpleVO.getNodeId(), nodeSimpleVO.getName(), nodeSimpleVO.getType(),
nodeSimpleVO.getLanguage(), scriptData));
}
// 没有语言类型
else {
scriptItemContentList.add(StrUtil.format(NODE_ITEM_XML_PATTERN, nodeSimpleVO.getNodeId(),
nodeSimpleVO.getName(), nodeSimpleVO.getType(), scriptData));
}
nodeSimpleVO.setScript(scriptData);
scriptItemContentList.add(RuleParsePluginUtil.toScriptXml(nodeSimpleVO));
}
scriptAllContent = StrUtil.format(NODE_XML_PATTERN,
@@ -130,8 +123,7 @@ public class RedisParserSubscribeMode implements RedisParserHelper {
}
return StrUtil.format(XML_PATTERN, scriptAllContent, chainAllContent);
}
catch (Exception e) {
} catch (Exception e) {
throw new RedisException(e.getMessage());
}
}
@@ -145,8 +137,7 @@ public class RedisParserSubscribeMode implements RedisParserHelper {
// 存在这个节点,但是子节点不存在
Map<String, String> scriptMap = scriptClient.getMap(redisParserVO.getScriptKey());
return !CollUtil.isEmpty(scriptMap);
}
catch (Exception e) {
} catch (Exception e) {
return false;
}
}
@@ -158,35 +149,58 @@ public class RedisParserSubscribeMode implements RedisParserHelper {
public void listenRedis() {
//监听 chain
String chainKey = redisParserVO.getChainKey();
EntryCreatedListener<String, String> chainModifyFunc = event -> {
LOG.info("starting modify flow config... create key={} value={},", event.getKey(), event.getValue());
String chainName = event.getKey();
String value = event.getValue();
Pair<Boolean/*启停*/, String/*id*/> pair = RuleParsePluginUtil.parseIdKey(chainName);
String id = pair.getValue();
// 如果是启用,就正常更新
if (pair.getKey()) {
LiteFlowChainELBuilder.createChain().setChainId(id).setEL(value).build();
}
// 如果是禁用,就删除
else {
FlowBus.removeChain(id);
}
};
//添加新 chain
chainClient.addListener(chainKey, (EntryCreatedListener<String, String>) event -> {
LOG.info("starting reload flow config... create key={} value={},", event.getKey(), event.getValue());
LiteFlowChainELBuilder.createChain().setChainId(event.getKey()).setEL(event.getValue()).build();
});
chainClient.addListener(chainKey, chainModifyFunc);
//修改 chain
chainClient.addListener(chainKey, (EntryUpdatedListener<String, String>) event -> {
LOG.info("starting reload flow config... update key={} new value={},", event.getKey(), event.getValue());
LiteFlowChainELBuilder.createChain().setChainId(event.getKey()).setEL(event.getValue()).build();
});
chainClient.addListener(chainKey, chainModifyFunc);
//删除 chain
chainClient.addListener(chainKey, (EntryRemovedListener<String, String>) event -> {
LOG.info("starting reload flow config... delete key={}", event.getKey());
FlowBus.removeChain(event.getKey());
Pair<Boolean/*启停*/, String/*id*/> pair = RuleParsePluginUtil.parseIdKey(event.getKey());
FlowBus.removeChain(pair.getValue());
});
//监听 script
EntryCreatedListener<String, String> scriptModifyFunc = event -> {
LOG.info("starting modify flow config... create key={} value={},", event.getKey(), event.getValue());
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(event.getKey());
nodeSimpleVO.setScript(event.getValue());
// 启用就正常更新
if (nodeSimpleVO.getEnable()) {
LiteFlowNodeBuilder.createScriptNode()
.setId(nodeSimpleVO.getNodeId())
.setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType()))
.setName(nodeSimpleVO.getName())
.setScript(nodeSimpleVO.getScript())
.setLanguage(nodeSimpleVO.getLanguage())
.build();
}
// 禁用就删除
else {
FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId());
}
};
if (ObjectUtil.isNotNull(scriptClient) && ObjectUtil.isNotNull(redisParserVO.getScriptDataBase())) {
String scriptKey = redisParserVO.getScriptKey();
//添加 script
scriptClient.addListener(scriptKey, (EntryCreatedListener<String, String>) event -> {
LOG.info("starting reload flow config... create key={} value={},", event.getKey(), event.getValue());
RedisParserHelper.changeScriptNode(event.getKey(), event.getValue());
});
scriptClient.addListener(scriptKey, scriptModifyFunc);
//修改 script
scriptClient.addListener(scriptKey, (EntryUpdatedListener<String, String>) event -> {
LOG.info("starting reload flow config... update key={} new value={},", event.getKey(), event.getValue());
RedisParserHelper.changeScriptNode(event.getKey(), event.getValue());
});
scriptClient.addListener(scriptKey, scriptModifyFunc);
//删除 script
scriptClient.addListener(scriptKey, (EntryRemovedListener<String, String>) event -> {
LOG.info("starting reload flow config... delete key={}", event.getKey());

View File

@@ -33,14 +33,8 @@ public class ZkParserHelper {
private final CuratorFramework client;
private final String CHAIN_XML_PATTERN = "<chain name=\"{}\">{}</chain>";
private final String NODE_XML_PATTERN = "<nodes>{}</nodes>";
private final String NODE_ITEM_XML_PATTERN = "<node id=\"{}\" name=\"{}\" type=\"{}\"><![CDATA[{}]]></node>";
private final String NODE_ITEM_XML_WITH_LANGUAGE_PATTERN = "<node id=\"{}\" name=\"{}\" type=\"{}\" language=\"{}\"><![CDATA[{}]]></node>";
private final String XML_PATTERN = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><flow>{}{}</flow>";
public ZkParserHelper(ZkParserVO zkParserVO) {
@@ -72,7 +66,7 @@ public class ZkParserHelper {
RuleParsePluginUtil.ChainDto chainDto = RuleParsePluginUtil.parseChainKey(chainName);
String chainData = new String(
client.getData().forPath(StrUtil.format("{}/{}", zkParserVO.getChainPath(), chainName)));
if (StrUtil.isNotBlank(chainData) && chainDto.isEnable()) {
if (StrUtil.isNotBlank(chainData)) {
chainItemContentList.add(chainDto.toElXml(chainData));
}
}

View File

@@ -2,6 +2,8 @@ package com.yomahub.liteflow.test.redis;
import cn.hutool.crypto.digest.DigestUtil;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.exception.ChainNotFoundException;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.log.LFLog;
import com.yomahub.liteflow.log.LFLoggerManager;
@@ -9,7 +11,9 @@ import com.yomahub.liteflow.parser.redis.mode.RClient;
import com.yomahub.liteflow.parser.redis.mode.polling.RedisParserPollingMode;
import com.yomahub.liteflow.slot.DefaultContext;
import com.yomahub.liteflow.test.BaseTest;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
@@ -17,13 +21,15 @@ import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import javax.annotation.Resource;
import java.lang.reflect.Field;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
/**
@@ -163,4 +169,27 @@ public class RedisWithXmlELPollSpringbootTest extends BaseTest {
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("hello world", context.getData("test11"));
}
@Test
public void testDisablePollWithXml() throws InterruptedException {
Set<String> chainNameSet = new HashSet<>();
chainNameSet.add("chain1122:false");
String chainValue = "THEN(a, b, c);";
when(chainClient.hkeys("pollChainKey")).thenReturn(chainNameSet);
when(chainClient.hget("pollChainKey", "chain1122:true")).thenReturn(chainValue);
Set<String> scriptFieldSet = new HashSet<>();
scriptFieldSet.add("s4:script:脚本s3:groovy:false");
when(scriptClient.hkeys("pollScriptKey")).thenReturn(scriptFieldSet);
when(scriptClient.hget("pollScriptKey", "s4:script:脚本s3:groovy:true")).thenReturn("defaultContext.setData(\"test\",\"hello\");");
// 测试 chain 停用
Assertions.assertThrows(ChainNotFoundException.class, () -> {
throw flowExecutor.execute2Resp("chain1122", "arg").getCause();
});
// 测试 script 停用
Assertions.assertTrue(!FlowBus.getNodeMap().containsKey("s4"));
}
}