Merge branch 'dev' of https://gitee.com/dromara/liteFlow into issues/ICM6TX

This commit is contained in:
luoyi
2025-07-27 15:17:10 +08:00
42 changed files with 1818 additions and 8 deletions

View File

@@ -67,5 +67,9 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -28,6 +28,8 @@ import com.yomahub.liteflow.flow.element.Rollbackable;
import com.yomahub.liteflow.flow.entity.CmpStep;
import com.yomahub.liteflow.flow.id.IdGeneratorHolder;
import com.yomahub.liteflow.lifecycle.LifeCycleHolder;
import com.yomahub.liteflow.lifecycle.PostProcessChainExecuteLifeCycle;
import com.yomahub.liteflow.lifecycle.impl.ChainCacheLifeCycle;
import com.yomahub.liteflow.log.LFLog;
import com.yomahub.liteflow.log.LFLoggerManager;
import com.yomahub.liteflow.monitor.MonitorFile;
@@ -103,6 +105,12 @@ public class FlowExecutor {
IdGeneratorHolder.init();
}
// 规则缓存
if (isStart && liteflowConfig.getChainCacheEnabled()) {
// 放到解析节点后,是因为要根据节点数量判断缓存大小设置是否合理
initChainCache();
}
String ruleSource = liteflowConfig.getRuleSource();
if (StrUtil.isBlank(ruleSource)) {
// 查看有没有Parser的SPI实现
@@ -229,6 +237,11 @@ public class FlowExecutor {
}
}
// 初始化或reload时评估规则缓存容量大小
if (liteflowConfig.getChainCacheEnabled()) {
evaluateChainCacheCapacity();
}
}
// 此方法就是从原有的配置源主动拉取新的进行刷新
@@ -664,4 +677,44 @@ public class FlowExecutor {
return resultSlotList;
}
private void initChainCache() {
// 启动chain缓存必须使用 PARSE_ONE_ON_FIRST_EXEC 模式
if (!ParseModeEnum.PARSE_ONE_ON_FIRST_EXEC.equals(liteflowConfig.getParseMode())) {
LOG.warn("The parse mode is not PARSE_ONE_ON_FIRST_EXE, so the chain cache cannot be enabled.");
return;
}
// 容量不能小于等于0
Integer capacity = liteflowConfig.getChainCacheCapacity();
if (ObjectUtil.isNull(capacity) || capacity <= 0) {
throw new ConfigErrorException("The chain cache capacity must be greater than 0");
}
// 添加规则缓存生命周期
List<PostProcessChainExecuteLifeCycle> lifeCycleList = LifeCycleHolder.getPostProcessChainExecuteLifeCycleList();
boolean exist = lifeCycleList.stream()
.anyMatch(lifeCycle -> lifeCycle instanceof ChainCacheLifeCycle);
if (!exist) {
boolean success = ChainCacheLifeCycle.initIfAbsent(capacity);
if (!success) {
throw new FlowExecutorNotInitException("Initialization of ChainCacheLifeCycle failed");
}
LifeCycleHolder.addLifeCycle(ChainCacheLifeCycle.getLifeCycle());
}
}
// 评估规则缓存容量
private void evaluateChainCacheCapacity() {
if (!ParseModeEnum.PARSE_ONE_ON_FIRST_EXEC.equals(liteflowConfig.getParseMode())) {
return;
}
Integer capacity = liteflowConfig.getChainCacheCapacity();
// 容量不足chain总数的30%给予警告
int chainNum = FlowBus.getChainMap().size();
double threshold = chainNum * 0.3;
if (capacity < threshold) {
LOG.warn("The chain cache capacity {} is too small, the current total number of chains is {}, "
+"it is recommended to be greater than 30% of the number of chains", capacity, chainNum);
}
}
}

View File

