新增chain线程池隔离

This commit is contained in:
jason
2024-10-28 15:10:49 +08:00
parent dee0e9ee10
commit 91d3e9b368
29 changed files with 198 additions and 382 deletions

View File

@@ -1,7 +1,10 @@
package com.yomahub.liteflow.builder.el;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.*;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.CharUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ql.util.express.DefaultContext;
@@ -30,7 +33,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
/**
* Chain基于代码形式的组装器 EL表达式规则专属组装器
@@ -243,6 +245,11 @@ public class LiteFlowChainELBuilder {
return this;
}
public LiteFlowChainELBuilder setThreadPoolExecutorClass(String threadPoolExecutorClass) {
this.chain.setThreadPoolExecutorClass(threadPoolExecutorClass);
return this;
}
/**
* EL表达式校验此方法已经过时请使用 {@link LiteFlowChainELBuilder#validateWithEx(String)}
*

View File

@@ -26,9 +26,6 @@ public class WhenOperator extends BaseOperator<WhenCondition> {
OperatorHelper.checkObjMustBeCommonTypeItem(obj);
whenCondition.addExecutable(OperatorHelper.convert(obj, Executable.class));
whenCondition.setThreadExecutorClass(liteflowConfig.getThreadExecutorClass());
if (liteflowConfig.getChainThreadPoolIsolate()) {
whenCondition.setThreadExecutorClass(liteflowConfig.getChainThreadExecutorClass());
}
}
return whenCondition;
}

View File

@@ -34,6 +34,8 @@ public interface ChainConstant {
String NAMESPACE = "namespace";
String THREAD_POOL_EXECUTOR_CLASS = "thread-pool-executor-class";
String DEFAULT_NAMESPACE = "default";
String VALUE = "value";

View File

@@ -12,18 +12,17 @@ import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.BooleanUtil;
import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder;
import com.yomahub.liteflow.common.ChainConstant;
import com.yomahub.liteflow.enums.ExecuteableTypeEnum;
import com.yomahub.liteflow.exception.ChainEndException;
import com.yomahub.liteflow.exception.FlowSystemException;
import com.yomahub.liteflow.lifecycle.LifeCycleHolder;
import com.yomahub.liteflow.lifecycle.PostProcessChainExecuteLifeCycle;
import com.yomahub.liteflow.log.LFLog;
import com.yomahub.liteflow.log.LFLoggerManager;
import com.yomahub.liteflow.slot.DataBus;
import com.yomahub.liteflow.slot.Slot;
import com.yomahub.liteflow.enums.ExecuteableTypeEnum;
import com.yomahub.liteflow.exception.FlowSystemException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
/**
* chain对象实现可执行器
@@ -46,6 +45,8 @@ public class Chain implements Executable{
private String namespace = ChainConstant.DEFAULT_NAMESPACE;
private String threadPoolExecutorClass;
public Chain(String chainName) {
this.chainId = chainName;
}
@@ -223,4 +224,12 @@ public class Chain implements Executable{
public void setNamespace(String namespace) {
this.namespace = namespace;
}
public String getThreadPoolExecutorClass() {
return threadPoolExecutorClass;
}
public void setThreadPoolExecutorClass(String threadPoolExecutorClass) {
this.threadPoolExecutorClass = threadPoolExecutorClass;
}
}

View File

@@ -133,16 +133,22 @@ public abstract class ParallelStrategyExecutor {
// 默认设置不隔离。也就是说,默认情况是一个线程池类一个实例,如果什么都不配置,那也就是在 when 的情况下,全局一个线程池。
ExecutorService parallelExecutor;
String chainId = DataBus.getSlot(slotIndex).getChainId();
Chain chain = FlowBus.getChain(chainId);
if (BooleanUtil.isTrue(liteflowConfig.getWhenThreadPoolIsolate())) {
parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutorWithHash(whenCondition.getThreadExecutorClass(), String.valueOf(whenCondition.hashCode()));
} else if (BooleanUtil.isTrue(liteflowConfig.getChainThreadPoolIsolate())) {
//chain 线程池隔离
String chainId = DataBus.getSlot(slotIndex).getChainId();
Chain chain = FlowBus.getChain(chainId);
//condition层级线程池
parallelExecutor =
ExecutorHelper.loadInstance().buildChainExecutorWithHash(whenCondition.getThreadExecutorClass(),
ExecutorHelper.loadInstance().buildWhenExecutorWithHash(whenCondition.getThreadExecutorClass(),
String.valueOf(whenCondition.hashCode()));
} else if (ObjectUtil.isNotEmpty(chain.getThreadPoolExecutorClass())) {
//chain层级线程池
parallelExecutor =
ExecutorHelper.loadInstance().buildWhenExecutorWithHash(chain.getThreadPoolExecutorClass(),
String.valueOf(chain.hashCode()));
} else {
//全局线程池
parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutor(whenCondition.getThreadExecutorClass());
}

View File

@@ -1,6 +1,5 @@
package com.yomahub.liteflow.parser.helper;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.databind.JsonNode;
@@ -17,15 +16,8 @@ import com.yomahub.liteflow.util.ElRegexUtil;
import org.dom4j.Document;
import org.dom4j.Element;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.*;
import java.util.function.Consumer;
import java.util.function.Function;
import static com.yomahub.liteflow.common.ChainConstant.*;
@@ -96,10 +88,6 @@ public class ParserHelper {
.build();
}
/**
* xml 形式的主要解析过程
* @param documentList documentList
*/
/**
* xml 形式的主要解析过程
* @param documentList documentList
@@ -316,7 +304,12 @@ public class ParserHelper {
JsonNode routeJsonNode = chainNode.get(ROUTE);
LiteFlowChainELBuilder builder = LiteFlowChainELBuilder.createChain().setChainId(chainId).setNamespace(namespace);
String threadPoolExecutorClass = chainNode.get(THREAD_POOL_EXECUTOR_CLASS) == null ?
null : chainNode.get(THREAD_POOL_EXECUTOR_CLASS).textValue();
LiteFlowChainELBuilder builder =
LiteFlowChainELBuilder.createChain().setChainId(chainId).setNamespace(namespace)
.setThreadPoolExecutorClass(threadPoolExecutorClass);
// 如果有route这个标签说明是决策表chain
// 决策表链路必须有route和body这两个标签
@@ -347,7 +340,12 @@ public class ParserHelper {
Element routeElement = e.element(ROUTE);
LiteFlowChainELBuilder builder = LiteFlowChainELBuilder.createChain().setChainId(chainId).setNamespace(namespace);
String threadPoolExecutorClass = e.attributeValue(THREAD_POOL_EXECUTOR_CLASS) == null ?
null : e.attributeValue(THREAD_POOL_EXECUTOR_CLASS);
LiteFlowChainELBuilder builder =
LiteFlowChainELBuilder.createChain().setChainId(chainId).setNamespace(namespace)
.setThreadPoolExecutorClass(threadPoolExecutorClass);
// 如果有route这个标签说明是决策表chain
// 决策表链路必须有route和body这两个标签

View File

@@ -123,20 +123,6 @@ public class LiteflowConfig {
//脚本特殊设置选项
private Map<String, String> scriptSetting;
// chain线程池最大线程数
private Integer chainMaxWorkers;
// chain线程池最大队列数量
private Integer chainQueueLimit;
// chain线程执行器class路径
private String chainThreadExecutorClass;
// chain线程池是否隔离
// 每一个chain里的when和异步循环合并起来都用单独的线程池。也就是说定义了多少个chain就有多少个线程池
private Boolean chainThreadPoolIsolate;
public Boolean getEnableMonitorFile() {
return enableMonitorFile;
}
@@ -522,52 +508,4 @@ public class LiteflowConfig {
public void setScriptSetting(Map<String, String> scriptSetting) {
this.scriptSetting = scriptSetting;
}
public Boolean getChainThreadPoolIsolate() {
if (ObjectUtil.isNull(chainThreadPoolIsolate)) {
return Boolean.FALSE;
} else {
return chainThreadPoolIsolate;
}
}
public void setChainThreadPoolIsolate(Boolean chainThreadPoolIsolate) {
this.chainThreadPoolIsolate = chainThreadPoolIsolate;
}
public Integer getChainMaxWorkers() {
if (ObjectUtil.isNull(chainMaxWorkers)) {
return 16;
} else {
return chainMaxWorkers;
}
}
public void setChainMaxWorkers(Integer chainMaxWorkers) {
this.chainMaxWorkers = chainMaxWorkers;
}
public Integer getChainQueueLimit() {
if (ObjectUtil.isNull(chainMaxWorkers)) {
return 512;
} else {
return chainQueueLimit;
}
}
public void setChainQueueLimit(Integer chainQueueLimit) {
this.chainQueueLimit = chainQueueLimit;
}
public String getChainThreadExecutorClass() {
if (StrUtil.isBlank(chainThreadExecutorClass)) {
return "com.yomahub.liteflow.thread.LiteFlowDefaultChainExecutorBuilder";
} else {
return chainThreadExecutorClass;
}
}
public void setChainThreadExecutorClass(String chainThreadExecutorClass) {
this.chainThreadExecutorClass = chainThreadExecutorClass;
}
}

View File

@@ -9,7 +9,6 @@
package com.yomahub.liteflow.thread;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.exception.ThreadExecutorServiceCreateException;
@@ -132,15 +131,24 @@ public class ExecutorHelper {
//构造并行循环的线程池
public ExecutorService buildLoopParallelExecutor(Integer slotIndex) {
ExecutorService parallelExecutor;
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
//chain线程池
if (BooleanUtil.isTrue(liteflowConfig.getChainThreadPoolIsolate())) {
//获取chain的hash
String chainId = DataBus.getSlot(slotIndex).getChainId();
Chain chain = FlowBus.getChain(chainId);
return getExecutorService(liteflowConfig.getChainThreadExecutorClass(), String.valueOf(chain.hashCode()));
//获取chain的hash
String chainId = DataBus.getSlot(slotIndex).getChainId();
Chain chain = FlowBus.getChain(chainId);
//condition层级线程池 TODO
//chain层级线程池
if (ObjectUtil.isNotEmpty(chain.getThreadPoolExecutorClass())) {
//chain层级线程池
parallelExecutor = getExecutorService(chain.getThreadPoolExecutorClass(),
String.valueOf(chain.hashCode()));
} else {
//全局线程池
parallelExecutor = getExecutorService(liteflowConfig.getParallelLoopExecutorClass());
}
return getExecutorService(liteflowConfig.getParallelLoopExecutorClass());
return parallelExecutor;
}
private ExecutorService getExecutorService(String clazz){
@@ -183,19 +191,4 @@ public class ExecutorHelper {
}
}
// 构建chain线程池 - clazz和condition的hash值共同作为缓存key
public ExecutorService buildChainExecutorWithHash(String conditionHash) {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
return buildChainExecutorWithHash(liteflowConfig.getChainThreadExecutorClass(), conditionHash);
}
// 构建chain线程池 - clazz和condition的hash值共同作为缓存key
public ExecutorService buildChainExecutorWithHash(String clazz, String conditionHash) {
if (StrUtil.isBlank(clazz)) {
return buildChainExecutorWithHash(conditionHash);
}
return getExecutorService(clazz, conditionHash);
}
}

View File

@@ -1,25 +0,0 @@
package com.yomahub.liteflow.thread;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
import java.util.concurrent.ExecutorService;
/**
* LiteFlow默认的chain多线程执行器实现
*/
public class LiteFlowDefaultChainExecutorBuilder implements ExecutorBuilder {
@Override
public ExecutorService buildExecutor() {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
// 只有在非spring的场景下liteflowConfig才会为null
if (ObjectUtil.isNull(liteflowConfig)) {
liteflowConfig = new LiteflowConfig();
}
return buildDefaultExecutor(liteflowConfig.getChainMaxWorkers(), liteflowConfig.getChainMaxWorkers(),
liteflowConfig.getChainQueueLimit(), "chain-thread-");
}
}

