feature #I4E5NX 异步线程池自定义

This commit is contained in:
bryan31
2021-12-10 23:47:07 +08:00
parent 94760b9f21
commit 8ca854c188
29 changed files with 452 additions and 52 deletions

View File

@@ -22,12 +22,11 @@ import com.yomahub.liteflow.exception.FlowSystemException;
import com.yomahub.liteflow.exception.WhenExecuteException;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
import com.yomahub.liteflow.util.ExecutorHelper;
import com.yomahub.liteflow.thread.ExecutorHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
@@ -116,7 +115,7 @@ public class Chain implements Executable {
//这块涉及到挺多的多线程逻辑,所以注释比较详细,看到这里的童鞋可以仔细阅读
private void executeAsyncCondition(WhenCondition condition, Integer slotIndex, String requestId) throws Exception{
//此方法其实只会初始化一次Executor不会每次都会初始化。Executor是唯一的
ExecutorService parallelExecutor = TtlExecutors.getTtlExecutorService(ExecutorHelper.loadInstance().buildExecutor());
ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildExecutor();
//获得liteflow的参数
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();

View File

@@ -0,0 +1,25 @@
package com.yomahub.liteflow.exception;
/**
* 并行多线程创建异常
* @author Bryan.Zhang
* @since 2.6.6
*/
public class ThreadExecutorServiceCreateException extends RuntimeException {
private static final long serialVersionUID = 1L;
/** 异常信息 */
private String message;
public ThreadExecutorServiceCreateException(String message) {
this.message = message;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}

View File

@@ -36,6 +36,9 @@ public class LiteflowConfig {
//slot的数量
private Integer slotSize;
//并行线程执行器class路径
private String threadExecutorClass;
//异步线程最大等待秒数
private Integer whenMaxWaitSeconds;
@@ -246,4 +249,16 @@ public class LiteflowConfig {
public void setPrintBanner(Boolean printBanner) {
this.printBanner = printBanner;
}
public String getThreadExecutorClass() {
if (StrUtil.isBlank(threadExecutorClass)){
return "com.yomahub.liteflow.thread.LiteFlowDefaultExecutorBuilder";
}else{
return threadExecutorClass;
}
}
public void setThreadExecutorClass(String threadExecutorClass) {
this.threadExecutorClass = threadExecutorClass;
}
}

View File

@@ -0,0 +1,13 @@
package com.yomahub.liteflow.thread;
import java.util.concurrent.*;
/**
* 并行多线程执行器构造器接口
* @author Bryan.Zhang
* @since 2.6.6
*/
public interface ExecutorBuilder {
ExecutorService buildExecutor();
}

View File

@@ -5,15 +5,15 @@
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.util;
package com.yomahub.liteflow.thread;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.exception.ThreadExecutorServiceCreateException;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.util.SpringAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -71,56 +71,28 @@ public class ExecutorHelper {
}
}
/**
* 返回一个线程工厂,这是一个可以定义线程名字的线程工厂,返回的线程都将被命名.
* 创建的线程都是非后台线程.
*
* @param name 名称.
* @return 线程工厂实例.
*/
public ThreadFactory buildExecutorFactory(final String name) {
return buildExecutorFactory(name, false);
}
/**
* 返回一个线程工厂,这是一个可以定义线程名字的线程工厂,返回的线程都将被命名.
*
* @param name 名称.
* @param daemon 是否为后台线程.
* @return 线程工厂实例.
*/
public ThreadFactory buildExecutorFactory(final String name, final boolean daemon) {
return new ThreadFactory() {
private final AtomicLong number = new AtomicLong();
@Override
public Thread newThread(Runnable r) {
Thread newThread = Executors.defaultThreadFactory().newThread(r);
newThread.setName(name + "-" + number.getAndIncrement());
newThread.setDaemon(daemon);
return newThread;
}
};
}
public ExecutorService buildExecutor() {
if (ObjectUtil.isNull(executorService)){
LiteflowConfig liteflowConfig = SpringAware.getBean(LiteflowConfig.class);
//只有在非spring的场景下liteflowConfig才会为null
if (ObjectUtil.isNull(liteflowConfig)){
liteflowConfig = new LiteflowConfig();
try{
assert liteflowConfig != null;
ExecutorBuilder executorBuilder = (ExecutorBuilder)Class.forName(liteflowConfig.getThreadExecutorClass()).newInstance();
executorService = executorBuilder.buildExecutor();
}catch (Exception e){
LOG.error(e.getMessage(), e);
throw new ThreadExecutorServiceCreateException(e.getMessage());
}
executorService = new ThreadPoolExecutor(liteflowConfig.getWhenMaxWorkers(),
liteflowConfig.getWhenMaxWorkers(),
0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(liteflowConfig.getWhenQueueLimit()),
buildExecutorFactory("liteflow-when-thead", false),
new ThreadPoolExecutor.AbortPolicy());
}
return executorService;
}
public ExecutorService getExecutorService() {
return executorService;
}
public void setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
}
}

View File

@@ -0,0 +1,41 @@
package com.yomahub.liteflow.thread;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.ttl.threadpool.TtlExecutors;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.util.SpringAware;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
/**
* LiteFlow默认的并行多线程执行器实现
* @author Bryan.Zhang
* @since 2.6.6
*/
public class LiteFlowDefaultExecutorBuilder implements ExecutorBuilder{
@Override
public ExecutorService buildExecutor() {
LiteflowConfig liteflowConfig = SpringAware.getBean(LiteflowConfig.class);
//只有在非spring的场景下liteflowConfig才会为null
if (ObjectUtil.isNull(liteflowConfig)){
liteflowConfig = new LiteflowConfig();
}
return TtlExecutors.getTtlExecutorService(new ThreadPoolExecutor(liteflowConfig.getWhenMaxWorkers(),
liteflowConfig.getWhenMaxWorkers(),
0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(liteflowConfig.getWhenQueueLimit()),
new ThreadFactory() {
private final AtomicLong number = new AtomicLong();
@Override
public Thread newThread(Runnable r) {
Thread newThread = Executors.defaultThreadFactory().newThread(r);
newThread.setName("lf-when-thead-" + number.getAndIncrement());
newThread.setDaemon(false);
return newThread;
}
},
new ThreadPoolExecutor.AbortPolicy()));
}
}

View File

@@ -1,5 +1,6 @@
package com.yomahub.liteflow.util;
import com.yomahub.liteflow.thread.ExecutorHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@@ -21,6 +21,9 @@ public class LiteflowProperty {
//slot的数量
private int slotSize;
//并行线程执行器class路径
private String threadExecutorClass;
//异步线程最大等待描述
private int whenMaxWaitSeconds;
@@ -131,4 +134,12 @@ public class LiteflowProperty {
public void setPrintBanner(boolean printBanner) {
this.printBanner = printBanner;
}
public String getThreadExecutorClass() {
return threadExecutorClass;
}
public void setThreadExecutorClass(String threadExecutorClass) {
this.threadExecutorClass = threadExecutorClass;
}
}

View File

@@ -1,7 +1,7 @@
package com.yomahub.liteflow.springboot.config;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.util.ExecutorHelper;
import com.yomahub.liteflow.thread.ExecutorHelper;
import com.yomahub.liteflow.util.LiteFlowExecutorPoolShutdown;
import com.yomahub.liteflow.util.SpringAware;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
@@ -30,7 +30,14 @@ public class LiteflowExecutorAutoConfiguration {
return ExecutorHelper.loadInstance().buildExecutor();
}
@Bean
//为什么要注释掉这个@Bean
//LiteFlowExecutorPoolShutdown这个类会在spring上下文移除这个bean的时候执行也就是应用被停止或者kill的时候
//这个类主要用于卸载掉线程池,会等待线程池中的线程执行完,再卸载掉,相当于一个钩子
//但这段代码在实际中并没有太多用户,就算结束掉应用进程时很多公司也会优雅停机。就显得这段代码很鸡肋
//之所以注释掉是因为在单元测试中每一个testcase结束时都会调这个方法。
//当异步线程配置超时的时候。由于这个方法会去关闭掉线程池,会导致单元测试在所有一起运行时(单个运行没有问题)会出错
//按理说这个方法会等待线程池里的全部线程执行完再销毁,但是事实上在单元测试中的确会报错。具体原因还没深究,由于这个类比较鸡肋,就干脆不注册了。
//@Bean
public LiteFlowExecutorPoolShutdown liteFlowExecutorPoolShutdown() {
return new LiteFlowExecutorPoolShutdown();
}

View File

@@ -27,6 +27,7 @@ public class LiteflowPropertyAutoConfiguration {
LiteflowConfig liteflowConfig = new LiteflowConfig();
liteflowConfig.setRuleSource(property.getRuleSource());
liteflowConfig.setSlotSize(property.getSlotSize());
liteflowConfig.setThreadExecutorClass(property.getThreadExecutorClass());
liteflowConfig.setWhenMaxWaitSeconds(property.getWhenMaxWaitSeconds());
liteflowConfig.setEnableLog(liteflowMonitorProperty.isEnableLog());
liteflowConfig.setQueueLimit(liteflowMonitorProperty.getQueueLimit());

View File

@@ -30,6 +30,12 @@
"description": "Node definition for ZK configuration.",
"sourceType": "com.yomahub.liteflow.springboot.LiteflowProperty"
},
{
"name": "liteflow.thread-executor-class",
"type": "java.lang.String",
"description": "Multi thread pool.",
"sourceType": "com.yomahub.liteflow.springboot.LiteflowProperty"
},
{
"name": "liteflow.when-max-wait-seconds",
"type": "java.lang.Integer",

View File

@@ -3,6 +3,7 @@ liteflow.print-banner=true
liteflow.rule-source=config/flow.xml
liteflow.zk-node=/lite-flow/flow
liteflow.slot-size=1024
liteflow.thread-executor-class=com.yomahub.liteflow.thread.LiteFlowDefaultExecutorBuilder
liteflow.when-max-wait-seconds=15
liteflow.when-max-workers=16
liteflow.when-queue-limit=512

View File

@@ -3,6 +3,7 @@ package com.yomahub.liteflow.test;
import com.yomahub.liteflow.entity.data.DataBus;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.spring.ComponentScanner;
import com.yomahub.liteflow.thread.ExecutorHelper;
import org.junit.AfterClass;
public class BaseTest {
@@ -11,5 +12,6 @@ public class BaseTest {
public static void cleanScanCache(){
ComponentScanner.cleanCache();
FlowBus.cleanCache();
ExecutorHelper.loadInstance().setExecutorService(null);
}
}

View File

@@ -0,0 +1,13 @@
package com.yomahub.liteflow.test.customThreadPool;
import com.yomahub.liteflow.thread.ExecutorBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CustomThreadExecutor implements ExecutorBuilder {
@Override
public ExecutorService buildExecutor() {
return Executors.newCachedThreadPool();
}
}

View File

@@ -0,0 +1,43 @@
package com.yomahub.liteflow.test.customThreadPool;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.entity.data.DefaultSlot;
import com.yomahub.liteflow.entity.data.LiteflowResponse;
import com.yomahub.liteflow.test.BaseTest;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
/**
* springboot环境下异步线程超时日志打印测试
* @author Bryan.Zhang
* @since 2.6.4
*/
@RunWith(SpringRunner.class)
@TestPropertySource(value = "classpath:/customThreadPool/application.properties")
@SpringBootTest(classes = CustomThreadPoolSpringbootTest.class)
@EnableAutoConfiguration
@ComponentScan({"com.yomahub.liteflow.test.customThreadPool.cmp"})
public class CustomThreadPoolSpringbootTest extends BaseTest {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Resource
private FlowExecutor flowExecutor;
@Test
public void testCustomThreadPool() throws Exception{
LiteflowResponse<DefaultSlot> response = flowExecutor.execute2Resp("chain1", "arg");
Assert.assertTrue(response.isSuccess());
Assert.assertFalse(response.getSlot().getData("threadName").toString().startsWith("lf-when-thead"));
}
}

View File

@@ -0,0 +1,20 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.customThreadPool.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import org.springframework.stereotype.Component;
@Component("a")
public class ACmp extends NodeComponent {
@Override
public void process() {
System.out.println("ACmp executed!");
}
}

View File

@@ -0,0 +1,22 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.customThreadPool.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import org.springframework.stereotype.Component;
@Component("b")
public class BCmp extends NodeComponent {
@Override
public void process() {
this.getSlot().setData("threadName", Thread.currentThread().getName());
System.out.println("BCmp executed!");
}
}

View File

@@ -0,0 +1,21 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.customThreadPool.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import org.springframework.stereotype.Component;
@Component("c")
public class CCmp extends NodeComponent {
@Override
public void process() {
System.out.println("CCmp executed!");
}
}

View File

@@ -0,0 +1,2 @@
liteflow.rule-source=customThreadPool/flow.xml
liteflow.thread-executor-class=com.yomahub.liteflow.test.customThreadPool.CustomThreadExecutor

View File

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<flow>
<chain name="chain1">
<when value="a,b,c"/>
</chain>
</flow>

View File

@@ -18,6 +18,12 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>

View File

@@ -0,0 +1,13 @@
package com.yomahub.liteflow.test.customThreadPool;
import com.yomahub.liteflow.thread.ExecutorBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CustomThreadExecutor implements ExecutorBuilder {
@Override
public ExecutorService buildExecutor() {
return Executors.newCachedThreadPool();
}
}

View File

@@ -0,0 +1,37 @@
package com.yomahub.liteflow.test.customThreadPool;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.entity.data.DefaultSlot;
import com.yomahub.liteflow.entity.data.LiteflowResponse;
import com.yomahub.liteflow.test.BaseTest;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
/**
* springboot环境下异步线程超时日志打印测试
* @author Bryan.Zhang
* @since 2.6.4
*/
@RunWith(SpringRunner.class)
@ContextConfiguration("classpath:/customThreadPool/application.xml")
public class CustomThreadPoolSpringTest extends BaseTest {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Resource
private FlowExecutor flowExecutor;
@Test
public void testCustomThreadPool() throws Exception{
LiteflowResponse<DefaultSlot> response = flowExecutor.execute2Resp("chain1", "arg");
Assert.assertTrue(response.isSuccess());
Assert.assertFalse(response.getSlot().getData("threadName").toString().startsWith("lf-when-thead"));
}
}

View File

@@ -0,0 +1,20 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.customThreadPool.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import org.springframework.stereotype.Component;
@Component("a")
public class ACmp extends NodeComponent {
@Override
public void process() {
System.out.println("ACmp executed!");
}
}

View File

@@ -0,0 +1,22 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.customThreadPool.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import org.springframework.stereotype.Component;
@Component("b")
public class BCmp extends NodeComponent {
@Override
public void process() {
this.getSlot().setData("threadName", Thread.currentThread().getName());
System.out.println("BCmp executed!");
}
}

View File

@@ -0,0 +1,21 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.customThreadPool.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import org.springframework.stereotype.Component;
@Component("c")
public class CCmp extends NodeComponent {
@Override
public void process() {
System.out.println("CCmp executed!");
}
}

View File

@@ -0,0 +1,24 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd">
<context:component-scan base-package="com.yomahub.liteflow.test.customThreadPool.cmp" />
<bean id="springAware" class="com.yomahub.liteflow.util.SpringAware"/>
<bean class="com.yomahub.liteflow.spring.ComponentScanner"/>
<bean id="liteflowConfig" class="com.yomahub.liteflow.property.LiteflowConfig">
<property name="ruleSource" value="customThreadPool/flow.xml"/>
<property name="threadExecutorClass" value="com.yomahub.liteflow.test.customThreadPool.CustomThreadExecutor"/>
</bean>
<bean id="flowExecutor" class="com.yomahub.liteflow.core.FlowExecutor">
<property name="liteflowConfig" ref="liteflowConfig"/>
</bean>
</beans>

View File

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<flow>
<chain name="chain1">
<when value="a,b,c"/>
</chain>
</flow>

View File

@@ -0,0 +1,30 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2010-2011 The myBatis Team
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration debug="false">
<!--定义日志文件的存储地址 勿在 LogBack 的配置中使用相对路径-->
<property name="APP_NAME" value="testcase"/>
<property name="LOG_HOME" value="./logs" />
<!-- 控制台输出 -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<!--格式化输出:%d表示日期%thread表示线程名%-5level级别从左显示5个字符宽度%msg日志消息%n是换行符-->
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
</encoder>
</appender>
<!-- 日志输出级别 -->
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>