enhancement #I37QVR WhenCondition时候,并发执行目前会每次新建线程可不可走线程池

修正了打印错误的问题
This commit is contained in:
徐佳
2021-03-26 18:47:05 +08:00
parent 4b186ef4b3
commit e41db2e7b8
40 changed files with 165 additions and 289 deletions

View File

@@ -1,30 +1,28 @@
package com.yomahub.liteflow.springboot;
import org.springframework.beans.factory.annotation.Value;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.util.ExecutorHelper;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ExecutorService;
import static com.yomahub.liteflow.util.ExecutorHelper.buildExecutor;
/**
* desc :
* name : LiteflowExecutorAutoConfiguration
*
* @author : xujia
* date : 2021/3/24
* @since : 1.8
* 线程池装配类
* 这个装配前置条件是需要LiteflowConfigLiteflowPropertyAutoConfiguration以及SpringAware
* @author justin.xu
*/
@Configuration
@ConditionalOnBean(LiteflowConfig.class)
@AutoConfigureAfter({LiteflowPropertyAutoConfiguration.class})
public class LiteflowExecutorAutoConfiguration {
@Bean("parallelExecutor")
public ExecutorService parallelExecutor(
@Value("${threadPool.parallel.worker:0}") int worker,
@Value("${threadPool.parallel.queue:512}") int queue) {
int useWorker = worker;
int useQueue = queue;
@Bean
public ExecutorService executorService(LiteflowConfig liteflowConfig) {
int useWorker = liteflowConfig.getWhenMaxWorkers();
int useQueue = liteflowConfig.getWhenQueueLimit();
if (useWorker == 0) {
useWorker = Runtime.getRuntime().availableProcessors() + 1;
}
@@ -33,6 +31,6 @@ public class LiteflowExecutorAutoConfiguration {
useQueue = 512;
}
return buildExecutor(useWorker, useQueue, "parallel-executors", false);
return ExecutorHelper.buildExecutor(useWorker, useQueue, "liteflow-when-calls", false);
}
}

View File

@@ -1,26 +1,15 @@
package com.yomahub.liteflow.springboot;
import cn.hutool.core.util.StrUtil;
import com.google.common.collect.Lists;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.entity.data.DataBus;
import com.yomahub.liteflow.monitor.MonitorBus;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.spring.ComponentScaner;
import com.yomahub.liteflow.util.SpringAware;
import org.apache.commons.lang3.StringUtils;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.PropertySource;
import javax.swing.*;
import java.util.List;
import java.util.concurrent.ExecutorService;
/**
* 主要的业务装配器
@@ -30,16 +19,15 @@ import java.util.concurrent.ExecutorService;
*/
@Configuration
@ConditionalOnBean(LiteflowConfig.class)
@AutoConfigureAfter({LiteflowPropertyAutoConfiguration.class, LiteflowExecutorAutoConfiguration.class})
@AutoConfigureAfter({LiteflowPropertyAutoConfiguration.class})
@Import(SpringAware.class)
public class LiteflowMainAutoConfiguration {
@Bean
public FlowExecutor flowExecutor(LiteflowConfig liteflowConfig, ExecutorService parallelExecutor){
public FlowExecutor flowExecutor(LiteflowConfig liteflowConfig){
if(StrUtil.isNotBlank(liteflowConfig.getRuleSource())){
FlowExecutor flowExecutor = new FlowExecutor();
flowExecutor.setLiteflowConfig(liteflowConfig);
flowExecutor.setParallelExecutor(parallelExecutor);
return flowExecutor;
}else{
return null;

View File

@@ -18,6 +18,12 @@ public class LiteflowProperty {
//异步线程最大等待描述
private int whenMaxWaitSeconds;
//异步线程池最大线程数
private int whenMaxWorkers;
//异步线程池最大队列数量
private int whenQueueLimit;
public String getRuleSource() {
return ruleSource;
}
@@ -41,4 +47,20 @@ public class LiteflowProperty {
public void setWhenMaxWaitSeconds(int whenMaxWaitSeconds) {
this.whenMaxWaitSeconds = whenMaxWaitSeconds;
}
public int getWhenMaxWorkers() {
return whenMaxWorkers;
}
public void setWhenMaxWorkers(int whenMaxWorkers) {
this.whenMaxWorkers = whenMaxWorkers;
}
public int getWhenQueueLimit() {
return whenQueueLimit;
}
public void setWhenQueueLimit(int whenQueueLimit) {
this.whenQueueLimit = whenQueueLimit;
}
}

View File

@@ -31,6 +31,8 @@ public class LiteflowPropertyAutoConfiguration {
liteflowConfig.setQueueLimit(liteflowMonitorProperty.getQueueLimit());
liteflowConfig.setDelay(liteflowMonitorProperty.getDelay());
liteflowConfig.setPeriod(liteflowMonitorProperty.getPeriod());
liteflowConfig.setWhenMaxWorkers(property.getWhenMaxWorkers());
liteflowConfig.setWhenQueueLimit(property.getWhenQueueLimit());
return liteflowConfig;
}
}

View File

@@ -11,12 +11,9 @@ import javax.annotation.Resource;
import java.util.concurrent.ExecutorService;
/**
* desc :
* name : Shutdown
*
* @author : xujia
* date : 2021/3/24
* @since : 1.8
* 关闭shutdown类
* 执行清理工作
* @author justin.xu
*/
@Order(Integer.MIN_VALUE)
@Component
@@ -24,14 +21,14 @@ public class Shutdown {
private static final Logger LOG = LoggerFactory.getLogger(Shutdown.class);
@Resource(name = "parallelExecutor")
private ExecutorService parallelExecutor;
@Resource
private ExecutorService executorService;
@PreDestroy
public void destroy() throws Exception {
LOG.info("Start closing the parallel-executors...");
ExecutorHelper.shutdownAwaitTermination(parallelExecutor, 3600);
LOG.info("Succeed closing the parallel-executors ok...");
LOG.info("Start closing the liteflow-when-calls...");
ExecutorHelper.shutdownAwaitTermination(executorService);
LOG.info("Succeed closing the liteflow-when-calls ok...");
}
}