View File

@@ -22,4 +22,5 @@
extends CDATA #IMPLIED
enable (true|false) #IMPLIED
namespace CDATA #IMPLIED
thread-pool-executor-class CDATA #IMPLIED
>

View File

@@ -51,11 +51,6 @@ public class LiteflowAutoConfiguration {
liteflowConfig.setParallelQueueLimit(property.getParallelQueueLimit());
liteflowConfig.setParallelLoopExecutorClass(property.getParallelLoopExecutorClass());
liteflowConfig.setFallbackCmpEnable(property.isFallbackCmpEnable());
liteflowConfig.setChainThreadExecutorClass(property.getChainThreadExecutorClass());
liteflowConfig.setChainQueueLimit(property.getParallelQueueLimit());
liteflowConfig.setChainMaxWorkers(property.getParallelMaxWorkers());
liteflowConfig.setChainThreadPoolIsolate(property.isChainThreadPoolIsolate());
return liteflowConfig;
}

View File

@@ -83,19 +83,6 @@ public class LiteflowProperty {
// 是否启用组件降级
private Boolean fallbackCmpEnable;
// chain线程池是否隔离
// 每一个chain里的when和异步循环合并起来都用单独的线程池。也就是说定义了多少个chain就有多少个线程池
private Boolean chainThreadPoolIsolate;
// chain线程池最大线程数
private int chainMaxWorkers;
// chain线程池最大队列数量
private int chainQueueLimit;
// chain线程执行器class路径
private String chainThreadExecutorClass;
public boolean isEnable() {
return enable;
}
@@ -280,37 +267,4 @@ public class LiteflowProperty {
public Boolean getFallbackCmpEnable() {
return fallbackCmpEnable;
}
public void setChainThreadPoolIsolate(boolean chainThreadPoolIsolate) {
this.chainThreadPoolIsolate = chainThreadPoolIsolate;
}
public boolean isChainThreadPoolIsolate() {
return chainThreadPoolIsolate;
}
public int getChainMaxWorkers() {
return chainMaxWorkers;
}
public void setChainMaxWorkers(int chainMaxWorkers) {
this.chainMaxWorkers = chainMaxWorkers;
}
public int getChainQueueLimit() {
return chainQueueLimit;
}
public void setChainQueueLimit(int chainQueueLimit) {
this.chainQueueLimit = chainQueueLimit;
}
public String getChainThreadExecutorClass() {
return chainThreadExecutorClass;
}
public void setChainThreadExecutorClass(String chainThreadExecutorClass) {
this.chainThreadExecutorClass = chainThreadExecutorClass;
}
}

View File

@@ -18,7 +18,3 @@ liteflow.monitor.queue-limit=200
liteflow.monitor.delay=300000
liteflow.monitor.period=300000
liteflow.fallback-cmp-enable=false
liteflow.chain-max-workers=16
liteflow.chain-queue-limit=512
liteflow.chain-thread-pool-isolate=false
liteflow.chain-thread-executor-class=com.yomahub.liteflow.thread.LiteFlowDefaultChainExecutorBuilder

View File

@@ -101,19 +101,6 @@ public class LiteflowProperty {
//脚本特殊设置选项
private Map<String, String> scriptSetting;
// chain线程池是否隔离
// 每一个chain里的when和异步循环合并起来都用单独的线程池。也就是说定义了多少个chain就有多少个线程池
private Boolean chainThreadPoolIsolate;
// chain线程池最大线程数
private int chainMaxWorkers;
// chain线程池最大队列数量
private int chainQueueLimit;
// chain线程执行器class路径
private String chainThreadExecutorClass;
public boolean isEnableMonitorFile() {
return enableMonitorFile;
}
@@ -349,37 +336,4 @@ public class LiteflowProperty {
public void setScriptSetting(Map<String, String> scriptSetting) {
this.scriptSetting = scriptSetting;
}
public void setChainThreadPoolIsolate(boolean chainThreadPoolIsolate) {
this.chainThreadPoolIsolate = chainThreadPoolIsolate;
}
public boolean isChainThreadPoolIsolate() {
return chainThreadPoolIsolate;
}
public int getChainMaxWorkers() {
return chainMaxWorkers;
}
public void setChainMaxWorkers(int chainMaxWorkers) {
this.chainMaxWorkers = chainMaxWorkers;
}
public int getChainQueueLimit() {
return chainQueueLimit;
}
public void setChainQueueLimit(int chainQueueLimit) {
this.chainQueueLimit = chainQueueLimit;
}
public String getChainThreadExecutorClass() {
return chainThreadExecutorClass;
}
public void setChainThreadExecutorClass(String chainThreadExecutorClass) {
this.chainThreadExecutorClass = chainThreadExecutorClass;
}
}

View File

@@ -54,10 +54,6 @@ public class LiteflowPropertyAutoConfiguration {
liteflowConfig.setDelay(liteflowMonitorProperty.getDelay());
liteflowConfig.setPeriod(liteflowMonitorProperty.getPeriod());
liteflowConfig.setScriptSetting(property.getScriptSetting());
liteflowConfig.setChainThreadPoolIsolate(property.isChainThreadPoolIsolate());
liteflowConfig.setChainThreadExecutorClass(property.getChainThreadExecutorClass());
liteflowConfig.setChainMaxWorkers(property.getChainMaxWorkers());
liteflowConfig.setChainQueueLimit(property.getChainQueueLimit());
return liteflowConfig;
}

View File

@@ -220,34 +220,6 @@
"type": "java.util.Map",
"description": "script special settings.",
"sourceType": "com.yomahub.liteflow.springboot.LiteflowProperty"
},
{
"name": "liteflow.chain-thread-pool-isolate",
"type": "java.lang.Boolean",
"description": "set whether the chain thread pool is isolated.",
"sourceType": "com.yomahub.liteflow.springboot.LiteflowProperty",
"defaultValue": false
},
{
"name": "liteflow.chain-max-workers",
"type": "java.lang.Integer",
"description": "Set the chain thread pool worker max-size on \" when \" mode.",
"sourceType": "com.yomahub.liteflow.springboot.LiteflowProperty",
"defaultValue": 16
},
{
"name": "liteflow.chain-queue-limit",
"type": "java.lang.Integer",
"description": "Set the chain thread pool queue max-size on \" when \" mode.",
"sourceType": "com.yomahub.liteflow.springboot.LiteflowProperty",
"defaultValue": 512
},
{
"name": "liteflow.chain-thread-executor-class",
"type": "java.lang.String",
"description": "Custom chain thread pool implement for when executor.",
"sourceType": "com.yomahub.liteflow.springboot.LiteflowProperty",
"defaultValue": "com.yomahub.liteflow.thread.LiteFlowDefaultChainExecutorBuilder"
}
]
}

