1. 添加springboot下when-thread的测试用例

2. 简单抽取默认构建线程池的方法
This commit is contained in:
daiqi
2022-01-21 23:16:18 +08:00
parent ff9275bbe1
commit a7f53be766
12 changed files with 164 additions and 53 deletions

View File

@@ -1,13 +1,48 @@
package com.yomahub.liteflow.thread;
import com.alibaba.ttl.threadpool.TtlExecutors;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
/**
* 并行多线程执行器构造器接口
*
* @author Bryan.Zhang
* @since 2.6.6
*/
public interface ExecutorBuilder {
ExecutorService buildExecutor();
/**
* <p>
* 构建默认的线程池对象
* </p>
* @author sikadai
* @date 2022/1/21 23:07
* @param corePoolSize : 核心线程池数量
* @param maximumPoolSize : 最大线程池数量
* @param queueCapacity : 队列的容量
* @param threadName : 线程吃名称
* @return java.util.concurrent.ExecutorService
*/
default ExecutorService buildDefaultExecutor(int corePoolSize, int maximumPoolSize, int queueCapacity, String threadName) {
return TtlExecutors.getTtlExecutorService(new ThreadPoolExecutor(corePoolSize,
maximumPoolSize,
0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(queueCapacity),
new ThreadFactory() {
private final AtomicLong number = new AtomicLong();
@Override
public Thread newThread(Runnable r) {
Thread newThread = Executors.defaultThreadFactory().newThread(r);
newThread.setName(threadName + number.getAndIncrement());
newThread.setDaemon(false);
return newThread;
}
},
new ThreadPoolExecutor.AbortPolicy()));
}
}

View File

