兼容全局体系线程池

This commit is contained in:
jason
2024-10-28 16:34:00 +08:00
parent 91d3e9b368
commit 51d1bfd43d
16 changed files with 391 additions and 26 deletions

View File

@@ -7,6 +7,8 @@ import com.yomahub.liteflow.flow.element.condition.WhenCondition;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
import java.util.Optional;
/**
* EL规则中的WHEN的操作符
*
@@ -25,7 +27,8 @@ public class WhenOperator extends BaseOperator<WhenCondition> {
for (Object obj : objects) {
OperatorHelper.checkObjMustBeCommonTypeItem(obj);
whenCondition.addExecutable(OperatorHelper.convert(obj, Executable.class));
whenCondition.setThreadExecutorClass(liteflowConfig.getThreadExecutorClass());
whenCondition.setThreadExecutorClass(Optional.ofNullable(liteflowConfig.getThreadExecutorClass())
.orElse(liteflowConfig.getGlobalThreadPoolExecutorClass()));
}
return whenCondition;
}

View File

@@ -42,6 +42,7 @@ public class LiteflowConfig {
private Integer slotSize;
// 并行线程执行器class路径
@Deprecated
private String threadExecutorClass;
// 异步线程最大等待秒数
@@ -68,9 +69,11 @@ public class LiteflowConfig {
private Long period;
// 异步线程池最大线程数
@Deprecated
private Integer whenMaxWorkers;
// 异步线程池最大队列数量
@Deprecated
private Integer whenQueueLimit;
// 解析模式,一共有三种,具体看其定义
@@ -106,12 +109,15 @@ public class LiteflowConfig {
private Boolean enableMonitorFile = Boolean.FALSE;
//并行循环线程池所用class路径
@Deprecated
private String parallelLoopExecutorClass;
//使用默认并行循环线程池时,最大线程数
@Deprecated
private Integer parallelMaxWorkers;
//使用默认并行循环线程池时,最大队列数
@Deprecated
private Integer parallelQueueLimit;
// 是否启用组件降级
@@ -131,6 +137,15 @@ public class LiteflowConfig {
this.enableMonitorFile = enableMonitorFile;
}
//全局线程池所用class路径(when+异步循环)
private String globalThreadPoolExecutorClass;
//全局线程池最大线程数(when+异步循环)
private Integer globalThreadPoolSize;
//全局线程池最大队列数(when+异步循环)
private Integer globalThreadPoolQueueSize;
public Boolean getEnable() {
if (ObjectUtil.isNull(enable)) {
return Boolean.TRUE;
@@ -230,6 +245,7 @@ public class LiteflowConfig {
this.enableLog = enableLog;
}
@Deprecated
public Integer getWhenMaxWorkers() {
if (ObjectUtil.isNull(whenMaxWorkers)) {
return 16;
@@ -239,10 +255,13 @@ public class LiteflowConfig {
}
}
@Deprecated
public void setWhenMaxWorkers(Integer whenMaxWorkers) {
this.whenMaxWorkers = whenMaxWorkers;
}
@Deprecated
public Integer getWhenQueueLimit() {
if (ObjectUtil.isNull(whenQueueLimit)) {
return 512;
@@ -252,6 +271,7 @@ public class LiteflowConfig {
}
}
@Deprecated
public void setWhenQueueLimit(Integer whenQueueLimit) {
this.whenQueueLimit = whenQueueLimit;
}
@@ -297,6 +317,7 @@ public class LiteflowConfig {
this.printBanner = printBanner;
}
@Deprecated
public String getThreadExecutorClass() {
if (StrUtil.isBlank(threadExecutorClass)) {
return "com.yomahub.liteflow.thread.LiteFlowDefaultWhenExecutorBuilder";
@@ -306,6 +327,7 @@ public class LiteflowConfig {
}
}
@Deprecated
public void setThreadExecutorClass(String threadExecutorClass) {
this.threadExecutorClass = threadExecutorClass;
}
@@ -423,6 +445,7 @@ public class LiteflowConfig {
this.parallelMaxWorkers = parallelMaxWorkers;
}
@Deprecated
public Integer getParallelQueueLimit() {
if(ObjectUtil.isNull(parallelQueueLimit)){
return 512;
@@ -431,10 +454,12 @@ public class LiteflowConfig {
}
}
@Deprecated
public void setParallelQueueLimit(Integer parallelQueueLimit) {
this.parallelQueueLimit = parallelQueueLimit;
}
@Deprecated
public String getParallelLoopExecutorClass() {
if (StrUtil.isBlank(parallelLoopExecutorClass)) {
return "com.yomahub.liteflow.thread.LiteFlowDefaultParallelLoopExecutorBuilder";
@@ -443,6 +468,8 @@ public class LiteflowConfig {
return parallelLoopExecutorClass;
}
}
@Deprecated
public void setParallelLoopExecutorClass(String parallelLoopExecutorClass) {
this.parallelLoopExecutorClass = parallelLoopExecutorClass;
}
@@ -508,4 +535,40 @@ public class LiteflowConfig {
public void setScriptSetting(Map<String, String> scriptSetting) {
this.scriptSetting = scriptSetting;
}
public Integer getGlobalThreadPoolSize() {
if (ObjectUtil.isNull(globalThreadPoolSize)) {
return 16;
} else {
return globalThreadPoolSize;
}
}
public void setGlobalThreadPoolSize(Integer globalThreadPoolSize) {
this.globalThreadPoolSize = globalThreadPoolSize;
}
public Integer getGlobalThreadPoolQueueSize() {
if (ObjectUtil.isNull(globalThreadPoolQueueSize)) {
return 512;
} else {
return globalThreadPoolQueueSize;
}
}
public void setGlobalThreadPoolQueueSize(Integer globalThreadPoolQueueSize) {
this.globalThreadPoolQueueSize = globalThreadPoolQueueSize;
}
public String getGlobalThreadPoolExecutorClass() {
if (StrUtil.isBlank(globalThreadPoolExecutorClass)) {
return "com.yomahub.liteflow.thread.LiteFlowDefaultGlobalExecutorBuilder";
} else {
return globalThreadPoolExecutorClass;
}
}
public void setGlobalThreadPoolExecutorClass(String globalThreadPoolExecutorClass) {
this.globalThreadPoolExecutorClass = globalThreadPoolExecutorClass;
}
}

View File

@@ -22,6 +22,7 @@ import com.yomahub.liteflow.slot.DataBus;
import com.yomahub.liteflow.spi.holder.ContextAwareHolder;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -91,7 +92,8 @@ public class ExecutorHelper {
// 构建默认when线程池
public ExecutorService buildWhenExecutor() {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
return buildWhenExecutor(liteflowConfig.getThreadExecutorClass());
return buildWhenExecutor(Optional.ofNullable(liteflowConfig.getGlobalThreadPoolExecutorClass())
.orElse(liteflowConfig.getThreadExecutorClass()));
}
// 构建when线程池 - 支持多个when公用一个线程池
@@ -105,7 +107,9 @@ public class ExecutorHelper {
// 构建when线程池 - clazz和condition的hash值共同作为缓存key
public ExecutorService buildWhenExecutorWithHash(String conditionHash) {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
return buildWhenExecutorWithHash(liteflowConfig.getThreadExecutorClass(), conditionHash);
return buildWhenExecutorWithHash(Optional.ofNullable(liteflowConfig.getThreadExecutorClass())
.orElse(liteflowConfig.getGlobalThreadPoolExecutorClass()),
conditionHash);
}
// 构建when线程池 - clazz和condition的hash值共同作为缓存key
@@ -146,7 +150,8 @@ public class ExecutorHelper {
String.valueOf(chain.hashCode()));
} else {
//全局线程池
parallelExecutor = getExecutorService(liteflowConfig.getParallelLoopExecutorClass());
parallelExecutor = getExecutorService(Optional.ofNullable(liteflowConfig.getParallelLoopExecutorClass())
.orElse(liteflowConfig.getGlobalThreadPoolExecutorClass()));
}
return parallelExecutor;
}

View File

@@ -0,0 +1,27 @@
package com.yomahub.liteflow.thread;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
import java.util.concurrent.ExecutorService;
/**
* LiteFlow默认的when线程池+异步多线程执行器实现
*
* @author jason
*/
public class LiteFlowDefaultGlobalExecutorBuilder implements ExecutorBuilder {
@Override
public ExecutorService buildExecutor() {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
// 只有在非spring的场景下liteflowConfig才会为null
if (ObjectUtil.isNull(liteflowConfig)) {
liteflowConfig = new LiteflowConfig();
}
return buildDefaultExecutor(liteflowConfig.getGlobalThreadPoolSize(), liteflowConfig.getGlobalThreadPoolSize(),
liteflowConfig.getGlobalThreadPoolQueueSize(), "global-thread-");
}
}