@@ -10,6 +10,8 @@ package com.yomahub.liteflow.flow.element;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.ttl.TransmittableThreadLocal;
import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder;
import com.yomahub.liteflow.common.ChainConstant;
@@ -19,6 +21,7 @@ import com.yomahub.liteflow.exception.FlowSystemException;
import com.yomahub.liteflow.lifecycle.LifeCycleHolder;
import com.yomahub.liteflow.log.LFLog;
import com.yomahub.liteflow.log.LFLoggerManager;
import com.yomahub.liteflow.meta.LiteflowMetaOperator;
import com.yomahub.liteflow.slot.DataBus;
import com.yomahub.liteflow.slot.Slot;
@@ -31,6 +34,7 @@ import java.util.List;
* @author Bryan.Zhang
* @author jason
* @author luo yi
* @author DaleLee
*/
public class Chain implements Executable {
@@ -115,8 +119,15 @@ public class Chain implements Executable {
}
}
if (CollUtil.isEmpty(conditionList)) {
throw new FlowSystemException("no conditionList in this chain[" + chainId + "]");
// 这里先拿到this.conditionList的引用
// 因为在正式执行condition之前this.conditionList有可能被其他线程置空
// 比如该chain在规则缓存中被淘汰
List<Condition> conditionListRef = this.conditionList;
// 但在编译后到拿到引用之前this.conditionList还是有可能已经被置空了
if (CollUtil.isEmpty(conditionListRef)) {
// 如果conditionListRef为空
// 尝试构建临时conditionList确保本次一定可以执行
conditionListRef = buildTemporaryConditionList();
}
Slot slot = DataBus.getSlot(slotIndex);
try {
@@ -131,7 +142,7 @@ public class Chain implements Executable {
slot.setChainId(chainId);
slot.addChainInstance(this);
// 执行主体Condition
for (Condition condition : conditionList) {
for (Condition condition : conditionListRef) {
condition.setCurrChainId(chainId);
condition.execute(slotIndex);
}
@@ -258,4 +269,28 @@ public class Chain implements Executable {
public void setElMd5(String elMd5) {
this.elMd5 = elMd5;
}
// 构建临时的ConditionList
private List<Condition> buildTemporaryConditionList() {
if (StrUtil.isBlank(el)) {
// 无法构建
throw new FlowSystemException("no conditionList in this chain[" + chainId + "]");
}
// 构建临时chain
String tempChainId = chainId + "_temp_" + IdUtil.simpleUUID();
// 使用LiteFlowChainELBuilder创建chain为了设置md5
LiteFlowChainELBuilder.createChain()
.setChainId(tempChainId)
.setEL(el)
.build();
// 当前模式可能为PARSE_ONE_ON_FIRST_EXEC所以临时chain可能未编译
Chain tempChain = LiteflowMetaOperator.getChain(tempChainId);
LiteFlowChainELBuilder.buildUnCompileChain(tempChain);
// 移除临时chain
LiteflowMetaOperator.removeChain(tempChainId);
// 打印警告可用于排查临时chain与已有chain重名几乎不可能发生而将已有chain覆盖的情况
LOG.warn("The conditionList of chain[{}] is empty, " +
"temporarily using chain[{}] (now removed) to build it.", chainId, tempChainId);
return tempChain.getConditionList();
}
}

View File

@@ -0,0 +1,165 @@
package com.yomahub.liteflow.lifecycle.impl;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ObjectUtil;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.yomahub.liteflow.flow.element.Chain;
import com.yomahub.liteflow.flow.element.Condition;
import com.yomahub.liteflow.lifecycle.PostProcessChainExecuteLifeCycle;
import com.yomahub.liteflow.meta.LiteflowMetaOperator;
import com.yomahub.liteflow.slot.Slot;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import java.util.List;
/**
* Chain 缓存生命周期
* @author DaleLee
* @since 2.13.3
*/
public class ChainCacheLifeCycle implements PostProcessChainExecuteLifeCycle {
/**
* 缓存
*/
private final Cache<String, ChainState> cache;
/**
* 实例
*/
private static ChainCacheLifeCycle INSTANCE;
private ChainCacheLifeCycle(int capacity) {
this.cache = Caffeine.newBuilder()
.maximumSize(capacity)
.evictionListener(new ChainRemovalListener())
.build();
}
@Override
public void postProcessBeforeChainExecute(String chainId, Slot slot) {
// 记录 chainId 在缓存中
// 初始状态为 ACTIVE
cache.get(chainId, key -> ChainState.newActiveState());
}
@Override
public void postProcessAfterChainExecute(String chainId, Slot slot) {
// 检测是否有不在缓存中、或处于非活跃状态而未被清理的 Chain
if (!isActive(chainId) && !isCleaned(chainId)) {
cleanChain(chainId);
}
}
/**
* Chain 在缓存中状态
*/
public static class ChainState {
/**
* Chain 活跃状态
*/
private volatile boolean active;
public ChainState(boolean active) {
this.active = active;
}
public boolean isActive() {
return active;
}
public void setActive(boolean active) {
this.active = active;
}
public static ChainState newActiveState() {
return new ChainState(true);
}
}
/**
* 监听在缓存中被移除的 Chain
*/
private static class ChainRemovalListener implements RemovalListener<String, ChainState> {
@Override
public void onRemoval(@Nullable String chainId, @Nullable ChainState chainState, @NonNull RemovalCause removalCause) {
if (ObjectUtil.isNotNull(chainState)) {
chainState.setActive(false);
}
cleanChain(chainId);
}
}
/**
* 获取缓存
* @return cache
*/
public Cache<String, ChainState> getCache() {
return cache;
}
/**
* 判断 Chain 的 Condition 是否被清理
* @param chainId chainId
* @return 被清理返回 true否则返回 false
*/
private boolean isCleaned(String chainId) {
Chain chain = LiteflowMetaOperator.getChain(chainId);
if (ObjectUtil.isNull(chain)) {
return true;
}
List<Condition> conditionList = chain.getConditionList();
return CollUtil.isEmpty(conditionList);
}
/**
* 判断 Chain 在缓存中是活跃状态
* @param chainId chainId
* @return 活跃状态返回 true不在缓存中或处于非活状态返回 false
*/
private boolean isActive(String chainId) {
ChainState chainState = cache.getIfPresent(chainId);
return ObjectUtil.isNotNull(chainState)
&& chainState.isActive();
}
/**
* 清理 Chain 的 Condition
* @param chainId chainId
*/
private static void cleanChain(String chainId) {
Chain chain = LiteflowMetaOperator.getChain(chainId);
// chain可能已经在FlowBus中被移除了
if (ObjectUtil.isNull(chain)) {
return;
}
// 将chain设置为未编译并清空condition
chain.setCompiled(false);
chain.setConditionList(null);
}
/**
* 初始化生命周期实例
* @param capacity 缓存容量
* @return 成功 true失败返回 false
*/
public synchronized static boolean initIfAbsent(int capacity) {
if (ObjectUtil.isNull(INSTANCE)) {
INSTANCE = new ChainCacheLifeCycle(capacity);
return true;
}
return false;
}
/**
* 获取生命周期实例
* @return lifeCycle
*/
public static ChainCacheLifeCycle getLifeCycle() {
return INSTANCE;
}
}

View File

@@ -116,6 +116,12 @@ public class LiteflowConfig {
// instance id 生成器
private String instanceIdGeneratorClass;
// 是否启用chain缓存
private Boolean chainCacheEnabled;
// chain缓存容量
private Integer chainCacheCapacity;
public Boolean getEnableMonitorFile() {
return enableMonitorFile;
}
@@ -495,4 +501,26 @@ public class LiteflowConfig {
public void setInstanceIdGeneratorClass(String instanceIdGeneratorClass) {
this.instanceIdGeneratorClass = instanceIdGeneratorClass;
}
public Boolean getChainCacheEnabled() {
if (ObjectUtil.isNull(chainCacheEnabled)) {
return Boolean.FALSE;
}
return chainCacheEnabled;
}
public void setChainCacheEnabled(Boolean chainCacheEnabled) {
this.chainCacheEnabled = chainCacheEnabled;
}
public Integer getChainCacheCapacity() {
if (ObjectUtil.isNull(chainCacheCapacity)) {
return 10000;
}
return chainCacheCapacity;
}
public void setChainCacheCapacity(Integer chainCacheCapacity) {
this.chainCacheCapacity = chainCacheCapacity;
}
}

View File

@@ -51,6 +51,8 @@ public class LiteflowAutoConfiguration {
liteflowConfig.setGlobalThreadPoolQueueSize(property.getGlobalThreadPoolQueueSize());
liteflowConfig.setWhenThreadPoolIsolate(property.getWhenThreadPoolIsolate());
liteflowConfig.setEnableNodeInstanceId(property.isEnableNodeInstanceId());
liteflowConfig.setChainCacheEnabled(property.getChainCache().isEnabled());
liteflowConfig.setChainCacheCapacity(property.getChainCache().getCapacity());
return liteflowConfig;
}

View File

@@ -101,6 +101,33 @@ public class LiteflowProperty {
//是否启用节点实例ID
private boolean enableNodeInstanceId;
// 规则缓存配置
private ChainCacheProperty chainCache;
public static class ChainCacheProperty {
// 是否启用规则缓存
private Boolean enabled;
// 规则缓存容量
private Integer capacity;
public Boolean isEnabled() {
return enabled;
}
public void setEnabled(Boolean enabled) {
this.enabled = enabled;
}
public Integer getCapacity() {
return capacity;
}
public void setCapacity(Integer capacity) {
this.capacity = capacity;
}
}
public boolean isEnable() {
return enable;
}
@@ -341,4 +368,12 @@ public class LiteflowProperty {
public void setEnableNodeInstanceId(boolean enableNodeInstanceId) {
this.enableNodeInstanceId = enableNodeInstanceId;
}
public ChainCacheProperty getChainCache() {
return chainCache;
}
public void setChainCache(ChainCacheProperty chainCache) {
this.chainCache = chainCache;
}
}

View File

@@ -18,4 +18,6 @@ liteflow.fallback-cmp-enable=false
liteflow.global-thread-pool-size=16
liteflow.global-thread-pool-queue-size=512
liteflow.global-thread-pool-executor-class=com.yomahub.liteflow.thread.LiteFlowDefaultGlobalExecutorBuilder
liteflow.enable-node-instance-id=false
liteflow.enable-node-instance-id=false
liteflow.chain-cache.enabled=false
liteflow.chain-cache.capacity=10000

View File

@@ -4,6 +4,7 @@ import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.enums.ParseModeEnum;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.NestedConfigurationProperty;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -99,6 +100,34 @@ public class LiteflowProperty {
//是否启用节点实例ID
private boolean enableNodeInstanceId;
// 规则缓存配置
@NestedConfigurationProperty
private ChainCacheProperty chainCache;
public static class ChainCacheProperty {
// 是否启用规则缓存
private Boolean enabled;
// chain缓存容量
private Integer capacity;
public Boolean isEnabled() {
return enabled;
}
public void setEnabled(Boolean enabled) {
this.enabled = enabled;
}
public Integer getCapacity() {
return capacity;
}
public void setCapacity(Integer capacity) {
this.capacity = capacity;
}
}
public boolean isEnableMonitorFile() {
return enableMonitorFile;
}
@@ -330,4 +359,12 @@ public class LiteflowProperty {
public void setEnableNodeInstanceId(boolean enableNodeInstanceId) {
this.enableNodeInstanceId = enableNodeInstanceId;
}
public ChainCacheProperty getChainCache() {
return chainCache;
}
public void setChainCache(ChainCacheProperty chainCache) {
this.chainCache = chainCache;
}
}

View File

@@ -53,6 +53,8 @@ public class LiteflowPropertyAutoConfiguration {
liteflowConfig.setGlobalThreadPoolQueueSize(property.getGlobalThreadPoolQueueSize());
liteflowConfig.setGlobalThreadPoolSize(property.getGlobalThreadPoolSize());
liteflowConfig.setEnableNodeInstanceId(property.isEnableNodeInstanceId());
liteflowConfig.setChainCacheEnabled(property.getChainCache().isEnabled());
liteflowConfig.setChainCacheCapacity(property.getChainCache().getCapacity());
return liteflowConfig;
}

View File

@@ -206,6 +206,19 @@
"description": "enable node instance id",
"sourceType": "com.yomahub.liteflow.springboot.LiteflowProperty",
"defaultValue": false
},
{
"name": "liteflow.rule-cache.enabled",
"type": "java.lang.Boolean",
"description": "Enable rule cache.",
"sourceType": "com.yomahub.liteflow.springboot.LiteflowProperty",
"defaultValue": false
},
{
"name": "liteflow.rule-cache.capacity",
"type": "java.lang.Integer",
"description": "Set rule cache capacity.",
"sourceType": "com.yomahub.liteflow.springboot.LiteflowProperty"
}
]
}

View File

@@ -24,4 +24,7 @@ liteflow.global-thread-pool-size=64
liteflow.global-thread-pool-queue-size=512
liteflow.global-thread-pool-executor-class=com.yomahub.liteflow.thread.LiteFlowDefaultGlobalExecutorBuilder
liteflow.enable-node-instance-id=false
liteflow.chain-cache.enabled=false
liteflow.chain-cache.capacity=10000

View File

@@ -0,0 +1,237 @@
package com.yomahub.liteflow.test.chainCache;
import cn.hutool.core.collection.CollUtil;
import com.github.benmanes.caffeine.cache.Cache;
import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.core.FlowExecutorHolder;
import com.yomahub.liteflow.enums.ParseModeEnum;
import com.yomahub.liteflow.exception.ChainNotFoundException;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.flow.element.Chain;
import com.yomahub.liteflow.flow.element.Condition;
import com.yomahub.liteflow.lifecycle.impl.ChainCacheLifeCycle;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.test.BaseTest;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
* 非Spring环境下的chain缓存测试
* @author DaleLee
*/
public class ChainCacheTest extends BaseTest {
private static FlowExecutor flowExecutor;
@BeforeAll
public static void init() {
LiteflowConfig config = new LiteflowConfig();
config.setRuleSource("chainCache/flow.el.xml");
config.setChainCacheEnabled(true);
config.setChainCacheCapacity(5);
config.setParseMode(ParseModeEnum.PARSE_ONE_ON_FIRST_EXEC);
flowExecutor = FlowExecutorHolder.loadInstance(config);
}
@BeforeEach
public void reload() {
flowExecutor.reloadRule();
// 清空缓存
Cache<String, ChainCacheLifeCycle.ChainState> cache = getCache();
cache.invalidateAll();
cache.cleanUp();
}
// 测试chain被淘汰
@Test
public void testChainCache1() {
// 加满缓存
loadCache();
// 缓存快照
HashSet<@NonNull String> strings = CollUtil.newHashSet(getCache().asMap().keySet());
LiteflowResponse response = flowExecutor.execute2Resp("chain6", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("c==>b", response.getExecuteStepStr());
// 获得被淘汰chain
String chainId = getEvictedChain(strings);
testEvicted(chainId);
// 测试被淘汰的chain仍可正常执行
response = flowExecutor.execute2Resp(chainId, "arg");
Assertions.assertTrue(response.isSuccess());
}
// 测试缓存数量上限(串行)
@Test
public void testChainCache2() {
// 确保至少执行过5个不同的chain
loadCache();
// 随机执行chain
Random random = new Random();
for (int i = 0; i < 100; i++) {
int id = random.nextInt(10) + 1;
flowExecutor.execute2Resp("chain" + id);
}
// 等待缓存淘汰
getCache().cleanUp();
Assertions.assertEquals(10, FlowBus.getChainMap().size());
// 测试只有5个chain被编译
int count = 0;
for (Chain chain : FlowBus.getChainMap().values()) {
List<Condition> conditionList = chain.getConditionList();
if (chain.isCompiled()) {
Assertions.assertTrue(CollUtil.isNotEmpty(conditionList));
count++;
} else {
Assertions.assertNull(conditionList);
}
}
Assertions.assertTrue(count <= 5);
}
// 测试缓存数量上限(并行)
@Test
public void testChainCache3() {
loadCache();
Random random = new Random();
List<Future<LiteflowResponse>> futureList = CollUtil.newArrayList();
for (int i = 0; i < 100; i++) {
int id = random.nextInt(10) + 1;
Future<LiteflowResponse> future = flowExecutor.execute2Future("chain" + id, "arg");
futureList.add(future);
}
futureList.forEach(future -> {
try {
LiteflowResponse response = future.get();
Assertions.assertTrue(response.isSuccess());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
// 等待缓存淘汰
getCache().cleanUp();
Assertions.assertEquals(10, FlowBus.getChainMap().size());
// 测试只有5个chain被编译
int count = 0;
for (Chain chain : FlowBus.getChainMap().values()) {
List<Condition> conditionList = chain.getConditionList();
if (chain.isCompiled()) {
Assertions.assertTrue(CollUtil.isNotEmpty(conditionList));
count++;
} else {
Assertions.assertNull(conditionList);
}
}
Assertions.assertTrue(count <= 5);
}
// 测试开启规则缓存后进入缓存的chain可以正常被更新
@Test
public void testChainCache4() {
loadCache();
// 缓存快照
HashSet<@NonNull String> strings = CollUtil.newHashSet(getCache().asMap().keySet());
LiteflowResponse response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("x==>a==>b", response.getExecuteStepStr());
// 获得被淘汰chain
String chainId = getEvictedChain(strings);
testEvicted(chainId);
// chain7进入缓存
Assertions.assertTrue(getCache().asMap().containsKey("chain7"));
// 更新chain7
LiteFlowChainELBuilder
.createChain()
.setChainId("chain7")
.setEL("THEN(a, b, c)")
.build();
// 重新执行chain7
response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("a==>b==>c", response.getExecuteStepStr());
}
// 测试开启规则缓存后进入缓存的chain被移除后无法执行
@Test
public void testChainCache5() {
loadCache();
// 缓存快照
HashSet<@NonNull String> strings = CollUtil.newHashSet(getCache().asMap().keySet());
LiteflowResponse response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("x==>a==>b", response.getExecuteStepStr());
// 获得被淘汰chain
String chainId = getEvictedChain(strings);
testEvicted(chainId);
// chain7进入缓存
Assertions.assertTrue(getCache().asMap().containsKey("chain7"));
// 手动移除chain7
FlowBus.removeChain("chain7");
response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertFalse(response.isSuccess());
Assertions.assertEquals(ChainNotFoundException.class, response.getCause().getClass());
}
// 测试并发下正在执行的chain的condition被清理但仍能执行
@Test
public void testChainCache6() throws InterruptedException {
// 模拟清空编译好的chain
Thread thread = new Thread(()-> {
Chain chain1 = FlowBus.getChain("chain1");
// 绕过初次编译
chain1.setCompiled(true);
chain1.setConditionList(null);
});
thread.start();
thread.join();
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("a==>b", response.getExecuteStepStr());
}
// 加载缓存, chain1~chain5
private void loadCache() {
// 容量上限为5
for (int i = 1; i <= 5; i++) {
flowExecutor.execute2Resp("chain" + i);
}
}
// 测试 chain 被淘汰
private void testEvicted(String chanId) {
Chain chain = FlowBus.getChain(chanId);
getCache().cleanUp();
// 测试缓存中不存在
Assertions.assertFalse(getCache().asMap().containsKey(chanId));
// 测试chain被设置为未编译
Assertions.assertFalse(chain.isCompiled());
Assertions.assertNull(chain.getConditionList());
}
public Cache<String, ChainCacheLifeCycle.ChainState> getCache() {
return ChainCacheLifeCycle.getLifeCycle().getCache();
}
// 获得淘汰的chain传入淘汰前的chain集合
// 确保只有一个被淘汰时使用
String getEvictedChain(Set<String> set) {
Cache<String, ChainCacheLifeCycle.ChainState> cache = getCache();
cache.cleanUp();
Set<@NonNull String> strings = cache.asMap().keySet();
set.removeAll(strings);
Assertions.assertEquals(1, set.size());
return set.iterator().next();
}
}

View File

@@ -0,0 +1,18 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.chainCache.cmp;
import com.yomahub.liteflow.core.NodeComponent;
public class ACmp extends NodeComponent {
@Override
public void process() {
System.out.println("ACmp executed!");
}
}

View File

@@ -0,0 +1,19 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.chainCache.cmp;
import com.yomahub.liteflow.core.NodeComponent;
public class BCmp extends NodeComponent {
@Override
public void process() {
System.out.println("BCmp executed!");
}
}

View File

@@ -0,0 +1,18 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.chainCache.cmp;
import com.yomahub.liteflow.core.NodeComponent;
public class CCmp extends NodeComponent {
@Override
public void process() {
System.out.println("CCmp executed!");
}
}

View File

@@ -0,0 +1,10 @@
package com.yomahub.liteflow.test.chainCache.cmp;
import com.yomahub.liteflow.core.NodeBooleanComponent;
public class XCmp extends NodeBooleanComponent {
@Override
public boolean processBoolean() throws Exception {
return true;
}
}

View File

@@ -0,0 +1,50 @@
<?xml version="1.0" encoding="UTF-8"?>
<flow>
<nodes>
<node id="a" class="com.yomahub.liteflow.test.chainCache.cmp.ACmp"/>
<node id="b" class="com.yomahub.liteflow.test.chainCache.cmp.BCmp"/>
<node id="c" class="com.yomahub.liteflow.test.chainCache.cmp.CCmp"/>
<node id="x" class="com.yomahub.liteflow.test.chainCache.cmp.XCmp"/>
</nodes>
<chain name="chain1">
THEN(a, b);
</chain>
<chain name="chain2">
THEN(a, c);
</chain>
<chain id="chain3">
THEN(b, a);
</chain>
<chain name="chain4">
THEN(b, c);
</chain>
<chain name="chain5">
THEN(c, a);
</chain>
<chain name="chain6">
THEN(c, b);
</chain>
<chain name="chain7">
THEN(IF(x, a), b);
</chain>
<chain name="chain8">
THEN(IF(x, a), c);
</chain>
<chain name="chain9">
WHEN(a, b, c);
</chain>
<chain name="chain10">
FOR(5).DO(THEN(a, b, c));
</chain>
</flow>

View File

@@ -4,6 +4,7 @@ liteflow.rule-source-ext-data={\
"pollingInterval":1,\
"pollingStartTime":2,\
"chainKey":"pollChainKey",\
"scriptKey":"pollScriptKey"\
"scriptKey":"pollScriptKey",\
"scriptDataBase":1\
}
liteflow.parse-mode=PARSE_ALL_ON_FIRST_EXEC

View File

@@ -0,0 +1,229 @@
package com.yomahub.liteflow.test.chainCache;
import cn.hutool.core.collection.CollUtil;
import com.github.benmanes.caffeine.cache.Cache;
import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.exception.ChainNotFoundException;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.flow.element.Chain;
import com.yomahub.liteflow.flow.element.Condition;
import com.yomahub.liteflow.lifecycle.impl.ChainCacheLifeCycle;
import com.yomahub.liteflow.test.BaseTest;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.noear.solon.annotation.Import;
import org.noear.solon.annotation.Inject;
import org.noear.solon.test.SolonTest;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
* Solon环境下chain缓存测试
* @author DaleLee
*/
@SolonTest
@Import(profiles="classpath:/chainCache/application.properties")
public class ChainCacheSolonTest extends BaseTest {
@Inject
private FlowExecutor flowExecutor;
@BeforeEach
public void reload() {
flowExecutor.reloadRule();
// 清空缓存
Cache<String, ChainCacheLifeCycle.ChainState> cache = getCache();
cache.invalidateAll();
cache.cleanUp();
}
// 测试chain被淘汰
@Test
public void testChainCache1() {
// 加满缓存
loadCache();
// 缓存快照
HashSet<@NonNull String> strings = CollUtil.newHashSet(getCache().asMap().keySet());
LiteflowResponse response = flowExecutor.execute2Resp("chain6", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("c==>b", response.getExecuteStepStr());
// 获得被淘汰chain
String chainId = getEvictedChain(strings);
testEvicted(chainId);
// 测试被淘汰的chain仍可正常执行
response = flowExecutor.execute2Resp(chainId, "arg");
Assertions.assertTrue(response.isSuccess());
}
// 测试缓存数量上限(串行)
@Test
public void testChainCache2() {
// 确保至少执行过5个不同的chain
loadCache();
// 随机执行chain
Random random = new Random();
for (int i = 0; i < 100; i++) {
int id = random.nextInt(10) + 1;
flowExecutor.execute2Resp("chain" + id);
}
// 等待缓存淘汰
getCache().cleanUp();
Assertions.assertEquals(10, FlowBus.getChainMap().size());
// 测试只有5个chain被编译
int count = 0;
for (Chain chain : FlowBus.getChainMap().values()) {
List<Condition> conditionList = chain.getConditionList();
if (chain.isCompiled()) {
Assertions.assertTrue(CollUtil.isNotEmpty(conditionList));
count++;
} else {
Assertions.assertNull(conditionList);
}
}
Assertions.assertTrue(count <= 5);
}
// 测试缓存数量上限(并行)
@Test
public void testChainCache3() {
loadCache();
Random random = new Random();
List<Future<LiteflowResponse>> futureList = CollUtil.newArrayList();
for (int i = 0; i < 100; i++) {
int id = random.nextInt(10) + 1;
Future<LiteflowResponse> future = flowExecutor.execute2Future("chain" + id, "arg");
futureList.add(future);
}
futureList.forEach(future -> {
try {
LiteflowResponse response = future.get();
Assertions.assertTrue(response.isSuccess());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
// 等待缓存淘汰
getCache().cleanUp();
Assertions.assertEquals(10, FlowBus.getChainMap().size());
// 测试只有5个chain被编译
int count = 0;
for (Chain chain : FlowBus.getChainMap().values()) {
List<Condition> conditionList = chain.getConditionList();
if (chain.isCompiled()) {
Assertions.assertTrue(CollUtil.isNotEmpty(conditionList));
count++;
} else {
Assertions.assertNull(conditionList);
}
}
Assertions.assertTrue(count <= 5);
}
// 测试开启规则缓存后进入缓存的chain可以正常被更新
@Test
public void testChainCache4() {
loadCache();
// 缓存快照
HashSet<@NonNull String> strings = CollUtil.newHashSet(getCache().asMap().keySet());
LiteflowResponse response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("x==>a==>b", response.getExecuteStepStr());
// 获得被淘汰chain
String chainId = getEvictedChain(strings);
testEvicted(chainId);
// chain7进入缓存
Assertions.assertTrue(getCache().asMap().containsKey("chain7"));
// 更新chain7
LiteFlowChainELBuilder
.createChain()
.setChainId("chain7")
.setEL("THEN(a, b, c)")
.build();
// 重新执行chain7
response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("a==>b==>c", response.getExecuteStepStr());
}
// 测试开启规则缓存后进入缓存的chain被移除后无法执行
@Test
public void testChainCache5() {
loadCache();
// 缓存快照
HashSet<@NonNull String> strings = CollUtil.newHashSet(getCache().asMap().keySet());
LiteflowResponse response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("x==>a==>b", response.getExecuteStepStr());
// 获得被淘汰chain
String chainId = getEvictedChain(strings);
testEvicted(chainId);
// chain7进入缓存
Assertions.assertTrue(getCache().asMap().containsKey("chain7"));
// 手动移除chain7
FlowBus.removeChain("chain7");
response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertFalse(response.isSuccess());
Assertions.assertEquals(ChainNotFoundException.class, response.getCause().getClass());
}
// 测试并发下正在执行的chain的condition被清理但仍能执行
@Test
public void testChainCache6() throws InterruptedException {
// 模拟清空编译好的chain
Thread thread = new Thread(()-> {
Chain chain1 = FlowBus.getChain("chain1");
// 绕过初次编译
chain1.setCompiled(true);
chain1.setConditionList(null);
});
thread.start();
thread.join();
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("a==>b", response.getExecuteStepStr());
}
// 加载缓存, chain1~chain5
private void loadCache() {
// 容量上限为5
for (int i = 1; i <= 5; i++) {
flowExecutor.execute2Resp("chain" + i);
}
}
// 测试 chain 被淘汰
private void testEvicted(String chanId) {
Chain chain = FlowBus.getChain(chanId);
getCache().cleanUp();
// 测试缓存中不存在
Assertions.assertFalse(getCache().asMap().containsKey(chanId));
// 测试chain被设置为未编译
Assertions.assertFalse(chain.isCompiled());
Assertions.assertNull(chain.getConditionList());
}
public Cache<String, ChainCacheLifeCycle.ChainState> getCache() {
return ChainCacheLifeCycle.getLifeCycle().getCache();
}
// 获得淘汰的chain传入淘汰前的chain集合
// 确保只有一个被淘汰时使用
String getEvictedChain(Set<String> set) {
Cache<String, ChainCacheLifeCycle.ChainState> cache = getCache();
cache.cleanUp();
Set<@NonNull String> strings = cache.asMap().keySet();
set.removeAll(strings);
Assertions.assertEquals(1, set.size());
return set.iterator().next();
}
}

View File

@@ -0,0 +1,20 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.chainCache.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import org.noear.solon.annotation.Component;
@Component("a")
public class ACmp extends NodeComponent {
@Override
public void process() {
System.out.println("ACmp executed!");
}
}

View File

@@ -0,0 +1,21 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.chainCache.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import org.noear.solon.annotation.Component;
@Component("b")
public class BCmp extends NodeComponent {
@Override
public void process() {
System.out.println("BCmp executed!");
}
}

View File

@@ -0,0 +1,20 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.chainCache.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import org.noear.solon.annotation.Component;
@Component("c")
public class CCmp extends NodeComponent {
@Override
public void process() {
System.out.println("CCmp executed!");
}
}

View File

@@ -0,0 +1,12 @@
package com.yomahub.liteflow.test.chainCache.cmp;
import com.yomahub.liteflow.core.NodeBooleanComponent;
import org.noear.solon.annotation.Component;
@Component("x")
public class XCmp extends NodeBooleanComponent {
@Override
public boolean processBoolean() throws Exception {
return true;
}
}

View File

@@ -0,0 +1,4 @@
liteflow.rule-source=chainCache/flow.el.xml
liteflow.chain-cache.enabled=true
liteflow.chain-cache.capacity=5
liteflow.parse-mode=PARSE_ONE_ON_FIRST_EXEC

View File

@@ -0,0 +1,43 @@
<?xml version="1.0" encoding="UTF-8"?>
<flow>
<chain name="chain1">
THEN(a, b);
</chain>
<chain name="chain2">
THEN(a, c);
</chain>
<chain name="chain3">
THEN(b, a);
</chain>
<chain name="chain4">
THEN(b, c);
</chain>
<chain name="chain5">
THEN(c, a);
</chain>
<chain name="chain6">
THEN(c, b);
</chain>
<chain name="chain7">
THEN(IF(x, a), b);
</chain>
<chain name="chain8">
THEN(IF(x, a), c);
</chain>
<chain name="chain9">
WHEN(a, b, c);
</chain>
<chain name="chain10">
FOR(5).DO(THEN(a, b, c));
</chain>
</flow>

View File

@@ -0,0 +1,232 @@
package com.yomahub.liteflow.test.chainCache;
import cn.hutool.core.collection.CollUtil;
import com.github.benmanes.caffeine.cache.Cache;
import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.exception.ChainNotFoundException;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.flow.element.Chain;
import com.yomahub.liteflow.flow.element.Condition;
import com.yomahub.liteflow.lifecycle.impl.ChainCacheLifeCycle;
import com.yomahub.liteflow.test.BaseTest;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.TestPropertySource;
import javax.annotation.Resource;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
* Springboot环境下chain缓存测试
* @author DaleLee
*/
@TestPropertySource(value = "classpath:/chainCache/application.properties")
@SpringBootTest(classes = ChainCacheSpringbootTest.class)
@EnableAutoConfiguration
@ComponentScan({"com.yomahub.liteflow.test.chainCache.cmp"})
public class ChainCacheSpringbootTest extends BaseTest {
@Resource
private FlowExecutor flowExecutor;
@BeforeEach
public void reload() {
flowExecutor.reloadRule();
// 清空缓存
Cache<String, ChainCacheLifeCycle.ChainState> cache = getCache();
cache.invalidateAll();
cache.cleanUp();
}
// 测试chain被淘汰
@Test
public void testChainCache1() {
// 加满缓存
loadCache();
// 缓存快照
HashSet<@NonNull String> strings = CollUtil.newHashSet(getCache().asMap().keySet());
LiteflowResponse response = flowExecutor.execute2Resp("chain6", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("c==>b", response.getExecuteStepStr());
// 获得被淘汰chain
String chainId = getEvictedChain(strings);
testEvicted(chainId);
// 测试被淘汰的chain仍可正常执行
response = flowExecutor.execute2Resp(chainId, "arg");
Assertions.assertTrue(response.isSuccess());
}
// 测试缓存数量上限(串行)
@Test
public void testChainCache2() {
// 确保至少执行过5个不同的chain
loadCache();
// 随机执行chain
Random random = new Random();
for (int i = 0; i < 100; i++) {
int id = random.nextInt(10) + 1;
flowExecutor.execute2Resp("chain" + id);
}
// 等待缓存淘汰
getCache().cleanUp();
Assertions.assertEquals(10, FlowBus.getChainMap().size());
// 测试只有5个chain被编译
int count = 0;
for (Chain chain : FlowBus.getChainMap().values()) {
List<Condition> conditionList = chain.getConditionList();
if (chain.isCompiled()) {
Assertions.assertTrue(CollUtil.isNotEmpty(conditionList));
count++;
} else {
Assertions.assertNull(conditionList);
}
}
Assertions.assertTrue(count <= 5);
}
// 测试缓存数量上限(并行)
@Test
public void testChainCache3() {
loadCache();
Random random = new Random();
List<Future<LiteflowResponse>> futureList = CollUtil.newArrayList();
for (int i = 0; i < 100; i++) {
int id = random.nextInt(10) + 1;
Future<LiteflowResponse> future = flowExecutor.execute2Future("chain" + id, "arg");
futureList.add(future);
}
futureList.forEach(future -> {
try {
LiteflowResponse response = future.get();
Assertions.assertTrue(response.isSuccess());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
// 等待缓存淘汰
getCache().cleanUp();
Assertions.assertEquals(10, FlowBus.getChainMap().size());
// 测试只有5个chain被编译
int count = 0;
for (Chain chain : FlowBus.getChainMap().values()) {
List<Condition> conditionList = chain.getConditionList();
if (chain.isCompiled()) {
Assertions.assertTrue(CollUtil.isNotEmpty(conditionList));
count++;
} else {
Assertions.assertNull(conditionList);
}
}
Assertions.assertTrue(count <= 5);
}
// 测试开启规则缓存后进入缓存的chain可以正常被更新
@Test
public void testChainCache4() {
loadCache();
// 缓存快照
HashSet<@NonNull String> strings = CollUtil.newHashSet(getCache().asMap().keySet());
LiteflowResponse response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("x==>a==>b", response.getExecuteStepStr());
// 获得被淘汰chain
String chainId = getEvictedChain(strings);
testEvicted(chainId);
// chain7进入缓存
Assertions.assertTrue(getCache().asMap().containsKey("chain7"));
// 更新chain7
LiteFlowChainELBuilder
.createChain()
.setChainId("chain7")
.setEL("THEN(a, b, c)")
.build();
// 重新执行chain7
response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("a==>b==>c", response.getExecuteStepStr());
}
// 测试开启规则缓存后进入缓存的chain被移除后无法执行
@Test
public void testChainCache5() {
loadCache();
// 缓存快照
HashSet<@NonNull String> strings = CollUtil.newHashSet(getCache().asMap().keySet());
LiteflowResponse response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("x==>a==>b", response.getExecuteStepStr());
// 获得被淘汰chain
String chainId = getEvictedChain(strings);
testEvicted(chainId);
// chain7进入缓存
Assertions.assertTrue(getCache().asMap().containsKey("chain7"));
// 手动移除chain7
FlowBus.removeChain("chain7");
response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertFalse(response.isSuccess());
Assertions.assertEquals(ChainNotFoundException.class, response.getCause().getClass());
}
// 测试并发下正在执行的chain的condition被清理但仍能执行
@Test
public void testChainCache6() throws InterruptedException {
// 模拟清空编译好的chain
Thread thread = new Thread(()-> {
Chain chain1 = FlowBus.getChain("chain1");
// 绕过初次编译
chain1.setCompiled(true);
chain1.setConditionList(null);
});
thread.start();
thread.join();
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("a==>b", response.getExecuteStepStr());
}
// 加载缓存, chain1~chain5
private void loadCache() {
// 容量上限为5
for (int i = 1; i <= 5; i++) {
flowExecutor.execute2Resp("chain" + i);
}
}
// 测试 chain 被淘汰
private void testEvicted(String chanId) {
Chain chain = FlowBus.getChain(chanId);
getCache().cleanUp();
// 测试缓存中不存在
Assertions.assertFalse(getCache().asMap().containsKey(chanId));
// 测试chain被设置为未编译
Assertions.assertFalse(chain.isCompiled());
Assertions.assertNull(chain.getConditionList());
}
public Cache<String, ChainCacheLifeCycle.ChainState> getCache() {
return ChainCacheLifeCycle.getLifeCycle().getCache();
}
// 获得淘汰的chain传入淘汰前的chain集合
// 确保只有一个被淘汰时使用
String getEvictedChain(Set<String> set) {
Cache<String, ChainCacheLifeCycle.ChainState> cache = getCache();
cache.cleanUp();
Set<@NonNull String> strings = cache.asMap().keySet();
set.removeAll(strings);
Assertions.assertEquals(1, set.size());
return set.iterator().next();
}
}

View File

@@ -0,0 +1,20 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.chainCache.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import org.springframework.stereotype.Component;
@Component("a")
public class ACmp extends NodeComponent {
@Override
public void process() {
System.out.println("ACmp executed!");
}
}

View File

@@ -0,0 +1,21 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.chainCache.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import org.springframework.stereotype.Component;
@Component("b")
public class BCmp extends NodeComponent {
@Override
public void process() {
System.out.println("BCmp executed!");
}
}

View File

@@ -0,0 +1,20 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.chainCache.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import org.springframework.stereotype.Component;
@Component("c")
public class CCmp extends NodeComponent {
@Override
public void process() {
System.out.println("CCmp executed!");
}
}

View File

@@ -0,0 +1,12 @@
package com.yomahub.liteflow.test.chainCache.cmp;
import com.yomahub.liteflow.core.NodeBooleanComponent;
import org.springframework.stereotype.Component;
@Component("x")
public class XCmp extends NodeBooleanComponent {
@Override
public boolean processBoolean() throws Exception {
return true;
}
}

View File

@@ -36,7 +36,7 @@ public class SubflowSpringbootTest extends BaseTest {
public void testSubflow1() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertTrue(ListUtil.toList("a==>c==>d==>b","a==>c==>b==>d").contains(response.getExecuteStepStr()));
Assertions.assertTrue(ListUtil.toList("a==>c==>d==>b","a==>d==>c==>b").contains(response.getExecuteStepStr()));
}
//测试子chain
@@ -44,7 +44,7 @@ public class SubflowSpringbootTest extends BaseTest {
public void testSubflow2() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain2", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertTrue(ListUtil.toList("a==>c==>d==>b","a==>c==>b==>d").contains(response.getExecuteStepStr()));
Assertions.assertTrue(ListUtil.toList("a==>c==>d==>b","a==>d==>c==>b").contains(response.getExecuteStepStr()));
}
//测试在组件里调用另一个流程

View File

@@ -0,0 +1,4 @@
liteflow.rule-source=chainCache/flow.el.xml
liteflow.chain-cache.enabled=true
liteflow.chain-cache.capacity=5
liteflow.parse-mode=PARSE_ONE_ON_FIRST_EXEC

View File

@@ -0,0 +1,43 @@
<?xml version="1.0" encoding="UTF-8"?>
<flow>
<chain name="chain1">
THEN(a, b);
</chain>
<chain name="chain2">
THEN(a, c);
</chain>
<chain name="chain3">
THEN(b, a);
</chain>
<chain name="chain4">
THEN(b, c);
</chain>
<chain name="chain5">
THEN(c, a);
</chain>
<chain name="chain6">
THEN(c, b);
</chain>
<chain name="chain7">
THEN(IF(x, a), b);
</chain>
<chain name="chain8">
THEN(IF(x, a), c);
</chain>
<chain name="chain9">
WHEN(a, b, c);
</chain>
<chain name="chain10">
FOR(5).DO(THEN(a, b, c));
</chain>
</flow>

View File

@@ -0,0 +1,229 @@
package com.yomahub.liteflow.test.chainCache;
import cn.hutool.core.collection.CollUtil;
import com.github.benmanes.caffeine.cache.Cache;
import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.exception.ChainNotFoundException;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.flow.element.Chain;
import com.yomahub.liteflow.flow.element.Condition;
import com.yomahub.liteflow.lifecycle.impl.ChainCacheLifeCycle;
import com.yomahub.liteflow.test.BaseTest;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import javax.annotation.Resource;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
* Spring环境下chain缓存测试
* @author DaleLee
*/
@ExtendWith(SpringExtension.class)
@ContextConfiguration("classpath:/chainCache/application.xml")
public class ChainCacheSpringTest extends BaseTest {
@Resource
private FlowExecutor flowExecutor;
@BeforeEach
public void reload() {
flowExecutor.reloadRule();
// 清空缓存
Cache<String, ChainCacheLifeCycle.ChainState> cache = getCache();
cache.invalidateAll();
cache.cleanUp();
}
// 测试chain被淘汰
@Test
public void testChainCache1() {
// 加满缓存
loadCache();
// 缓存快照
HashSet<@NonNull String> strings = CollUtil.newHashSet(getCache().asMap().keySet());
LiteflowResponse response = flowExecutor.execute2Resp("chain6", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("c==>b", response.getExecuteStepStr());
// 获得被淘汰chain
String chainId = getEvictedChain(strings);
testEvicted(chainId);
// 测试被淘汰的chain仍可正常执行
response = flowExecutor.execute2Resp(chainId, "arg");
Assertions.assertTrue(response.isSuccess());
}
// 测试缓存数量上限(串行)
@Test
public void testChainCache2() {
// 确保至少执行过5个不同的chain
loadCache();
// 随机执行chain
Random random = new Random();
for (int i = 0; i < 100; i++) {
int id = random.nextInt(10) + 1;
flowExecutor.execute2Resp("chain" + id);
}
// 等待缓存淘汰
getCache().cleanUp();
Assertions.assertEquals(10, FlowBus.getChainMap().size());
// 测试只有5个chain被编译
int count = 0;
for (Chain chain : FlowBus.getChainMap().values()) {
List<Condition> conditionList = chain.getConditionList();
if (chain.isCompiled()) {
Assertions.assertTrue(CollUtil.isNotEmpty(conditionList));
count++;
} else {
Assertions.assertNull(conditionList);
}
}
Assertions.assertTrue(count <= 5);
}
// 测试缓存数量上限(并行)
@Test
public void testChainCache3() {
loadCache();
Random random = new Random();
List<Future<LiteflowResponse>> futureList = CollUtil.newArrayList();
for (int i = 0; i < 100; i++) {
int id = random.nextInt(10) + 1;
Future<LiteflowResponse> future = flowExecutor.execute2Future("chain" + id, "arg");
futureList.add(future);
}
futureList.forEach(future -> {
try {
LiteflowResponse response = future.get();
Assertions.assertTrue(response.isSuccess());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
// 等待缓存淘汰
getCache().cleanUp();
Assertions.assertEquals(10, FlowBus.getChainMap().size());
// 测试只有5个chain被编译
int count = 0;
for (Chain chain : FlowBus.getChainMap().values()) {
List<Condition> conditionList = chain.getConditionList();
if (chain.isCompiled()) {
Assertions.assertTrue(CollUtil.isNotEmpty(conditionList));
count++;
} else {
Assertions.assertNull(conditionList);
}
}
Assertions.assertTrue(count <= 5);
}
// 测试开启规则缓存后进入缓存的chain可以正常被更新
@Test
public void testChainCache4() {
loadCache();
// 缓存快照
HashSet<@NonNull String> strings = CollUtil.newHashSet(getCache().asMap().keySet());
LiteflowResponse response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("x==>a==>b", response.getExecuteStepStr());
// 获得被淘汰chain
String chainId = getEvictedChain(strings);
testEvicted(chainId);
// chain7进入缓存
Assertions.assertTrue(getCache().asMap().containsKey("chain7"));
// 更新chain7
LiteFlowChainELBuilder
.createChain()
.setChainId("chain7")
.setEL("THEN(a, b, c)")
.build();
// 重新执行chain7
response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("a==>b==>c", response.getExecuteStepStr());
}
// 测试开启规则缓存后进入缓存的chain被移除后无法执行
@Test
public void testChainCache5() {
loadCache();
// 缓存快照
HashSet<@NonNull String> strings = CollUtil.newHashSet(getCache().asMap().keySet());
LiteflowResponse response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("x==>a==>b", response.getExecuteStepStr());
// 获得被淘汰chain
String chainId = getEvictedChain(strings);
testEvicted(chainId);
// chain7进入缓存
Assertions.assertTrue(getCache().asMap().containsKey("chain7"));
// 手动移除chain7
FlowBus.removeChain("chain7");
response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertFalse(response.isSuccess());
Assertions.assertEquals(ChainNotFoundException.class, response.getCause().getClass());
}
// 测试并发下正在执行的chain的condition被清理但仍能执行
@Test
public void testChainCache6() throws InterruptedException {
// 模拟清空编译好的chain
Thread thread = new Thread(()-> {
Chain chain1 = FlowBus.getChain("chain1");
// 绕过初次编译
chain1.setCompiled(true);
chain1.setConditionList(null);
});
thread.start();
thread.join();
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("a==>b", response.getExecuteStepStr());
}
// 加载缓存, chain1~chain5
private void loadCache() {
// 容量上限为5
for (int i = 1; i <= 5; i++) {
flowExecutor.execute2Resp("chain" + i);
}
}
// 测试 chain 被淘汰
private void testEvicted(String chanId) {
Chain chain = FlowBus.getChain(chanId);
getCache().cleanUp();
// 测试缓存中不存在
Assertions.assertFalse(getCache().asMap().containsKey(chanId));
// 测试chain被设置为未编译
Assertions.assertFalse(chain.isCompiled());
Assertions.assertNull(chain.getConditionList());
}
public Cache<String, ChainCacheLifeCycle.ChainState> getCache() {
return ChainCacheLifeCycle.getLifeCycle().getCache();
}
// 获得淘汰的chain传入淘汰前的chain集合
// 确保只有一个被淘汰时使用
String getEvictedChain(Set<String> set) {
Cache<String, ChainCacheLifeCycle.ChainState> cache = getCache();
cache.cleanUp();
Set<@NonNull String> strings = cache.asMap().keySet();
set.removeAll(strings);
Assertions.assertEquals(1, set.size());
return set.iterator().next();
}
}

View File

@@ -0,0 +1,20 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.chainCache.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import org.springframework.stereotype.Component;
@Component("a")
public class ACmp extends NodeComponent {
@Override
public void process() {
System.out.println("ACmp executed!");
}
}

View File

@@ -0,0 +1,21 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.chainCache.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import org.springframework.stereotype.Component;
@Component("b")
public class BCmp extends NodeComponent {
@Override
public void process() {
System.out.println("BCmp executed!");
}
}

View File

@@ -0,0 +1,20 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.chainCache.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import org.springframework.stereotype.Component;
@Component("c")
public class CCmp extends NodeComponent {
@Override
public void process() {
System.out.println("CCmp executed!");
}
}

View File

@@ -0,0 +1,12 @@
package com.yomahub.liteflow.test.chainCache.cmp;
import com.yomahub.liteflow.core.NodeBooleanComponent;
import org.springframework.stereotype.Component;
@Component("x")
public class XCmp extends NodeBooleanComponent {
@Override
public boolean processBoolean() throws Exception {
return true;
}
}

View File

@@ -0,0 +1,26 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd">
<context:component-scan base-package="com.yomahub.liteflow.test.chainCache.cmp" />
<bean id="springAware" class="com.yomahub.liteflow.spi.spring.SpringAware"/>
<bean class="com.yomahub.liteflow.spring.ComponentScanner"/>
<bean id="liteflowConfig" class="com.yomahub.liteflow.property.LiteflowConfig">
<property name="ruleSource" value="chainCache/flow.el.xml"/>
<property name="chainCacheEnabled" value="true"/>
<property name="chainCacheCapacity" value="5"/>
<property name="parseMode" value="PARSE_ONE_ON_FIRST_EXEC"/>
</bean>
<bean id="flowExecutor" class="com.yomahub.liteflow.core.FlowExecutor">
<constructor-arg name="liteflowConfig" ref="liteflowConfig"/>
</bean>
</beans>

View File

@@ -0,0 +1,43 @@
<?xml version="1.0" encoding="UTF-8"?>
<flow>
<chain name="chain1">
THEN(a, b);
</chain>
<chain name="chain2">
THEN(a, c);
</chain>
<chain name="chain3">
THEN(b, a);
</chain>
<chain name="chain4">
THEN(b, c);
</chain>
<chain name="chain5">
THEN(c, a);
</chain>
<chain name="chain6">
THEN(c, b);
</chain>
<chain name="chain7">
THEN(IF(x, a), b);
</chain>
<chain name="chain8">
THEN(IF(x, a), c);
</chain>
<chain name="chain9">
WHEN(a, b, c);
</chain>
<chain name="chain10">
FOR(5).DO(THEN(a, b, c));
</chain>
</flow>

View File

@@ -58,7 +58,7 @@
<zkclient.version>0.10</zkclient.version>
<jetcd.version>0.7.3</jetcd.version>
<nacos.version>1.4.4</nacos.version>
<qlexpress.version>3.3.2</qlexpress.version>
<qlexpress.version>3.3.4</qlexpress.version>
<groovy.version>3.0.8</groovy.version>
<graalvm.version>22.0.0</graalvm.version>
<bytebuddy.version>1.14.10</bytebuddy.version>
@@ -81,6 +81,7 @@
<dynamic-datasource.version>4.3.1</dynamic-datasource.version>
<sharding-jdbc.version>4.1.1</sharding-jdbc.version>
<apache-commons-test.version>1.13.1</apache-commons-test.version>
<caffeine.version>2.9.3</caffeine.version>
</properties>
<dependencyManagement>
@@ -339,6 +340,11 @@
<artifactId>commons-text</artifactId>
<version>${apache-commons-test.version}</version>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>${caffeine.version}</version>
</dependency>
</dependencies>
</dependencyManagement>