View File

@@ -26,8 +26,4 @@ liteflow.monitor.queue-limit=200
liteflow.monitor.delay=300000
liteflow.monitor.period=300000
liteflow.enable-monitor-file=false
liteflow.chain-max-workers=16
liteflow.chain-queue-limit=512
liteflow.chain-thread-pool-isolate=false
liteflow.chain-thread-executor-class=com.yomahub.liteflow.thread.LiteFlowDefaultChainExecutorBuilder

View File

@@ -7,7 +7,7 @@ import com.yomahub.liteflow.thread.ExecutorBuilder;
import java.util.concurrent.ExecutorService;
public class CustomThreadExecutor1 implements ExecutorBuilder {
public class CustomChainThreadExecutor implements ExecutorBuilder {
@Override
public ExecutorService buildExecutor() {
@@ -16,8 +16,8 @@ public class CustomThreadExecutor1 implements ExecutorBuilder {
if (ObjectUtil.isNull(liteflowConfig)) {
liteflowConfig = new LiteflowConfig();
}
return buildDefaultExecutor(liteflowConfig.getChainMaxWorkers(), liteflowConfig.getChainMaxWorkers(),
liteflowConfig.getChainQueueLimit(), "customer-chain-thead-1");
return buildDefaultExecutor(16, 16,
512, "customer-chain-thead");
}
}

View File

@@ -21,10 +21,10 @@ import java.util.List;
* springboot环境下chain线程池隔离测试
*/
@TestPropertySource(value = "classpath:/chainThreadPool/application2.properties")
@SpringBootTest(classes = CustomChainThreadPoolELSpringbootTest.class)
@SpringBootTest(classes = CustomThreadPoolELSpringbootTest.class)
@EnableAutoConfiguration
@ComponentScan({"com.yomahub.liteflow.test.chainThreadPool.cmp"})
public class CustomChainThreadPoolELSpringbootTest extends BaseTest {
public class CustomThreadPoolELSpringbootTest extends BaseTest {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@@ -32,61 +32,48 @@ public class CustomChainThreadPoolELSpringbootTest extends BaseTest {
private FlowExecutor flowExecutor;
/**
* 测试chain自定义线程池隔离
* 测试WEHN上condition线程池和chain线程池隔离-优先以WHEN上为准
*/
@Test
public void testCustomChainThreadPool1() {
LiteflowResponse response = flowExecutor.execute2Resp("chain", "arg");
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
Assertions.assertTrue(context.getData("threadNameFor").toString().startsWith("customer-chain-thead-1"));
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-chain-thead-1"));
public void testCustomChainThreadPool() {
LiteflowResponse response1 = flowExecutor.execute2Resp("chain1", "arg");
DefaultContext context = response1.getFirstContextBean();
Assertions.assertTrue(response1.isSuccess());
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-when-thead"));
}
/**
* 测试when上自定义线程池和chain线程池隔离-优先以when上为准
* 测试FOR上condition线程池和chain线程池隔离-优先以FOR上为准
*/
@Test
public void testCustomChainThreadPool2() {
LiteflowResponse response1 = flowExecutor.execute2Resp("chain2", "arg");
DefaultContext context = response1.getFirstContextBean();
Assertions.assertTrue(response1.isSuccess());
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-chain-thead-2"));
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-chain-thead"));
}
/**
* 测试并行FOR循环全局线程池和chain线程池隔离-优先以chain线程池上为准
* 测试WHILE上condition线程池和chain线程池隔离-优先以WHILE上为准
*/
@Test
public void testCustomChainThreadPool3() {
LiteflowResponse response1 = flowExecutor.execute2Resp("chain3", "arg");
DefaultContext context = response1.getFirstContextBean();
Assertions.assertTrue(response1.isSuccess());
Assertions.assertTrue(context.getData("threadNameFor").toString().startsWith("customer-chain-thead-1"));
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-chain-thead"));
}
/**
* 测试并行条件循环全局线程池和chain线程池隔离-优先以chain线程池上为准
* 测试ITERATOR上condition线程池和chain线程池隔离-优先以ITERATOR上为准
*/
@Test
public void testCustomChainThreadPool4() {
LiteflowResponse response1 = flowExecutor.execute2Resp("chain4", "arg");
DefaultContext context = response1.getFirstContextBean();
Assertions.assertTrue(response1.isSuccess());
Assertions.assertTrue(context.getData("threadNameWhile").toString().startsWith("customer-chain-thead-1"));
}
/**
* 测试并行迭代循环全局线程池和chain线程池隔离-优先以chain线程池上为准
*/
@Test
public void testCustomChainThreadPool5() {
List<String> list = ListUtil.toList("1", "2", "3", "4", "5");
LiteflowResponse response1 = flowExecutor.execute2Resp("chain5", list);
LiteflowResponse response1 = flowExecutor.execute2Resp("chain4", list);
DefaultContext context = response1.getFirstContextBean();
Assertions.assertTrue(response1.isSuccess());
Assertions.assertTrue(context.getData("threadNameIterator").toString().startsWith("customer-chain-thead-1"));
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-chain-thead"));
}
}

View File

@@ -7,7 +7,7 @@ import com.yomahub.liteflow.thread.ExecutorBuilder;
import java.util.concurrent.ExecutorService;
public class CustomThreadExecutor2 implements ExecutorBuilder {
public class CustomWhenThreadExecutor implements ExecutorBuilder {
@Override
public ExecutorService buildExecutor() {
@@ -16,8 +16,8 @@ public class CustomThreadExecutor2 implements ExecutorBuilder {
if (ObjectUtil.isNull(liteflowConfig)) {
liteflowConfig = new LiteflowConfig();
}
return buildDefaultExecutor(liteflowConfig.getChainMaxWorkers(), liteflowConfig.getChainMaxWorkers(),
liteflowConfig.getChainQueueLimit(), "customer-chain-thead-2");
return buildDefaultExecutor(16, 16,
512, "customer-when-thead");
}
}

View File

@@ -1,45 +0,0 @@
package com.yomahub.liteflow.test.chainThreadPool;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.slot.DefaultContext;
import com.yomahub.liteflow.test.BaseTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.TestPropertySource;
import javax.annotation.Resource;
/**
* springboot环境下chain线程池隔离测试
*/
@TestPropertySource(value = "classpath:/chainThreadPool/application.properties")
@SpringBootTest(classes = DefaultChainThreadPoolELSpringbootTest.class)
@EnableAutoConfiguration
@ComponentScan({"com.yomahub.liteflow.test.chainThreadPool.cmp"})
public class DefaultChainThreadPoolELSpringbootTest extends BaseTest {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Resource
private FlowExecutor flowExecutor;
/**
* 测试chain默认线程池隔离
*/
@Test
public void testDefaultChainThreadPool() {
LiteflowResponse response = flowExecutor.execute2Resp("chain", "arg");
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
Assertions.assertTrue(context.getData("threadNameFor").toString().startsWith("chain-thread-"));
Assertions.assertTrue(context.getData("threadName").toString().startsWith("chain-thread-"));
}
}

View File

@@ -0,0 +1,79 @@
package com.yomahub.liteflow.test.chainThreadPool;
import cn.hutool.core.collection.ListUtil;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.slot.DefaultContext;
import com.yomahub.liteflow.test.BaseTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.TestPropertySource;
import javax.annotation.Resource;
import java.util.List;
/**
* springboot环境下chain线程池隔离测试
*/
@TestPropertySource(value = "classpath:/chainThreadPool/application.properties")
@SpringBootTest(classes = GlobalThreadPoolELSpringbootTest.class)
@EnableAutoConfiguration
@ComponentScan({"com.yomahub.liteflow.test.chainThreadPool.cmp"})
public class GlobalThreadPoolELSpringbootTest extends BaseTest {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Resource
private FlowExecutor flowExecutor;
/**
* 测试WHEN上全局线程池和chain线程池隔离-优先以chain上为准
*/
@Test
public void testGlobalChainThreadPool() {
LiteflowResponse response1 = flowExecutor.execute2Resp("chain1", "arg");
DefaultContext context = response1.getFirstContextBean();
Assertions.assertTrue(response1.isSuccess());
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-chain-thead"));
}
/**
* 测试FOR上全局线程池和chain线程池隔离-优先以chain上为准
*/
@Test
public void testGlobalChainThreadPool2() {
LiteflowResponse response1 = flowExecutor.execute2Resp("chain2", "arg");
DefaultContext context = response1.getFirstContextBean();
Assertions.assertTrue(response1.isSuccess());
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-chain-thead"));
}
/**
* 测试WHILE上全局线程池和chain线程池隔离-优先以chain上为准
*/
@Test
public void testGlobalChainThreadPool3() {
LiteflowResponse response1 = flowExecutor.execute2Resp("chain3", "arg");
DefaultContext context = response1.getFirstContextBean();
Assertions.assertTrue(response1.isSuccess());
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-chain-thead"));
}
/**
* 测试ITERATOR上全局线程池和chain线程池隔离-优先以chain上为准
*/
@Test
public void testGlobalChainThreadPool4() {
List<String> list = ListUtil.toList("1", "2", "3", "4", "5");
LiteflowResponse response1 = flowExecutor.execute2Resp("chain4", list);
DefaultContext context = response1.getFirstContextBean();
Assertions.assertTrue(response1.isSuccess());
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-chain-thead"));
}
}

View File

@@ -18,7 +18,7 @@ public class FCmp extends NodeComponent {
@Override
public void process() {
DefaultContext context = this.getFirstContextBean();
context.setData("threadNameFor", Thread.currentThread().getName());
context.setData("threadName", Thread.currentThread().getName());
System.out.println("FCmp executed!");
}

View File

@@ -18,7 +18,7 @@ public class ICmp extends NodeComponent {
@Override
public void process() {
DefaultContext context = this.getFirstContextBean();
context.setData("threadNameIterator", Thread.currentThread().getName());
context.setData("threadName", Thread.currentThread().getName());
System.out.println("ICmp executed!");
}

View File

@@ -18,7 +18,7 @@ public class WCmp extends NodeComponent {
@Override
public void process() {
DefaultContext context = this.getFirstContextBean();
context.setData("threadNameWhile", Thread.currentThread().getName());
context.setData("threadName", Thread.currentThread().getName());
System.out.println("WCmp executed!");
}

View File

@@ -1,4 +1 @@
liteflow.rule-source=chainThreadPool/flow.el.xml
liteflow.chain-thread-pool-isolate=true
liteflow.chain-max-workers=10
liteflow.chain-queue-limit=1024

View File

@@ -1,5 +1,2 @@
liteflow.rule-source=chainThreadPool/flow2.el.xml
liteflow.chain-thread-pool-isolate=true
liteflow.chain-max-workers=10
liteflow.chain-queue-limit=1024
liteflow.chain-thread-executor-class=com.yomahub.liteflow.test.chainThreadPool.CustomThreadExecutor1
liteflow.when-thread-pool-isolate=true

View File

@@ -1,9 +1,24 @@
<?xml version="1.0" encoding="UTF-8"?>
<flow>
<chain name="chain">
FOR(5).parallel(true).DO(THEN(f,WHEN(
THEN(a,b)
))
<chain name="chain1"
thread-pool-executor-class="com.yomahub.liteflow.test.chainThreadPool.CustomChainThreadExecutor">
WHEN(a,b);
</chain>
<chain name="chain2"
thread-pool-executor-class="com.yomahub.liteflow.test.chainThreadPool.CustomChainThreadExecutor">
FOR(5).parallel(true).DO(THEN(a,f
)
);
</chain>
<chain name="chain3"
thread-pool-executor-class="com.yomahub.liteflow.test.chainThreadPool.CustomChainThreadExecutor">
WHILE(z).parallel(true).DO(THEN(w,d));
</chain>
<chain name="chain4"
thread-pool-executor-class="com.yomahub.liteflow.test.chainThreadPool.CustomChainThreadExecutor">
ITERATOR(it).parallel(true).DO(THEN(a,i));
</chain>
</flow>

View File

@@ -1,27 +1,24 @@
<?xml version="1.0" encoding="UTF-8"?>
<flow>
<chain name="chain">
FOR(5).parallel(true).DO(THEN(f,WHEN(
THEN(a,b)
))
);
<chain name="chain1"
thread-pool-executor-class="com.yomahub.liteflow.test.chainThreadPool.CustomChainThreadExecutor">
WHEN(a,b).threadPool("com.yomahub.liteflow.test.chainThreadPool.CustomWhenThreadExecutor");
</chain>
<chain name="chain2">
WHEN(a, b).threadPool("com.yomahub.liteflow.test.chainThreadPool.CustomThreadExecutor2");
</chain>
<chain name="chain3">
<chain name="chain2"
thread-pool-executor-class="com.yomahub.liteflow.test.chainThreadPool.CustomChainThreadExecutor">
FOR(5).parallel(true).DO(THEN(a,f
)
);
</chain>
<chain name="chain4">
<chain name="chain3"
thread-pool-executor-class="com.yomahub.liteflow.test.chainThreadPool.CustomChainThreadExecutor">
WHILE(z).parallel(true).DO(THEN(w,d));
</chain>
<chain name="chain5">
<chain name="chain4"
thread-pool-executor-class="com.yomahub.liteflow.test.chainThreadPool.CustomChainThreadExecutor">
ITERATOR(it).parallel(true).DO(THEN(a,i));
</chain>
</flow>