View File

@@ -51,6 +51,9 @@ public class LiteflowAutoConfiguration {
liteflowConfig.setParallelQueueLimit(property.getParallelQueueLimit());
liteflowConfig.setParallelLoopExecutorClass(property.getParallelLoopExecutorClass());
liteflowConfig.setFallbackCmpEnable(property.isFallbackCmpEnable());
liteflowConfig.setGlobalThreadPoolExecutorClass(property.getGlobalThreadPoolExecutorClass());
liteflowConfig.setGlobalThreadPoolSize(property.getGlobalThreadPoolSize());
liteflowConfig.setGlobalThreadPoolQueueSize(property.getGlobalThreadPoolQueueSize());
return liteflowConfig;
}

View File

@@ -1,5 +1,7 @@
package com.yomahub.liteflow.solon.config;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.enums.ParseModeEnum;
import org.noear.solon.annotation.Configuration;
import org.noear.solon.annotation.Inject;
@@ -83,6 +85,15 @@ public class LiteflowProperty {
// 是否启用组件降级
private Boolean fallbackCmpEnable;
//全局线程池所用class路径(when+异步循环)
private String globalThreadPoolExecutorClass;
//全局线程池最大线程数(when+异步循环)
private Integer globalThreadPoolSize;
//全局线程池最大队列数(when+异步循环)
private Integer globalThreadPoolQueueSize;
public boolean isEnable() {
return enable;
}
@@ -267,4 +278,40 @@ public class LiteflowProperty {
public Boolean getFallbackCmpEnable() {
return fallbackCmpEnable;
}
public Integer getGlobalThreadPoolSize() {
if (ObjectUtil.isNull(globalThreadPoolSize)) {
return 16;
} else {
return globalThreadPoolSize;
}
}
public void setGlobalThreadPoolSize(Integer globalThreadPoolSize) {
this.globalThreadPoolSize = globalThreadPoolSize;
}
public Integer getGlobalThreadPoolQueueSize() {
if (ObjectUtil.isNull(globalThreadPoolQueueSize)) {
return 512;
} else {
return globalThreadPoolQueueSize;
}
}
public void setGlobalThreadPoolQueueSize(Integer globalThreadPoolQueueSize) {
this.globalThreadPoolQueueSize = globalThreadPoolQueueSize;
}
public String getGlobalThreadPoolExecutorClass() {
if (StrUtil.isBlank(globalThreadPoolExecutorClass)) {
return "com.yomahub.liteflow.thread.LiteFlowDefaultGlobalExecutorBuilder";
} else {
return globalThreadPoolExecutorClass;
}
}
public void setGlobalThreadPoolExecutorClass(String globalThreadPoolExecutorClass) {
this.globalThreadPoolExecutorClass = globalThreadPoolExecutorClass;
}
}

View File

@@ -1,5 +1,7 @@
package com.yomahub.liteflow.springboot;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.enums.ParseModeEnum;
import org.springframework.boot.context.properties.ConfigurationProperties;
@@ -36,6 +38,7 @@ public class LiteflowProperty {
private String mainExecutorClass;
// 并行线程执行器class路径
@Deprecated
private String threadExecutorClass;
// 异步线程最大等待描述
@@ -47,9 +50,11 @@ public class LiteflowProperty {
private TimeUnit whenMaxWaitTimeUnit;
// 异步线程池最大线程数
@Deprecated
private int whenMaxWorkers;
// 异步线程池最大队列数量
@Deprecated
private int whenQueueLimit;
// 异步线程池是否隔离
@@ -80,13 +85,15 @@ public class LiteflowProperty {
// 规则文件/脚本文件变更监听
private boolean enableMonitorFile;
@Deprecated
private String parallelLoopExecutorClass;
//使用默认并行循环线程池时,最大线程数
@Deprecated
private int parallelMaxWorkers;
//使用默认并行循环线程池时,最大队列数
@Deprecated
private int parallelQueueLimit;
// 是否启用组件降级
@@ -101,6 +108,15 @@ public class LiteflowProperty {
//脚本特殊设置选项
private Map<String, String> scriptSetting;
//全局线程池所用class路径(when+异步循环)
private String globalThreadPoolExecutorClass;
//全局线程池最大线程数(when+异步循环)
private Integer globalThreadPoolSize;
//全局线程池最大队列数(when+异步循环)
private Integer globalThreadPoolQueueSize;
public boolean isEnableMonitorFile() {
return enableMonitorFile;
}
@@ -143,18 +159,22 @@ public class LiteflowProperty {
this.whenMaxWaitSeconds = whenMaxWaitSeconds;
}
@Deprecated
public int getWhenMaxWorkers() {
return whenMaxWorkers;
}
@Deprecated
public void setWhenMaxWorkers(int whenMaxWorkers) {
this.whenMaxWorkers = whenMaxWorkers;
}
@Deprecated
public int getWhenQueueLimit() {
return whenQueueLimit;
}
@Deprecated
public void setWhenQueueLimit(int whenQueueLimit) {
this.whenQueueLimit = whenQueueLimit;
}
@@ -193,10 +213,12 @@ public class LiteflowProperty {
this.printBanner = printBanner;
}
@Deprecated
public String getThreadExecutorClass() {
return threadExecutorClass;
}
@Deprecated
public void setThreadExecutorClass(String threadExecutorClass) {
this.threadExecutorClass = threadExecutorClass;
}
@@ -273,26 +295,32 @@ public class LiteflowProperty {
this.whenMaxWaitTimeUnit = whenMaxWaitTimeUnit;
}
@Deprecated
public String getParallelLoopExecutorClass() {
return parallelLoopExecutorClass;
}
@Deprecated
public void setParallelLoopExecutorClass(String parallelLoopExecutorClass) {
this.parallelLoopExecutorClass = parallelLoopExecutorClass;
}
@Deprecated
public int getParallelMaxWorkers() {
return parallelMaxWorkers;
}
@Deprecated
public void setParallelMaxWorkers(int parallelMaxWorkers) {
this.parallelMaxWorkers = parallelMaxWorkers;
}
@Deprecated
public int getParallelQueueLimit() {
return parallelQueueLimit;
}
@Deprecated
public void setParallelQueueLimit(int parallelQueueLimit) {
this.parallelQueueLimit = parallelQueueLimit;
}
@@ -336,4 +364,40 @@ public class LiteflowProperty {
public void setScriptSetting(Map<String, String> scriptSetting) {
this.scriptSetting = scriptSetting;
}
public Integer getGlobalThreadPoolSize() {
if (ObjectUtil.isNull(globalThreadPoolSize)) {
return 16;
} else {
return globalThreadPoolSize;
}
}
public void setGlobalThreadPoolSize(Integer globalThreadPoolSize) {
this.globalThreadPoolSize = globalThreadPoolSize;
}
public Integer getGlobalThreadPoolQueueSize() {
if (ObjectUtil.isNull(globalThreadPoolQueueSize)) {
return 512;
} else {
return globalThreadPoolQueueSize;
}
}
public void setGlobalThreadPoolQueueSize(Integer globalThreadPoolQueueSize) {
this.globalThreadPoolQueueSize = globalThreadPoolQueueSize;
}
public String getGlobalThreadPoolExecutorClass() {
if (StrUtil.isBlank(globalThreadPoolExecutorClass)) {
return "com.yomahub.liteflow.thread.LiteFlowDefaultGlobalExecutorBuilder";
} else {
return globalThreadPoolExecutorClass;
}
}
public void setGlobalThreadPoolExecutorClass(String globalThreadPoolExecutorClass) {
this.globalThreadPoolExecutorClass = globalThreadPoolExecutorClass;
}
}

View File

@@ -54,6 +54,9 @@ public class LiteflowPropertyAutoConfiguration {
liteflowConfig.setDelay(liteflowMonitorProperty.getDelay());
liteflowConfig.setPeriod(liteflowMonitorProperty.getPeriod());
liteflowConfig.setScriptSetting(property.getScriptSetting());
liteflowConfig.setGlobalThreadPoolExecutorClass(property.getGlobalThreadPoolExecutorClass());
liteflowConfig.setGlobalThreadPoolQueueSize(property.getGlobalThreadPoolQueueSize());
liteflowConfig.setGlobalThreadPoolSize(property.getGlobalThreadPoolSize());
return liteflowConfig;
}

View File

@@ -220,6 +220,27 @@
"type": "java.util.Map",
"description": "script special settings.",
"sourceType": "com.yomahub.liteflow.springboot.LiteflowProperty"
},
{
"name": "liteflow.global-thread-pool-size",
"type": "java.lang.Integer",
"description": "Set the global chain thread pool worker max-size.",
"sourceType": "com.yomahub.liteflow.springboot.LiteflowProperty",
"defaultValue": 16
},
{
"name": "liteflow.global-thread-pool-queue-size",
"type": "java.lang.Integer",
"description": "Set the global chain thread pool queue max-size ",
"sourceType": "com.yomahub.liteflow.springboot.LiteflowProperty",
"defaultValue": 512
},
{
"name": "liteflow.global-thread-pool-executor-class",
"type": "java.lang.String",
"description": "Custom the global chain thread pool implement for global chain executor.",
"sourceType": "com.yomahub.liteflow.springboot.LiteflowProperty",
"defaultValue": "com.yomahub.liteflow.thread.LiteFlowDefaultGlobalExecutorBuilder"
}
]
}

View File

@@ -26,4 +26,7 @@ liteflow.monitor.queue-limit=200
liteflow.monitor.delay=300000
liteflow.monitor.period=300000
liteflow.enable-monitor-file=false
liteflow.global-thread-pool-size=16
liteflow.global-thread-pool-queue-size=512
liteflow.global-thread-pool-executor-class=comcom.yomahub.liteflow.thread.LiteFlowDefaultGlobalExecutorBuilder

View File

@@ -0,0 +1,79 @@
package com.yomahub.liteflow.test.chainThreadPool;
import cn.hutool.core.collection.ListUtil;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.slot.DefaultContext;
import com.yomahub.liteflow.test.BaseTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.TestPropertySource;
import javax.annotation.Resource;
import java.util.List;
/**
* springboot环境下chain线程池隔离测试
*/
@TestPropertySource(value = "classpath:/chainThreadPool/application.properties")
@SpringBootTest(classes = ChainThreadPoolELSpringbootTest.class)
@EnableAutoConfiguration
@ComponentScan({"com.yomahub.liteflow.test.chainThreadPool.cmp"})
public class ChainThreadPoolELSpringbootTest extends BaseTest {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Resource
private FlowExecutor flowExecutor;
/**
* 测试WHEN上全局线程池和chain线程池隔离-优先以chain上为准
*/
@Test
public void testChainThreadPool() {
LiteflowResponse response1 = flowExecutor.execute2Resp("chain1", "arg");
DefaultContext context = response1.getFirstContextBean();
Assertions.assertTrue(response1.isSuccess());
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-chain-thead"));
}
/**
* 测试FOR上全局线程池和chain线程池隔离-优先以chain上为准
*/
@Test
public void testChainThreadPool2() {
LiteflowResponse response1 = flowExecutor.execute2Resp("chain2", "arg");
DefaultContext context = response1.getFirstContextBean();
Assertions.assertTrue(response1.isSuccess());
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-chain-thead"));
}
/**
* 测试WHILE上全局线程池和chain线程池隔离-优先以chain上为准
*/
@Test
public void testChainThreadPool3() {
LiteflowResponse response1 = flowExecutor.execute2Resp("chain3", "arg");
DefaultContext context = response1.getFirstContextBean();
Assertions.assertTrue(response1.isSuccess());
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-chain-thead"));
}
/**
* 测试ITERATOR上全局线程池和chain线程池隔离-优先以chain上为准
*/
@Test
public void testChainThreadPool4() {
List<String> list = ListUtil.toList("1", "2", "3", "4", "5");
LiteflowResponse response1 = flowExecutor.execute2Resp("chain4", list);
DefaultContext context = response1.getFirstContextBean();
Assertions.assertTrue(response1.isSuccess());
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-chain-thead"));
}
}

View File

@@ -21,10 +21,10 @@ import java.util.List;
* springboot环境下chain线程池隔离测试
*/
@TestPropertySource(value = "classpath:/chainThreadPool/application2.properties")
@SpringBootTest(classes = CustomThreadPoolELSpringbootTest.class)
@SpringBootTest(classes = ConditionThreadPoolELSpringbootTest.class)
@EnableAutoConfiguration
@ComponentScan({"com.yomahub.liteflow.test.chainThreadPool.cmp"})
public class CustomThreadPoolELSpringbootTest extends BaseTest {
public class ConditionThreadPoolELSpringbootTest extends BaseTest {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@@ -35,7 +35,7 @@ public class CustomThreadPoolELSpringbootTest extends BaseTest {
* 测试WEHN上condition线程池和chain线程池隔离-优先以WHEN上为准
*/
@Test
public void testCustomChainThreadPool() {
public void testConditionThreadPool() {
LiteflowResponse response1 = flowExecutor.execute2Resp("chain1", "arg");
DefaultContext context = response1.getFirstContextBean();
Assertions.assertTrue(response1.isSuccess());
@@ -46,7 +46,7 @@ public class CustomThreadPoolELSpringbootTest extends BaseTest {
* 测试FOR上condition线程池和chain线程池隔离-优先以FOR上为准
*/
@Test
public void testCustomChainThreadPool2() {
public void testConditionThreadPool2() {
LiteflowResponse response1 = flowExecutor.execute2Resp("chain2", "arg");
DefaultContext context = response1.getFirstContextBean();
Assertions.assertTrue(response1.isSuccess());
@@ -57,7 +57,7 @@ public class CustomThreadPoolELSpringbootTest extends BaseTest {
* 测试WHILE上condition线程池和chain线程池隔离-优先以WHILE上为准
*/
@Test
public void testCustomChainThreadPool3() {
public void testConditionThreadPool3() {
LiteflowResponse response1 = flowExecutor.execute2Resp("chain3", "arg");
DefaultContext context = response1.getFirstContextBean();
Assertions.assertTrue(response1.isSuccess());
@@ -68,7 +68,7 @@ public class CustomThreadPoolELSpringbootTest extends BaseTest {
* 测试ITERATOR上condition线程池和chain线程池隔离-优先以ITERATOR上为准
*/
@Test
public void testCustomChainThreadPool4() {
public void testConditionThreadPool4() {
List<String> list = ListUtil.toList("1", "2", "3", "4", "5");
LiteflowResponse response1 = flowExecutor.execute2Resp("chain4", list);
DefaultContext context = response1.getFirstContextBean();

View File

@@ -0,0 +1,23 @@
package com.yomahub.liteflow.test.chainThreadPool;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
import com.yomahub.liteflow.thread.ExecutorBuilder;
import java.util.concurrent.ExecutorService;
public class CustomGlobalThreadExecutor implements ExecutorBuilder {
@Override
public ExecutorService buildExecutor() {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
// 只有在非spring的场景下liteflowConfig才会为null
if (ObjectUtil.isNull(liteflowConfig)) {
liteflowConfig = new LiteflowConfig();
}
return buildDefaultExecutor(16, 16,
512, "customer-global-thead");
}
}

View File

@@ -1,4 +1,4 @@
package com.yomahub.liteflow.test.chainThreadPool;
package com.yomahub.liteflow.test.GlobalThreadPool;
import cn.hutool.core.collection.ListUtil;
import com.yomahub.liteflow.core.FlowExecutor;
@@ -18,9 +18,9 @@ import javax.annotation.Resource;
import java.util.List;
/**
* springboot环境下chain线程池隔离测试
* springboot环境下Global线程池隔离测试
*/
@TestPropertySource(value = "classpath:/chainThreadPool/application.properties")
@TestPropertySource(value = "classpath:/chainThreadPool/application3.properties")
@SpringBootTest(classes = GlobalThreadPoolELSpringbootTest.class)
@EnableAutoConfiguration
@ComponentScan({"com.yomahub.liteflow.test.chainThreadPool.cmp"})
@@ -32,48 +32,48 @@ public class GlobalThreadPoolELSpringbootTest extends BaseTest {
private FlowExecutor flowExecutor;
/**
* 测试WHEN上全局线程池和chain线程池隔离-优先以chain上为准
* 测试WHEN上全局线程池
*/
@Test
public void testGlobalChainThreadPool() {
public void testGlobalThreadPool() {
LiteflowResponse response1 = flowExecutor.execute2Resp("chain1", "arg");
DefaultContext context = response1.getFirstContextBean();
Assertions.assertTrue(response1.isSuccess());
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-chain-thead"));
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-Global-thead"));
}
/**
* 测试FOR上全局线程池和chain线程池隔离-优先以chain上为准
* 测试FOR上全局线程池
*/
@Test
public void testGlobalChainThreadPool2() {
public void testGlobalThreadPool2() {
LiteflowResponse response1 = flowExecutor.execute2Resp("chain2", "arg");
DefaultContext context = response1.getFirstContextBean();
Assertions.assertTrue(response1.isSuccess());
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-chain-thead"));
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-Global-thead"));
}
/**
* 测试WHILE上全局线程池和chain线程池隔离-优先以chain上为准
* 测试WHILE上全局线程池
*/
@Test
public void testGlobalChainThreadPool3() {
public void testGlobalThreadPool3() {
LiteflowResponse response1 = flowExecutor.execute2Resp("chain3", "arg");
DefaultContext context = response1.getFirstContextBean();
Assertions.assertTrue(response1.isSuccess());
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-chain-thead"));
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-Global-thead"));
}
/**
* 测试ITERATOR上全局线程池和chain线程池隔离-优先以chain上为准
* 测试ITERATOR上全局线程池
*/
@Test
public void testGlobalChainThreadPool4() {
public void testGlobalThreadPool4() {
List<String> list = ListUtil.toList("1", "2", "3", "4", "5");
LiteflowResponse response1 = flowExecutor.execute2Resp("chain4", list);
DefaultContext context = response1.getFirstContextBean();
Assertions.assertTrue(response1.isSuccess());
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-chain-thead"));
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-Global-thead"));
}
}

View File

@@ -0,0 +1,4 @@
liteflow.rule-source=chainThreadPool/flow3.el.xml
liteflow.global-thread-pool-size=16
liteflow.global-thread-pool-queue-size=512
liteflow.global-thread-pool-executor-class=com.yomahub.liteflow.test.chainThreadPool.CustomGlobalThreadExecutor

View File

@@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<flow>
<chain name="chain1">
WHEN(a,b);
</chain>
<chain name="chain2">
FOR(5).parallel(true).DO(THEN(a,f
)
);
</chain>
<chain name="chain3">
WHILE(z).parallel(true).DO(THEN(w,d));
</chain>
<chain name="chain4">
ITERATOR(it).parallel(true).DO(THEN(a,i));
</chain>
</flow>