mirror of
https://gitee.com/dromara/liteFlow.git
synced 2026-06-10 19:26:54 +08:00
优化代码,避免过多的类属性,会造成测试用例的错误,因为测试用例jvm只启动一遍
This commit is contained in:
@@ -9,6 +9,7 @@ package com.yomahub.liteflow.entity.data;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import com.yomahub.liteflow.property.LiteflowConfig;
|
||||
import com.yomahub.liteflow.property.LiteflowConfigGetter;
|
||||
import com.yomahub.liteflow.util.SpringAware;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@@ -34,16 +35,9 @@ public class DataBus {
|
||||
private static ConcurrentLinkedQueue<Integer> QUEUE;
|
||||
|
||||
static {
|
||||
LiteflowConfig liteflowConfig = SpringAware.getBean(LiteflowConfig.class);
|
||||
|
||||
if (ObjectUtil.isNull(liteflowConfig)){
|
||||
//liteflowConfig有自己的默认值
|
||||
liteflowConfig = new LiteflowConfig();
|
||||
}
|
||||
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
|
||||
int slotSize = liteflowConfig.getSlotSize();
|
||||
|
||||
SLOTS = new AtomicReferenceArray<>(slotSize);
|
||||
|
||||
QUEUE = IntStream.range(0, slotSize - 1).boxed().collect(Collectors.toCollection(ConcurrentLinkedQueue::new));
|
||||
}
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@ import com.yomahub.liteflow.exception.ConfigErrorException;
|
||||
import com.yomahub.liteflow.exception.FlowSystemException;
|
||||
import com.yomahub.liteflow.exception.WhenExecuteException;
|
||||
import com.yomahub.liteflow.property.LiteflowConfig;
|
||||
import com.yomahub.liteflow.property.LiteflowConfigGetter;
|
||||
import com.yomahub.liteflow.util.ExecutorHelper;
|
||||
import com.yomahub.liteflow.util.SpringAware;
|
||||
import org.slf4j.Logger;
|
||||
@@ -39,28 +40,6 @@ public class Chain implements Executable {
|
||||
|
||||
private List<Condition> conditionList;
|
||||
|
||||
private static ExecutorService parallelExecutor;
|
||||
|
||||
private static LiteflowConfig liteflowConfig;
|
||||
|
||||
static {
|
||||
//这里liteflowConfig不可能为null
|
||||
//如果在springboot环境,由于自动装配,所以不可能为null
|
||||
//在spring环境,如果xml没配置,在FlowExecutor的init时候就已经报错了
|
||||
liteflowConfig = SpringAware.getBean(LiteflowConfig.class);
|
||||
|
||||
//这里为了非spring环境下的严谨,还是判断
|
||||
if (ObjectUtil.isNull(liteflowConfig)){
|
||||
//liteflowConfig有自己的默认值
|
||||
liteflowConfig = new LiteflowConfig();
|
||||
}
|
||||
|
||||
parallelExecutor = SpringAware.getBean(ExecutorService.class);
|
||||
if (ObjectUtil.isNull(parallelExecutor)){
|
||||
parallelExecutor = ExecutorHelper.buildExecutor(liteflowConfig.getWhenMaxWorkers(), liteflowConfig.getWhenQueueLimit(), "liteflow-when-thread", false);
|
||||
}
|
||||
}
|
||||
|
||||
public Chain(String chainName, List<Condition> conditionList) {
|
||||
this.chainName = chainName;
|
||||
this.conditionList = conditionList;
|
||||
@@ -89,6 +68,8 @@ public class Chain implements Executable {
|
||||
throw new FlowSystemException("no conditionList in this chain[" + chainName + "]");
|
||||
}
|
||||
|
||||
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
|
||||
|
||||
Slot slot = DataBus.getSlot(slotIndex);
|
||||
|
||||
//循环chain里包含的condition,每一个condition有可能是then,也有可能是when
|
||||
@@ -133,6 +114,10 @@ public class Chain implements Executable {
|
||||
final CountDownLatch latch = new CountDownLatch(condition.getNodeList().size());
|
||||
final List<Future<Boolean>> futures = new ArrayList<>(condition.getNodeList().size());
|
||||
|
||||
//此方法其实只会初始化一次Executor,不会每次都会初始化。Executor是唯一的
|
||||
ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildExecutor();
|
||||
|
||||
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
|
||||
|
||||
for (int i = 0; i < condition.getNodeList().size(); i++) {
|
||||
futures.add(parallelExecutor.submit(
|
||||
|
||||
@@ -0,0 +1,22 @@
|
||||
package com.yomahub.liteflow.property;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import com.yomahub.liteflow.util.SpringAware;
|
||||
|
||||
/**
|
||||
* liteflow的配置获取器
|
||||
*/
|
||||
public class LiteflowConfigGetter {
|
||||
|
||||
public static LiteflowConfig get(){
|
||||
LiteflowConfig liteflowConfig = SpringAware.getBean(LiteflowConfig.class);
|
||||
//这里liteflowConfig不可能为null
|
||||
//如果在springboot环境,由于自动装配,所以不可能为null
|
||||
//在spring环境,如果xml没配置,在FlowExecutor的init时候就已经报错了
|
||||
//只有在非spring环境下,是为null
|
||||
if (ObjectUtil.isNull(liteflowConfig)){
|
||||
liteflowConfig = new LiteflowConfig();
|
||||
}
|
||||
return liteflowConfig;
|
||||
}
|
||||
}
|
||||
@@ -7,6 +7,8 @@
|
||||
*/
|
||||
package com.yomahub.liteflow.util;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import com.yomahub.liteflow.property.LiteflowConfig;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@@ -20,18 +22,29 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||
*/
|
||||
public class ExecutorHelper {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ExecutorHelper.class);
|
||||
private final Logger LOG = LoggerFactory.getLogger(ExecutorHelper.class);
|
||||
|
||||
private static ExecutorHelper executorHelper;
|
||||
|
||||
private ExecutorService executorService;
|
||||
|
||||
private ExecutorHelper() {
|
||||
}
|
||||
|
||||
public static ExecutorHelper loadInstance(){
|
||||
if (ObjectUtil.isNull(executorHelper)){
|
||||
executorHelper = new ExecutorHelper();
|
||||
}
|
||||
return executorHelper;
|
||||
}
|
||||
|
||||
/**
|
||||
* 使用默认的等待时间1分钟,来关闭目标线程组。
|
||||
* <p>
|
||||
*
|
||||
* @param pool 需要关闭的线程组.
|
||||
*/
|
||||
public static void shutdownAwaitTermination(ExecutorService pool) {
|
||||
public void shutdownAwaitTermination(ExecutorService pool) {
|
||||
shutdownAwaitTermination(pool, 60L);
|
||||
}
|
||||
|
||||
@@ -42,7 +55,7 @@ public class ExecutorHelper {
|
||||
* @param pool 需要关闭的管理者
|
||||
* @param timeout 等待时间
|
||||
*/
|
||||
public static void shutdownAwaitTermination(ExecutorService pool,
|
||||
public void shutdownAwaitTermination(ExecutorService pool,
|
||||
long timeout) {
|
||||
pool.shutdown();
|
||||
try {
|
||||
@@ -65,7 +78,7 @@ public class ExecutorHelper {
|
||||
* @param name 名称.
|
||||
* @return 线程工厂实例.
|
||||
*/
|
||||
public static ThreadFactory buildExecutorFactory(final String name) {
|
||||
public ThreadFactory buildExecutorFactory(final String name) {
|
||||
return buildExecutorFactory(name, false);
|
||||
}
|
||||
|
||||
@@ -76,7 +89,7 @@ public class ExecutorHelper {
|
||||
* @param daemon 是否为后台线程.
|
||||
* @return 线程工厂实例.
|
||||
*/
|
||||
public static ThreadFactory buildExecutorFactory(final String name, final boolean daemon) {
|
||||
public ThreadFactory buildExecutorFactory(final String name, final boolean daemon) {
|
||||
return new ThreadFactory() {
|
||||
|
||||
private final AtomicLong number = new AtomicLong();
|
||||
@@ -92,12 +105,22 @@ public class ExecutorHelper {
|
||||
};
|
||||
}
|
||||
|
||||
public static ExecutorService buildExecutor(int worker, int queue, String namePrefix, boolean daemon) {
|
||||
return new ThreadPoolExecutor(worker, worker,
|
||||
0L, TimeUnit.MILLISECONDS,
|
||||
new ArrayBlockingQueue<>(queue),
|
||||
buildExecutorFactory(namePrefix, daemon),
|
||||
new ThreadPoolExecutor.AbortPolicy()
|
||||
);
|
||||
public ExecutorService buildExecutor() {
|
||||
if (ObjectUtil.isNull(executorService)){
|
||||
LiteflowConfig liteflowConfig = SpringAware.getBean(LiteflowConfig.class);
|
||||
//只有在非spring的场景下liteflowConfig才会为null
|
||||
if (ObjectUtil.isNull(liteflowConfig)){
|
||||
liteflowConfig = new LiteflowConfig();
|
||||
}
|
||||
|
||||
|
||||
executorService = new ThreadPoolExecutor(liteflowConfig.getWhenMaxWorkers(),
|
||||
liteflowConfig.getWhenMaxWorkers(),
|
||||
0L, TimeUnit.MILLISECONDS,
|
||||
new ArrayBlockingQueue<>(liteflowConfig.getWhenQueueLimit()),
|
||||
buildExecutorFactory("liteflow-when-thead", false),
|
||||
new ThreadPoolExecutor.AbortPolicy());
|
||||
}
|
||||
return executorService;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,7 +20,7 @@ public class LiteFlowExecutorPoolShutdown {
|
||||
ExecutorService executorService = SpringAware.getBean("whenExecutors");
|
||||
|
||||
LOG.info("Start closing the liteflow-when-calls...");
|
||||
ExecutorHelper.shutdownAwaitTermination(executorService);
|
||||
ExecutorHelper.loadInstance().shutdownAwaitTermination(executorService);
|
||||
LOG.info("Succeed closing the liteflow-when-calls ok...");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,9 +24,7 @@ public class LiteflowExecutorAutoConfiguration {
|
||||
|
||||
@Bean("whenExecutors")
|
||||
public ExecutorService executorService(LiteflowConfig liteflowConfig) {
|
||||
Integer useWorker = liteflowConfig.getWhenMaxWorkers();
|
||||
Integer useQueue = liteflowConfig.getWhenQueueLimit();
|
||||
return ExecutorHelper.buildExecutor(useWorker, useQueue, "liteflow-when-thead", false);
|
||||
return ExecutorHelper.loadInstance().buildExecutor();
|
||||
}
|
||||
|
||||
@Bean
|
||||
|
||||
Reference in New Issue
Block a user