feature #IAY66T 修复并发下chain清理问题

This commit is contained in:
DaleLee
2025-02-16 18:14:40 +08:00
parent 43d5b3d529
commit 316a539699
6 changed files with 419 additions and 392 deletions

View File

@@ -254,10 +254,16 @@ public class Chain implements Executable{
tempChain.setEl(el);
tempChain.setCompiled(false);
LiteFlowChainELBuilder.buildUnCompileChain(tempChain);
// 移除临时chain
FlowBus.removeChain(tempChainId);
List<Condition> tempConditionList = tempChain.getConditionList();
this.conditionList = tempChain.getConditionList();
this.isCompiled = true;
// 移除临时chain
FlowBus.removeChain(tempChainId);
// if (true) {
// throw new RuntimeException("test...");
// }
if (CollUtil.isEmpty(tempConditionList)) {
throw new FlowSystemException("no conditionList in this chain[" + chainId + "]");
}

View File

@@ -1,27 +1,31 @@
package com.yomahub.liteflow.lifecycle.impl;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ObjectUtil;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.flow.element.Chain;
import com.yomahub.liteflow.flow.element.Condition;
import com.yomahub.liteflow.lifecycle.PostProcessChainExecuteLifeCycle;
import com.yomahub.liteflow.meta.LiteflowMetaOperator;
import com.yomahub.liteflow.slot.Slot;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import java.util.List;
/**
* Chain 缓存处理
* @author DaleLee
* @since 2.13.0
*/
public class RuleCacheLifeCycle implements PostProcessChainExecuteLifeCycle {
// 缓存
private final Cache<String, Object> cache;
// 在缓存中与key关联的虚拟值
private static final Object PRESENT = new Object();
/**
* 缓存
*/
private final Cache<String, ChainState> cache;
public RuleCacheLifeCycle(int capacity) {
this.cache = Caffeine.newBuilder()
@@ -32,52 +36,113 @@ public class RuleCacheLifeCycle implements PostProcessChainExecuteLifeCycle {
@Override
public void postProcessBeforeChainExecute(String chainId, Slot slot) {
// 记录chainId在缓存中
// 这里不记录实际的chain是因为chainId对应的chain之后有可能在FlowBus中被移除
// 或被更新替换以FlowBus中实际存在的chain为准
cache.get(chainId, key -> PRESENT);
// 记录 chainId 在缓存中
// 初始状态为 ACTIVE
cache.get(chainId, key -> new ChainState(State.ACTIVE));
}
@Override
public void postProcessAfterChainExecute(String chainId, Slot slot) {
// chain执行时有可能在未编译前就被淘汰
// 结果使被淘汰的chain仍持有condition淘汰后就立刻编译
// 这里做兜底操作,执行完后再次判断其是否在缓存中
// 若不在则清空chain的condition
// ConcurrentMap<@NonNull String, @NonNull Object> concurrentMap = cache.asMap();
// concurrentMap.computeIfAbsent(chainId, key -> {
// cleanChain(key);
// return null;
// });
}
public Cache<String, Object> getCache() {
return cache;
}
/**
* 监听在缓存中被移除的chain
*/
private static class ChainRemovalListener implements RemovalListener<String, Object> {
@Override
public void onRemoval(@Nullable String chainId, @Nullable Object object, @NonNull RemovalCause removalCause) {
// 不在缓存中、或出于非活跃状态,但未被清理
if (!isActive(chainId) && !isCleaned(chainId)) {
cleanChain(chainId);
}
}
/**
* Chain 状态枚举
*/
public enum State {
/**
* 活跃状态
*/
ACTIVE,
/**
* 非活跃状态 (处于淘汰流程中)
*/
INACTIVE
}
/**
* Chain 在缓存中状态
*/
public static class ChainState {
/**
* Chain 状态
*/
private State state;
public ChainState(State state) {
this.state = state;
}
public State getState() {
return state;
}
public void setState(State state) {
this.state = state;
}
}
/**
* 监听在缓存中被移除的 Chain
*/
private static class ChainRemovalListener implements RemovalListener<String, ChainState> {
@Override
public void onRemoval(@Nullable String chainId, @Nullable ChainState chainState, @NonNull RemovalCause removalCause) {
if (ObjectUtil.isNotNull(chainState)) {
chainState.setState(State.INACTIVE);
}
cleanChain(chainId);
}
}
/**
* 获取缓存
* @return cache
*/
public Cache<String, ChainState> getCache() {
return cache;
}
/**
* 判断 Chain 的 Condition 是否被清理
* @param chainId chainId
* @return 被清理返回 true否则返回 false
*/
private boolean isCleaned(String chainId) {
Chain chain = LiteflowMetaOperator.getChain(chainId);
if (ObjectUtil.isNull(chain)) {
return true;
}
List<Condition> conditionList = chain.getConditionList();
return CollUtil.isEmpty(conditionList);
}
/**
* 判断 Chain 在缓存中是活跃状态
* @param chainId chainId
* @return 活跃状态返回 true不在缓存中或处于非活状态返回 false
*/
private boolean isActive(String chainId) {
ChainState chainState = cache.getIfPresent(chainId);
return ObjectUtil.isNotNull(chainState)
&& State.ACTIVE.equals(chainState.getState());
}
/**
* 清理 Chain 的 Condition
* @param chainId chainId
*/
private static void cleanChain(String chainId) {
Chain chain = FlowBus.getChain(chainId);
Chain chain = LiteflowMetaOperator.getChain(chainId);
// chain可能已经在FlowBus中被移除了
if (ObjectUtil.isNull(chain)) {
return;
}
// if (CollUtil.isEmpty(chain.getConditionList())) {
// return;
// }
// 将chain设置为未编译并清空condition
chain.setCompiled(false);
chain.setConditionList(null);

View File

@@ -11,7 +11,7 @@ import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.flow.element.Chain;
import com.yomahub.liteflow.flow.element.Condition;
import com.yomahub.liteflow.lifecycle.LifeCycleHolder;
import com.yomahub.liteflow.lifecycle.PostProcessFlowExecuteLifeCycle;
import com.yomahub.liteflow.lifecycle.PostProcessChainExecuteLifeCycle;
import com.yomahub.liteflow.lifecycle.impl.RuleCacheLifeCycle;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.test.BaseTest;
@@ -21,6 +21,7 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import javax.annotation.Resource;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
@@ -50,7 +51,7 @@ public class RuleCacheTest extends BaseTest {
public void reload() {
flowExecutor.reloadRule();
// 清空缓存
Cache<String, Object> cache = getCache();
Cache<String, RuleCacheLifeCycle.ChainState> cache = getCache();
cache.invalidateAll();
cache.cleanUp();
}
@@ -73,15 +74,20 @@ public class RuleCacheTest extends BaseTest {
Assertions.assertTrue(response.isSuccess());
}
// 测试缓存数量
// 测试缓存数量上限(串行)
@Test
public void testRuleCache2() {
// 确保至少执行过5个不同的chain
loadCache();
// 随机执行chain
loadCache(100);
Random random = new Random();
for (int i = 0; i < 100; i++) {
int id = random.nextInt(10) + 1;
flowExecutor.execute2Resp("chain" + id);
}
// 等待缓存淘汰
getCache().cleanUp();
Assertions.assertEquals(10, FlowBus.getChainMap().size());
// 测试只有5个chain被编译
int count = 0;
for (Chain chain : FlowBus.getChainMap().values()) {
@@ -96,72 +102,9 @@ public class RuleCacheTest extends BaseTest {
Assertions.assertTrue(count <= 5);
}
// 测试开启规则缓存后进入缓存的chain可以正常被更新
// 测试缓存数量上限(并行)
@Test
public void testRuleCache3() {
loadCache();
// 缓存快照
HashSet<@NonNull String> strings = CollUtil.newHashSet(getCache().asMap().keySet());
LiteflowResponse response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("x==>a==>b", response.getExecuteStepStr());
// chain7进入缓存
Assertions.assertTrue(getCache().asMap().containsKey("chain7"));
// 获得被淘汰chain
String chainId = getEvictedChain(strings);
testEvicted(chainId);
// 更新chain7
LiteFlowChainELBuilder
.createChain()
.setChainId("chain7")
.setEL("THEN(a, b, c)")
.build();
// 重新执行chain7
response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("a==>b==>c", response.getExecuteStepStr());
}
// 测试开启规则缓存后进入缓存的chain被移除后无法执行
@Test
public void testRuleCache4() {
loadCache();
// 缓存快照
HashSet<@NonNull String> strings = CollUtil.newHashSet(getCache().asMap().keySet());
LiteflowResponse response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("x==>a==>b", response.getExecuteStepStr());
// chain7进入缓存
Assertions.assertTrue(getCache().asMap().containsKey("chain7"));
// 获得被淘汰chain
String chainId = getEvictedChain(strings);
testEvicted(chainId);
// 手动移除chain7
FlowBus.removeChain("chain7");
response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertFalse(response.isSuccess());
Assertions.assertEquals(ChainNotFoundException.class, response.getCause().getClass());
}
// 测试并发下正在执行的chain被淘汰仍能执行
@Test
public void testRuleCache5() throws InterruptedException {
// 模拟清空编译好的chain
Thread thread = new Thread(()-> {
Chain chain1 = FlowBus.getChain("chain1");
chain1.setCompiled(true);
chain1.setConditionList(null);
});
thread.start();
thread.join();
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("a==>b", response.getExecuteStepStr());
}
// 测试开启规则缓存后并发执行chain
@Test
public void test6() {
loadCache();
Random random = new Random();
List<Future<LiteflowResponse>> futureList = CollUtil.newArrayList();
@@ -181,9 +124,8 @@ public class RuleCacheTest extends BaseTest {
// 等待缓存淘汰
getCache().cleanUp();
// 测试只有5个chain被编译
Assertions.assertEquals(10, FlowBus.getChainMap().size());
// 测试只有5个chain被编译
int count = 0;
for (Chain chain : FlowBus.getChainMap().values()) {
List<Condition> conditionList = chain.getConditionList();
@@ -197,6 +139,70 @@ public class RuleCacheTest extends BaseTest {
Assertions.assertTrue(count <= 5);
}
// 测试开启规则缓存后进入缓存的chain可以正常被更新
@Test
public void testRuleCache4() {
loadCache();
// 缓存快照
HashSet<@NonNull String> strings = CollUtil.newHashSet(getCache().asMap().keySet());
LiteflowResponse response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("x==>a==>b", response.getExecuteStepStr());
// 获得被淘汰chain
String chainId = getEvictedChain(strings);
testEvicted(chainId);
// chain7进入缓存
Assertions.assertTrue(getCache().asMap().containsKey("chain7"));
// 更新chain7
LiteFlowChainELBuilder
.createChain()
.setChainId("chain7")
.setEL("THEN(a, b, c)")
.build();
// 重新执行chain7
response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("a==>b==>c", response.getExecuteStepStr());
}
// 测试开启规则缓存后进入缓存的chain被移除后无法执行
@Test
public void testRuleCache5() {
loadCache();
// 缓存快照
HashSet<@NonNull String> strings = CollUtil.newHashSet(getCache().asMap().keySet());
LiteflowResponse response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("x==>a==>b", response.getExecuteStepStr());
// 获得被淘汰chain
String chainId = getEvictedChain(strings);
testEvicted(chainId);
// chain7进入缓存
Assertions.assertTrue(getCache().asMap().containsKey("chain7"));
// 手动移除chain7
FlowBus.removeChain("chain7");
response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertFalse(response.isSuccess());
Assertions.assertEquals(ChainNotFoundException.class, response.getCause().getClass());
}
// 测试并发下正在执行的chain的condition被清理但仍能执行
@Test
public void testRuleCache6() throws InterruptedException {
// 模拟清空编译好的chain
Thread thread = new Thread(()-> {
Chain chain1 = FlowBus.getChain("chain1");
// 绕过初次编译
chain1.setCompiled(true);
chain1.setConditionList(null);
});
thread.start();
thread.join();
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("a==>b", response.getExecuteStepStr());
}
// 加载缓存, chain1~chain5
private void loadCache() {
// 容量上限为5
@@ -205,15 +211,6 @@ public class RuleCacheTest extends BaseTest {
}
}
private void loadCache(int count) {
// 随机执行chain
Random random = new Random();
for (int i = 0; i < count; i++) {
int id = random.nextInt(10) + 1;
flowExecutor.execute2Resp("chain" + id);
}
}
// 测试 chain 被淘汰
private void testEvicted(String chanId) {
Chain chain = FlowBus.getChain(chanId);
@@ -225,10 +222,10 @@ public class RuleCacheTest extends BaseTest {
Assertions.assertNull(chain.getConditionList());
}
public Cache<String, Object> getCache() {
List<PostProcessFlowExecuteLifeCycle> lifeCycleList
= LifeCycleHolder.getPostProcessFlowExecuteLifeCycleList();
for (PostProcessFlowExecuteLifeCycle lifeCycle : lifeCycleList) {
public Cache<String, RuleCacheLifeCycle.ChainState> getCache() {
List<PostProcessChainExecuteLifeCycle> lifeCycleList
= LifeCycleHolder.getPostProcessChainExecuteLifeCycleList();
for (PostProcessChainExecuteLifeCycle lifeCycle : lifeCycleList) {
if (lifeCycle.getClass().equals(RuleCacheLifeCycle.class)) {
RuleCacheLifeCycle ruleCacheLifeCycle = (RuleCacheLifeCycle) lifeCycle;
return ruleCacheLifeCycle.getCache();
@@ -240,12 +237,11 @@ public class RuleCacheTest extends BaseTest {
// 获得淘汰的chain传入淘汰前的chain集合
// 确保只有一个被淘汰时使用
String getEvictedChain(Set<String> set) {
Cache<String, Object> cache = getCache();
Cache<String, RuleCacheLifeCycle.ChainState> cache = getCache();
cache.cleanUp();
Set<@NonNull String> strings = cache.asMap().keySet();
set.removeAll(strings);
Assertions.assertEquals(1, set.size());
return set.iterator().next();
}
}

View File

@@ -10,7 +10,7 @@ import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.flow.element.Chain;
import com.yomahub.liteflow.flow.element.Condition;
import com.yomahub.liteflow.lifecycle.LifeCycleHolder;
import com.yomahub.liteflow.lifecycle.PostProcessFlowExecuteLifeCycle;
import com.yomahub.liteflow.lifecycle.PostProcessChainExecuteLifeCycle;
import com.yomahub.liteflow.lifecycle.impl.RuleCacheLifeCycle;
import com.yomahub.liteflow.test.BaseTest;
import org.checkerframework.checker.nullness.qual.NonNull;
@@ -44,7 +44,7 @@ public class RuleCacheSolonTest extends BaseTest {
public void reload() {
flowExecutor.reloadRule();
// 清空缓存
Cache<String, Object> cache = getCache();
Cache<String, RuleCacheLifeCycle.ChainState> cache = getCache();
cache.invalidateAll();
cache.cleanUp();
}
@@ -67,15 +67,20 @@ public class RuleCacheSolonTest extends BaseTest {
Assertions.assertTrue(response.isSuccess());
}
// 测试缓存数量
// 测试缓存数量上限(串行)
@Test
public void testRuleCache2() {
// 确保至少执行过5个不同的chain
loadCache();
// 随机执行chain
loadCache(100);
Random random = new Random();
for (int i = 0; i < 100; i++) {
int id = random.nextInt(10) + 1;
flowExecutor.execute2Resp("chain" + id);
}
// 等待缓存淘汰
getCache().cleanUp();
Assertions.assertEquals(10, FlowBus.getChainMap().size());
// 测试只有5个chain被编译
int count = 0;
for (Chain chain : FlowBus.getChainMap().values()) {
@@ -90,72 +95,9 @@ public class RuleCacheSolonTest extends BaseTest {
Assertions.assertTrue(count <= 5);
}
// 测试开启规则缓存后进入缓存的chain可以正常被更新
// 测试缓存数量上限(并行)
@Test
public void testRuleCache3() {
loadCache();
// 缓存快照
HashSet<@NonNull String> strings = CollUtil.newHashSet(getCache().asMap().keySet());
LiteflowResponse response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("x==>a==>b", response.getExecuteStepStr());
// chain7进入缓存
Assertions.assertTrue(getCache().asMap().containsKey("chain7"));
// 获得被淘汰chain
String chainId = getEvictedChain(strings);
testEvicted(chainId);
// 更新chain7
LiteFlowChainELBuilder
.createChain()
.setChainId("chain7")
.setEL("THEN(a, b, c)")
.build();
// 重新执行chain7
response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("a==>b==>c", response.getExecuteStepStr());
}
// 测试开启规则缓存后进入缓存的chain被移除后无法执行
@Test
public void testRuleCache4() {
loadCache();
// 缓存快照
HashSet<@NonNull String> strings = CollUtil.newHashSet(getCache().asMap().keySet());
LiteflowResponse response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("x==>a==>b", response.getExecuteStepStr());
// chain7进入缓存
Assertions.assertTrue(getCache().asMap().containsKey("chain7"));
// 获得被淘汰chain
String chainId = getEvictedChain(strings);
testEvicted(chainId);
// 手动移除chain7
FlowBus.removeChain("chain7");
response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertFalse(response.isSuccess());
Assertions.assertEquals(ChainNotFoundException.class, response.getCause().getClass());
}
// 测试并发下正在执行的chain被淘汰仍能执行
@Test
public void testRuleCache5() throws InterruptedException {
// 模拟清空编译好的chain
Thread thread = new Thread(()-> {
Chain chain1 = FlowBus.getChain("chain1");
chain1.setCompiled(true);
chain1.setConditionList(null);
});
thread.start();
thread.join();
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("a==>b", response.getExecuteStepStr());
}
// 测试开启规则缓存后并发执行chain
@Test
public void test6() {
loadCache();
Random random = new Random();
List<Future<LiteflowResponse>> futureList = CollUtil.newArrayList();
@@ -175,9 +117,8 @@ public class RuleCacheSolonTest extends BaseTest {
// 等待缓存淘汰
getCache().cleanUp();
// 测试只有5个chain被编译
Assertions.assertEquals(10, FlowBus.getChainMap().size());
// 测试只有5个chain被编译
int count = 0;
for (Chain chain : FlowBus.getChainMap().values()) {
List<Condition> conditionList = chain.getConditionList();
@@ -191,6 +132,70 @@ public class RuleCacheSolonTest extends BaseTest {
Assertions.assertTrue(count <= 5);
}
// 测试开启规则缓存后进入缓存的chain可以正常被更新
@Test
public void testRuleCache4() {
loadCache();
// 缓存快照
HashSet<@NonNull String> strings = CollUtil.newHashSet(getCache().asMap().keySet());
LiteflowResponse response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("x==>a==>b", response.getExecuteStepStr());
// 获得被淘汰chain
String chainId = getEvictedChain(strings);
testEvicted(chainId);
// chain7进入缓存
Assertions.assertTrue(getCache().asMap().containsKey("chain7"));
// 更新chain7
LiteFlowChainELBuilder
.createChain()
.setChainId("chain7")
.setEL("THEN(a, b, c)")
.build();
// 重新执行chain7
response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("a==>b==>c", response.getExecuteStepStr());
}
// 测试开启规则缓存后进入缓存的chain被移除后无法执行
@Test
public void testRuleCache5() {
loadCache();
// 缓存快照
HashSet<@NonNull String> strings = CollUtil.newHashSet(getCache().asMap().keySet());
LiteflowResponse response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("x==>a==>b", response.getExecuteStepStr());
// 获得被淘汰chain
String chainId = getEvictedChain(strings);
testEvicted(chainId);
// chain7进入缓存
Assertions.assertTrue(getCache().asMap().containsKey("chain7"));
// 手动移除chain7
FlowBus.removeChain("chain7");
response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertFalse(response.isSuccess());
Assertions.assertEquals(ChainNotFoundException.class, response.getCause().getClass());
}
// 测试并发下正在执行的chain的condition被清理但仍能执行
@Test
public void testRuleCache6() throws InterruptedException {
// 模拟清空编译好的chain
Thread thread = new Thread(()-> {
Chain chain1 = FlowBus.getChain("chain1");
// 绕过初次编译
chain1.setCompiled(true);
chain1.setConditionList(null);
});
thread.start();
thread.join();
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("a==>b", response.getExecuteStepStr());
}
// 加载缓存, chain1~chain5
private void loadCache() {
// 容量上限为5
@@ -199,15 +204,6 @@ public class RuleCacheSolonTest extends BaseTest {
}
}
private void loadCache(int count) {
// 随机执行chain
Random random = new Random();
for (int i = 0; i < count; i++) {
int id = random.nextInt(10) + 1;
flowExecutor.execute2Resp("chain" + id);
}
}
// 测试 chain 被淘汰
private void testEvicted(String chanId) {
Chain chain = FlowBus.getChain(chanId);
@@ -219,10 +215,10 @@ public class RuleCacheSolonTest extends BaseTest {
Assertions.assertNull(chain.getConditionList());
}
public Cache<String, Object> getCache() {
List<PostProcessFlowExecuteLifeCycle> lifeCycleList
= LifeCycleHolder.getPostProcessFlowExecuteLifeCycleList();
for (PostProcessFlowExecuteLifeCycle lifeCycle : lifeCycleList) {
public Cache<String, RuleCacheLifeCycle.ChainState> getCache() {
List<PostProcessChainExecuteLifeCycle> lifeCycleList
= LifeCycleHolder.getPostProcessChainExecuteLifeCycleList();
for (PostProcessChainExecuteLifeCycle lifeCycle : lifeCycleList) {
if (lifeCycle.getClass().equals(RuleCacheLifeCycle.class)) {
RuleCacheLifeCycle ruleCacheLifeCycle = (RuleCacheLifeCycle) lifeCycle;
return ruleCacheLifeCycle.getCache();
@@ -234,7 +230,7 @@ public class RuleCacheSolonTest extends BaseTest {
// 获得淘汰的chain传入淘汰前的chain集合
// 确保只有一个被淘汰时使用
String getEvictedChain(Set<String> set) {
Cache<String, Object> cache = getCache();
Cache<String, RuleCacheLifeCycle.ChainState> cache = getCache();
cache.cleanUp();
Set<@NonNull String> strings = cache.asMap().keySet();
set.removeAll(strings);

View File

@@ -11,7 +11,6 @@ import com.yomahub.liteflow.flow.element.Chain;
import com.yomahub.liteflow.flow.element.Condition;
import com.yomahub.liteflow.lifecycle.LifeCycleHolder;
import com.yomahub.liteflow.lifecycle.PostProcessChainExecuteLifeCycle;
import com.yomahub.liteflow.lifecycle.PostProcessFlowExecuteLifeCycle;
import com.yomahub.liteflow.lifecycle.impl.RuleCacheLifeCycle;
import com.yomahub.liteflow.test.BaseTest;
import org.checkerframework.checker.nullness.qual.NonNull;
@@ -48,7 +47,7 @@ public class RuleCacheSpringbootTest extends BaseTest {
public void reload() {
flowExecutor.reloadRule();
// 清空缓存
Cache<String, Object> cache = getCache();
Cache<String, RuleCacheLifeCycle.ChainState> cache = getCache();
cache.invalidateAll();
cache.cleanUp();
}
@@ -71,15 +70,20 @@ public class RuleCacheSpringbootTest extends BaseTest {
Assertions.assertTrue(response.isSuccess());
}
// 测试缓存数量
// 测试缓存数量上限(串行)
@Test
public void testRuleCache2() {
// 确保至少执行过5个不同的chain
loadCache();
// 随机执行chain
loadCache(100);
Random random = new Random();
for (int i = 0; i < 100; i++) {
int id = random.nextInt(10) + 1;
flowExecutor.execute2Resp("chain" + id);
}
// 等待缓存淘汰
getCache().cleanUp();
Assertions.assertEquals(10, FlowBus.getChainMap().size());
// 测试只有5个chain被编译
int count = 0;
for (Chain chain : FlowBus.getChainMap().values()) {
@@ -91,76 +95,12 @@ public class RuleCacheSpringbootTest extends BaseTest {
Assertions.assertNull(conditionList);
}
}
//Assertions.assertTrue(count <= 5);
Assertions.assertEquals(count, 5);
Assertions.assertTrue(count <= 5);
}
// 测试开启规则缓存后进入缓存的chain可以正常被更新
// 测试缓存数量上限(并行)
@Test
public void testRuleCache3() {
loadCache();
// 缓存快照
HashSet<@NonNull String> strings = CollUtil.newHashSet(getCache().asMap().keySet());
LiteflowResponse response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("x==>a==>b", response.getExecuteStepStr());
// chain7进入缓存
Assertions.assertTrue(getCache().asMap().containsKey("chain7"));
// 获得被淘汰chain
String chainId = getEvictedChain(strings);
testEvicted(chainId);
// 更新chain7
LiteFlowChainELBuilder
.createChain()
.setChainId("chain7")
.setEL("THEN(a, b, c)")
.build();
// 重新执行chain7
response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("a==>b==>c", response.getExecuteStepStr());
}
// 测试开启规则缓存后进入缓存的chain被移除后无法执行
@Test
public void testRuleCache4() {
loadCache();
// 缓存快照
HashSet<@NonNull String> strings = CollUtil.newHashSet(getCache().asMap().keySet());
LiteflowResponse response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("x==>a==>b", response.getExecuteStepStr());
// chain7进入缓存
Assertions.assertTrue(getCache().asMap().containsKey("chain7"));
// 获得被淘汰chain
String chainId = getEvictedChain(strings);
testEvicted(chainId);
// 手动移除chain7
FlowBus.removeChain("chain7");
response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertFalse(response.isSuccess());
Assertions.assertEquals(ChainNotFoundException.class, response.getCause().getClass());
}
// 测试并发下正在执行的chain被淘汰仍能执行
@Test
public void testRuleCache5() throws InterruptedException {
// 模拟清空编译好的chain
Thread thread = new Thread(()-> {
Chain chain1 = FlowBus.getChain("chain1");
chain1.setCompiled(true);
chain1.setConditionList(null);
});
thread.start();
thread.join();
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("a==>b", response.getExecuteStepStr());
}
// 测试开启规则缓存后并发执行chain
@Test
public void test6() {
loadCache();
Random random = new Random();
List<Future<LiteflowResponse>> futureList = CollUtil.newArrayList();
@@ -180,9 +120,8 @@ public class RuleCacheSpringbootTest extends BaseTest {
// 等待缓存淘汰
getCache().cleanUp();
// 测试只有5个chain被编译
Assertions.assertEquals(10, FlowBus.getChainMap().size());
// 测试只有5个chain被编译
int count = 0;
for (Chain chain : FlowBus.getChainMap().values()) {
List<Condition> conditionList = chain.getConditionList();
@@ -193,15 +132,71 @@ public class RuleCacheSpringbootTest extends BaseTest {
Assertions.assertNull(conditionList);
}
}
//Assertions.assertTrue(count <= 5);
Assertions.assertEquals(5, count);
Assertions.assertTrue(count <= 5);
}
// 测试开启规则缓存后进入缓存的chain可以正常被更新
@Test
public void test7() {
for (int i = 0; i < 1000; i++) {
test6();
}
public void testRuleCache4() {
loadCache();
// 缓存快照
HashSet<@NonNull String> strings = CollUtil.newHashSet(getCache().asMap().keySet());
LiteflowResponse response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("x==>a==>b", response.getExecuteStepStr());
// 获得被淘汰chain
String chainId = getEvictedChain(strings);
testEvicted(chainId);
// chain7进入缓存
Assertions.assertTrue(getCache().asMap().containsKey("chain7"));
// 更新chain7
LiteFlowChainELBuilder
.createChain()
.setChainId("chain7")
.setEL("THEN(a, b, c)")
.build();
// 重新执行chain7
response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("a==>b==>c", response.getExecuteStepStr());
}
// 测试开启规则缓存后进入缓存的chain被移除后无法执行
@Test
public void testRuleCache5() {
loadCache();
// 缓存快照
HashSet<@NonNull String> strings = CollUtil.newHashSet(getCache().asMap().keySet());
LiteflowResponse response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("x==>a==>b", response.getExecuteStepStr());
// 获得被淘汰chain
String chainId = getEvictedChain(strings);
testEvicted(chainId);
// chain7进入缓存
Assertions.assertTrue(getCache().asMap().containsKey("chain7"));
// 手动移除chain7
FlowBus.removeChain("chain7");
response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertFalse(response.isSuccess());
Assertions.assertEquals(ChainNotFoundException.class, response.getCause().getClass());
}
// 测试并发下正在执行的chain的condition被清理但仍能执行
@Test
public void testRuleCache6() throws InterruptedException {
// 模拟清空编译好的chain
Thread thread = new Thread(()-> {
Chain chain1 = FlowBus.getChain("chain1");
// 绕过初次编译
chain1.setCompiled(true);
chain1.setConditionList(null);
});
thread.start();
thread.join();
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("a==>b", response.getExecuteStepStr());
}
// 加载缓存, chain1~chain5
@@ -212,15 +207,6 @@ public class RuleCacheSpringbootTest extends BaseTest {
}
}
private void loadCache(int count) {
// 随机执行chain
Random random = new Random();
for (int i = 0; i < count; i++) {
int id = random.nextInt(10) + 1;
flowExecutor.execute2Resp("chain" + id);
}
}
// 测试 chain 被淘汰
private void testEvicted(String chanId) {
Chain chain = FlowBus.getChain(chanId);
@@ -232,7 +218,7 @@ public class RuleCacheSpringbootTest extends BaseTest {
Assertions.assertNull(chain.getConditionList());
}
public Cache<String, Object> getCache() {
public Cache<String, RuleCacheLifeCycle.ChainState> getCache() {
List<PostProcessChainExecuteLifeCycle> lifeCycleList
= LifeCycleHolder.getPostProcessChainExecuteLifeCycleList();
for (PostProcessChainExecuteLifeCycle lifeCycle : lifeCycleList) {
@@ -247,7 +233,7 @@ public class RuleCacheSpringbootTest extends BaseTest {
// 获得淘汰的chain传入淘汰前的chain集合
// 确保只有一个被淘汰时使用
String getEvictedChain(Set<String> set) {
Cache<String, Object> cache = getCache();
Cache<String, RuleCacheLifeCycle.ChainState> cache = getCache();
cache.cleanUp();
Set<@NonNull String> strings = cache.asMap().keySet();
set.removeAll(strings);

View File

@@ -11,7 +11,6 @@ import com.yomahub.liteflow.flow.element.Chain;
import com.yomahub.liteflow.flow.element.Condition;
import com.yomahub.liteflow.lifecycle.LifeCycleHolder;
import com.yomahub.liteflow.lifecycle.PostProcessChainExecuteLifeCycle;
import com.yomahub.liteflow.lifecycle.PostProcessFlowExecuteLifeCycle;
import com.yomahub.liteflow.lifecycle.impl.RuleCacheLifeCycle;
import com.yomahub.liteflow.test.BaseTest;
import org.checkerframework.checker.nullness.qual.NonNull;
@@ -45,7 +44,7 @@ public class RuleCacheSpringTest extends BaseTest {
public void reload() {
flowExecutor.reloadRule();
// 清空缓存
Cache<String, Object> cache = getCache();
Cache<String, RuleCacheLifeCycle.ChainState> cache = getCache();
cache.invalidateAll();
cache.cleanUp();
}
@@ -68,15 +67,20 @@ public class RuleCacheSpringTest extends BaseTest {
Assertions.assertTrue(response.isSuccess());
}
// 测试缓存数量
// 测试缓存数量上限(串行)
@Test
public void testRuleCache2() {
// 确保至少执行过5个不同的chain
loadCache();
// 随机执行chain
loadCache(100);
Random random = new Random();
for (int i = 0; i < 100; i++) {
int id = random.nextInt(10) + 1;
flowExecutor.execute2Resp("chain" + id);
}
// 等待缓存淘汰
getCache().cleanUp();
Assertions.assertEquals(10, FlowBus.getChainMap().size());
// 测试只有5个chain被编译
int count = 0;
for (Chain chain : FlowBus.getChainMap().values()) {
@@ -91,72 +95,9 @@ public class RuleCacheSpringTest extends BaseTest {
Assertions.assertTrue(count <= 5);
}
// 测试开启规则缓存后进入缓存的chain可以正常被更新
// 测试缓存数量上限(并行)
@Test
public void testRuleCache3() {
loadCache();
// 缓存快照
HashSet<@NonNull String> strings = CollUtil.newHashSet(getCache().asMap().keySet());
LiteflowResponse response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("x==>a==>b", response.getExecuteStepStr());
// chain7进入缓存
Assertions.assertTrue(getCache().asMap().containsKey("chain7"));
// 获得被淘汰chain
String chainId = getEvictedChain(strings);
testEvicted(chainId);
// 更新chain7
LiteFlowChainELBuilder
.createChain()
.setChainId("chain7")
.setEL("THEN(a, b, c)")
.build();
// 重新执行chain7
response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("a==>b==>c", response.getExecuteStepStr());
}
// 测试开启规则缓存后进入缓存的chain被移除后无法执行
@Test
public void testRuleCache4() {
loadCache();
// 缓存快照
HashSet<@NonNull String> strings = CollUtil.newHashSet(getCache().asMap().keySet());
LiteflowResponse response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("x==>a==>b", response.getExecuteStepStr());
// chain7进入缓存
Assertions.assertTrue(getCache().asMap().containsKey("chain7"));
// 获得被淘汰chain
String chainId = getEvictedChain(strings);
testEvicted(chainId);
// 手动移除chain7
FlowBus.removeChain("chain7");
response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertFalse(response.isSuccess());
Assertions.assertEquals(ChainNotFoundException.class, response.getCause().getClass());
}
// 测试并发下正在执行的chain被淘汰仍能执行
@Test
public void testRuleCache5() throws InterruptedException {
// 模拟清空编译好的chain
Thread thread = new Thread(()-> {
Chain chain1 = FlowBus.getChain("chain1");
chain1.setCompiled(true);
chain1.setConditionList(null);
});
thread.start();
thread.join();
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("a==>b", response.getExecuteStepStr());
}
// 测试开启规则缓存后并发执行chain
@Test
public void test6() {
loadCache();
Random random = new Random();
List<Future<LiteflowResponse>> futureList = CollUtil.newArrayList();
@@ -176,9 +117,8 @@ public class RuleCacheSpringTest extends BaseTest {
// 等待缓存淘汰
getCache().cleanUp();
// 测试只有5个chain被编译
Assertions.assertEquals(10, FlowBus.getChainMap().size());
// 测试只有5个chain被编译
int count = 0;
for (Chain chain : FlowBus.getChainMap().values()) {
List<Condition> conditionList = chain.getConditionList();
@@ -186,26 +126,74 @@ public class RuleCacheSpringTest extends BaseTest {
Assertions.assertTrue(CollUtil.isNotEmpty(conditionList));
count++;
} else {
Assertions.assertTrue(CollUtil.isEmpty(conditionList));
Assertions.assertNull(conditionList);
}
}
//Assertions.assertTrue(count <= 5);
//System.out.println(getCache().asMap().size());
Assertions.assertEquals(5, count);
Assertions.assertTrue(count <= 5);
}
// 测试开启规则缓存后进入缓存的chain可以正常被更新
@Test
public void test7() {
for (int i = 0; i < 1000; i++) {
test6();
}
public void testRuleCache4() {
loadCache();
// 缓存快照
HashSet<@NonNull String> strings = CollUtil.newHashSet(getCache().asMap().keySet());
LiteflowResponse response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("x==>a==>b", response.getExecuteStepStr());
// 获得被淘汰chain
String chainId = getEvictedChain(strings);
testEvicted(chainId);
// chain7进入缓存
Assertions.assertTrue(getCache().asMap().containsKey("chain7"));
// 更新chain7
LiteFlowChainELBuilder
.createChain()
.setChainId("chain7")
.setEL("THEN(a, b, c)")
.build();
// 重新执行chain7
response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("a==>b==>c", response.getExecuteStepStr());
}
// 测试开启规则缓存后进入缓存的chain被移除后无法执行
@Test
public void test8() {
for (int i = 0; i < 1000; i++) {
testRuleCache2();
}
public void testRuleCache5() {
loadCache();
// 缓存快照
HashSet<@NonNull String> strings = CollUtil.newHashSet(getCache().asMap().keySet());
LiteflowResponse response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("x==>a==>b", response.getExecuteStepStr());
// 获得被淘汰chain
String chainId = getEvictedChain(strings);
testEvicted(chainId);
// chain7进入缓存
Assertions.assertTrue(getCache().asMap().containsKey("chain7"));
// 手动移除chain7
FlowBus.removeChain("chain7");
response = flowExecutor.execute2Resp("chain7", "arg");
Assertions.assertFalse(response.isSuccess());
Assertions.assertEquals(ChainNotFoundException.class, response.getCause().getClass());
}
// 测试并发下正在执行的chain的condition被清理但仍能执行
@Test
public void testRuleCache6() throws InterruptedException {
// 模拟清空编译好的chain
Thread thread = new Thread(()-> {
Chain chain1 = FlowBus.getChain("chain1");
// 绕过初次编译
chain1.setCompiled(true);
chain1.setConditionList(null);
});
thread.start();
thread.join();
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("a==>b", response.getExecuteStepStr());
}
// 加载缓存, chain1~chain5
@@ -216,16 +204,6 @@ public class RuleCacheSpringTest extends BaseTest {
}
}
private void loadCache(int count) {
// 随机执行chain
Random random = new Random();
for (int i = 0; i < count; i++) {
int id = random.nextInt(10) + 1;
flowExecutor.execute2Resp("chain" + id);
}
}
// 测试 chain 被淘汰
private void testEvicted(String chanId) {
Chain chain = FlowBus.getChain(chanId);
@@ -237,7 +215,7 @@ public class RuleCacheSpringTest extends BaseTest {
Assertions.assertNull(chain.getConditionList());
}
public Cache<String, Object> getCache() {
public Cache<String, RuleCacheLifeCycle.ChainState> getCache() {
List<PostProcessChainExecuteLifeCycle> lifeCycleList
= LifeCycleHolder.getPostProcessChainExecuteLifeCycleList();
for (PostProcessChainExecuteLifeCycle lifeCycle : lifeCycleList) {
@@ -252,7 +230,7 @@ public class RuleCacheSpringTest extends BaseTest {
// 获得淘汰的chain传入淘汰前的chain集合
// 确保只有一个被淘汰时使用
String getEvictedChain(Set<String> set) {
Cache<String, Object> cache = getCache();
Cache<String, RuleCacheLifeCycle.ChainState> cache = getCache();
cache.cleanUp();
Set<@NonNull String> strings = cache.asMap().keySet();
set.removeAll(strings);