@@ -34,7 +34,12 @@ public class ExecutorHelper {
private ExecutorService executorService;
private Map<String, ExecutorService> executorServiceMap;
/**
* 此处使用Map缓存线程池信息
* key - 线程池构建者的Class全类名
* value - 线程池对象
* */
private final Map<String, ExecutorService> executorServiceMap;
private ExecutorHelper() {
executorServiceMap = Maps.newConcurrentMap();
@@ -80,6 +85,7 @@ public class ExecutorHelper {
}
}
/** 构建全局默认线程池 */
public ExecutorService buildExecutor() {
if (ObjectUtil.isNull(executorService)) {
LiteflowConfig liteflowConfig = SpringAware.getBean(LiteflowConfig.class);
@@ -89,6 +95,15 @@ public class ExecutorHelper {
return executorService;
}
/**
* <p>
* 构建线程池执行器 - 支持多个when公用一个线程池
* </p>
* @author sikadai
* @date 2022/1/21 23:00
* @param threadExecutorClass : 线程池构建者的Class全类名
* @return java.util.concurrent.ExecutorService
*/
public ExecutorService buildExecutor(String threadExecutorClass) {
try {
if (StrUtil.isBlank(threadExecutorClass)) {
@@ -108,6 +123,16 @@ public class ExecutorHelper {
}
}
/**
* <p>
* 根据线程执行构建者Class类名获取ExecutorBuilder实例
* </p>
*
* @author sikadai
* @date 2022/1/21 23:04
* @param threadExecutorClass
* @return com.yomahub.liteflow.thread.ExecutorBuilder
*/
private ExecutorBuilder getExecutorBuilder(String threadExecutorClass) throws Exception {
return (ExecutorBuilder) Class.forName(threadExecutorClass).newInstance();
}

View File

@@ -21,21 +21,10 @@ public class LiteFlowDefaultExecutorBuilder implements ExecutorBuilder{
if (ObjectUtil.isNull(liteflowConfig)){
liteflowConfig = new LiteflowConfig();
}
return TtlExecutors.getTtlExecutorService(new ThreadPoolExecutor(liteflowConfig.getWhenMaxWorkers(),
return buildDefaultExecutor(
liteflowConfig.getWhenMaxWorkers(),
0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(liteflowConfig.getWhenQueueLimit()),
new ThreadFactory() {
private final AtomicLong number = new AtomicLong();
@Override
public Thread newThread(Runnable r) {
Thread newThread = Executors.defaultThreadFactory().newThread(r);
newThread.setName("lf-when-thead-" + number.getAndIncrement());
newThread.setDaemon(false);
return newThread;
}
},
new ThreadPoolExecutor.AbortPolicy()));
liteflowConfig.getWhenMaxWorkers(),
liteflowConfig.getWhenQueueLimit(),
"lf-when-thead-");
}
}

View File

@@ -18,21 +18,10 @@ public class CustomThreadExecutor1 implements ExecutorBuilder {
if (ObjectUtil.isNull(liteflowConfig)) {
liteflowConfig = new LiteflowConfig();
}
return TtlExecutors.getTtlExecutorService(new ThreadPoolExecutor(liteflowConfig.getWhenMaxWorkers(),
return buildDefaultExecutor(
liteflowConfig.getWhenMaxWorkers(),
0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(liteflowConfig.getWhenQueueLimit()),
new ThreadFactory() {
private final AtomicLong number = new AtomicLong();
@Override
public Thread newThread(Runnable r) {
Thread newThread = Executors.defaultThreadFactory().newThread(r);
newThread.setName("Customer-when-1-thead-" + number.getAndIncrement());
newThread.setDaemon(false);
return newThread;
}
},
new ThreadPoolExecutor.AbortPolicy()));
liteflowConfig.getWhenMaxWorkers(),
liteflowConfig.getWhenQueueLimit(),
"customer-when-1-thead-");
}
}

View File

@@ -17,21 +17,10 @@ public class CustomThreadExecutor2 implements ExecutorBuilder {
if (ObjectUtil.isNull(liteflowConfig)) {
liteflowConfig = new LiteflowConfig();
}
return TtlExecutors.getTtlExecutorService(new ThreadPoolExecutor(liteflowConfig.getWhenMaxWorkers(),
return buildDefaultExecutor(
liteflowConfig.getWhenMaxWorkers(),
0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(liteflowConfig.getWhenQueueLimit()),
new ThreadFactory() {
private final AtomicLong number = new AtomicLong();
@Override
public Thread newThread(Runnable r) {
Thread newThread = Executors.defaultThreadFactory().newThread(r);
newThread.setName("Customer-when-2-thead-" + number.getAndIncrement());
newThread.setDaemon(false);
return newThread;
}
},
new ThreadPoolExecutor.AbortPolicy()));
liteflowConfig.getWhenMaxWorkers(),
liteflowConfig.getWhenQueueLimit(),
"customer-when-2-thead-");
}
}

View File

@@ -0,0 +1,24 @@
package com.yomahub.liteflow.test.customWhenThreadPool;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.thread.ExecutorBuilder;
import com.yomahub.liteflow.util.SpringAware;
import java.util.concurrent.ExecutorService;
public class CustomThreadExecutor3 implements ExecutorBuilder {
@Override
public ExecutorService buildExecutor() {
LiteflowConfig liteflowConfig = SpringAware.getBean(LiteflowConfig.class);
//只有在非spring的场景下liteflowConfig才会为null
if (ObjectUtil.isNull(liteflowConfig)) {
liteflowConfig = new LiteflowConfig();
}
return buildDefaultExecutor(
liteflowConfig.getWhenMaxWorkers(),
liteflowConfig.getWhenMaxWorkers(),
liteflowConfig.getWhenQueueLimit(),
"customer-when-3-thead-");
}
}

View File

@@ -1,8 +1,12 @@
package com.yomahub.liteflow.test.customWhenThreadPool;
import com.yomahub.liteflow.builder.LiteFlowChainBuilder;
import com.yomahub.liteflow.builder.LiteFlowConditionBuilder;
import com.yomahub.liteflow.builder.LiteFlowNodeBuilder;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.entity.data.DefaultSlot;
import com.yomahub.liteflow.entity.data.LiteflowResponse;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import com.yomahub.liteflow.test.BaseTest;
import org.junit.Assert;
import org.junit.Test;
@@ -19,6 +23,7 @@ import javax.annotation.Resource;
/**
* springboot环境下异步线程超时日志打印测试
*
* @author Bryan.Zhang
* @since 2.6.4
*/
@@ -35,17 +40,47 @@ public class CustomWhenThreadPoolSpringbootTest extends BaseTest {
private FlowExecutor flowExecutor;
@Test
public void testCustomThreadPool() throws Exception{
public void testCustomThreadPool() throws Exception {
LiteflowResponse<DefaultSlot> response = flowExecutor.execute2Resp("chain", "arg");
Assert.assertTrue(response.isSuccess());
Assert.assertTrue(response.getSlot().getData("threadName").toString().startsWith("lf-when-thead"));
LiteflowResponse<DefaultSlot> response1 = flowExecutor.execute2Resp("chain1", "arg");
Assert.assertTrue(response1.isSuccess());
Assert.assertTrue(response1.getSlot().getData("threadName").toString().startsWith("lf-when-thead"));
Assert.assertTrue(response1.getSlot().getData("threadName").toString().startsWith("customer-when-1-thead"));
LiteflowResponse<DefaultSlot> response2 = flowExecutor.execute2Resp("chain2", "arg");
Assert.assertTrue(response2.isSuccess());
Assert.assertTrue(response2.getSlot().getData("threadName").toString().startsWith("Customer-when-1-thead"));
Assert.assertTrue(response2.getSlot().getData("threadName").toString().startsWith("customer-when-2-thead"));
// 使用build模式构建chain测试when条件的多线程
LiteFlowNodeBuilder.createNode().setId("a")
.setName("组件A")
.setType(NodeTypeEnum.COMMON)
.setClazz("com.yomahub.liteflow.test.builder.cmp.ACmp")
.build();
LiteFlowNodeBuilder.createNode().setId("b")
.setName("组件B")
.setType(NodeTypeEnum.COMMON)
.setClazz("com.yomahub.liteflow.test.builder.cmp.BCmp")
.build();
LiteFlowNodeBuilder.createNode().setId("c")
.setName("组件C")
.setType(NodeTypeEnum.COMMON)
.setClazz("com.yomahub.liteflow.test.builder.cmp.CCmp")
.build();
LiteFlowChainBuilder.createChain().setChainName("chain3").setCondition(
LiteFlowConditionBuilder
.createWhenCondition()
.setThreadExecutorClass(CustomThreadExecutor3.class.getName())
.setValue("a,b,c,d")
.build()
).build();
LiteflowResponse<DefaultSlot> response3 = flowExecutor.execute2Resp("chain3", "arg");
Assert.assertTrue(response3.isSuccess());
Assert.assertTrue(response3.getSlot().getData("threadName").toString().startsWith("Customer-when-2-thead"));
Assert.assertTrue(response3.getSlot().getData("threadName").toString().startsWith("customer-when-3-thead"));
}
}

View File

@@ -15,6 +15,7 @@ public class CCmp extends NodeComponent {
@Override
public void process() {
this.getSlot().setData("threadName", Thread.currentThread().getName());
System.out.println("CCmp executed!");
}

View File

@@ -15,6 +15,7 @@ public class DCmp extends NodeComponent {
@Override
public void process() {
this.getSlot().setData("threadName", Thread.currentThread().getName());
System.out.println("DCmp executed!");
}

View File

@@ -15,6 +15,7 @@ public class ECmp extends NodeComponent {
@Override
public void process() {
this.getSlot().setData("threadName", Thread.currentThread().getName());
System.out.println("ECmp executed!");
}

View File

@@ -0,0 +1,22 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.customWhenThreadPool.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import org.springframework.stereotype.Component;
@Component("f")
public class FCmp extends NodeComponent {
@Override
public void process() {
this.getSlot().setData("threadName", Thread.currentThread().getName());
System.out.println("FCmp executed!");
}
}

View File

@@ -1,12 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<flow>
<chain name="chain1">
<chain name="chain">
<when value="a,b"/>
</chain>
<chain name="chain2">
<chain name="chain1">
<when value="c,d" threadExecutorClass="com.yomahub.liteflow.test.customWhenThreadPool.CustomThreadExecutor1"/>
</chain>
<chain name="chain3">
<chain name="chain2">
<when value="e,f" threadExecutorClass="com.yomahub.liteflow.test.customWhenThreadPool.CustomThreadExecutor2"/>
</chain>
</flow>