feature #IAY66T 处理并发下conditionList被清除的问题

This commit is contained in:
DaleLee
2024-12-06 21:40:42 +08:00
parent 73e4507b9d
commit b9ddc93384
3 changed files with 49 additions and 6 deletions

View File

@@ -10,11 +10,13 @@ package com.yomahub.liteflow.flow.element;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.IdUtil;
import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder;
import com.yomahub.liteflow.common.ChainConstant;
import com.yomahub.liteflow.enums.ExecuteableTypeEnum;
import com.yomahub.liteflow.exception.ChainEndException;
import com.yomahub.liteflow.exception.FlowSystemException;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.lifecycle.LifeCycleHolder;
import com.yomahub.liteflow.log.LFLog;
import com.yomahub.liteflow.log.LFLoggerManager;
@@ -29,6 +31,7 @@ import java.util.List;
*
* @author Bryan.Zhang
* @author jason
* @author DaleLee
*/
public class Chain implements Executable{
@@ -100,8 +103,15 @@ public class Chain implements Executable{
LiteFlowChainELBuilder.buildUnCompileChain(this);
}
if (CollUtil.isEmpty(conditionList)) {
throw new FlowSystemException("no conditionList in this chain[" + chainId + "]");
// 这里先拿到this.conditionList的引用
// 因为在正式执行condition之前this.conditionList有可能被其他线程置空
// 比如该chain在规则缓存中被淘汰
List<Condition> conditionListRef = this.conditionList;
// 但在编译后到拿到引用之前this.conditionList也有可能被置空
if (CollUtil.isEmpty(conditionListRef)) {
// 如果conditionListRef为空
// 构建临时conditionList确保本次一定可以执行
conditionListRef = buildTemporaryConditionList();
}
Slot slot = DataBus.getSlot(slotIndex);
try {
@@ -115,7 +125,7 @@ public class Chain implements Executable{
// 设置主ChainName
slot.setChainId(chainId);
// 执行主体Condition
for (Condition condition : conditionList) {
for (Condition condition : conditionListRef) {
condition.setCurrChainId(chainId);
condition.execute(slotIndex);
}
@@ -234,4 +244,22 @@ public class Chain implements Executable{
public void setThreadPoolExecutorClass(String threadPoolExecutorClass) {
this.threadPoolExecutorClass = threadPoolExecutorClass;
}
// 构建临时的ConditionList
private List<Condition> buildTemporaryConditionList() {
String tempChainId = chainId + "_temp_" + IdUtil.simpleUUID();
Chain tempChain = new Chain(tempChainId);
tempChain.setEl(this.el);
tempChain.setCompiled(false);
LiteFlowChainELBuilder.buildUnCompileChain(tempChain);
FlowBus.removeChain(tempChainId);
List<Condition> tempConditionList = tempChain.getConditionList();
if (CollUtil.isEmpty(tempConditionList)) {
throw new FlowSystemException("no conditionList in this chain[" + chainId + "]");
}
// 打印警告用于排查临时chain与已有chain重名几乎不可能而将已有chain覆盖的情况
LOG.warn("The conditionList of chain[{}] is empty, temporarily using chain[{}] (now removed) to build it.", chainId, tempChainId);
return tempConditionList;
}
}

View File

@@ -15,7 +15,7 @@ import org.checkerframework.checker.nullness.qual.Nullable;
/**
* Chain执行前的缓存处理
* @author DaleLee
* @since
* @since 2.13.0
*/
public class RuleCacheLifeCycle implements PostProcessFlowExecuteLifeCycle {
// 缓存
@@ -61,9 +61,9 @@ public class RuleCacheLifeCycle implements PostProcessFlowExecuteLifeCycle {
if (ObjectUtil.isNull(chain)) {
return;
}
// 清空condition并将chain设置为未编译
chain.setConditionList(null);
// 将chain设置为未编译并清空condition
chain.setCompiled(false);
chain.setConditionList(null);
}
}
}

View File

@@ -27,10 +27,13 @@ import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
* Springboot环境下规则缓存测试
* @author DaleLee
* @since 2.13.0
*/
@TestPropertySource(value = "classpath:/ruleCache/application.properties")
@SpringBootTest(classes = RuleCacheSpringbootTest.class)
@@ -129,6 +132,18 @@ public class RuleCacheSpringbootTest extends BaseTest {
Assertions.assertEquals(ChainNotFoundException.class, response.getCause().getClass());
}
@Test
public void testRuleCache5() throws InterruptedException, ExecutionException {
Future<LiteflowResponse> liteflowResponseFuture = flowExecutor.execute2Future("chain1", "arg");
new Thread(() -> {
flowExecutor.execute2Resp("chain2");
flowExecutor.execute2Resp("chain3");
flowExecutor.execute2Resp("chain4");
}).start();
LiteflowResponse liteflowResponse = liteflowResponseFuture.get();
}
// 加载缓存, chain1、chain2、chain3
private void loadCache() {