diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorBuilder.java b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorBuilder.java index a974a2261..3a8e80e48 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorBuilder.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorBuilder.java @@ -1,13 +1,48 @@ package com.yomahub.liteflow.thread; +import com.alibaba.ttl.threadpool.TtlExecutors; + import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; /** * 并行多线程执行器构造器接口 + * * @author Bryan.Zhang * @since 2.6.6 */ public interface ExecutorBuilder { ExecutorService buildExecutor(); + + /** + *
+ * 构建默认的线程池对象 + *
+ * @author sikadai + * @date 2022/1/21 23:07 + * @param corePoolSize : 核心线程池数量 + * @param maximumPoolSize : 最大线程池数量 + * @param queueCapacity : 队列的容量 + * @param threadName : 线程吃名称 + * @return java.util.concurrent.ExecutorService + */ + default ExecutorService buildDefaultExecutor(int corePoolSize, int maximumPoolSize, int queueCapacity, String threadName) { + return TtlExecutors.getTtlExecutorService(new ThreadPoolExecutor(corePoolSize, + maximumPoolSize, + 0L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(queueCapacity), + new ThreadFactory() { + private final AtomicLong number = new AtomicLong(); + + @Override + public Thread newThread(Runnable r) { + Thread newThread = Executors.defaultThreadFactory().newThread(r); + newThread.setName(threadName + number.getAndIncrement()); + newThread.setDaemon(false); + return newThread; + } + }, + new ThreadPoolExecutor.AbortPolicy())); + } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java index d49c65930..a269d81dc 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java @@ -34,7 +34,12 @@ public class ExecutorHelper { private ExecutorService executorService; - private Map+ * 构建线程池执行器 - 支持多个when公用一个线程池 + *
+ * @author sikadai + * @date 2022/1/21 23:00 + * @param threadExecutorClass : 线程池构建者的Class全类名 + * @return java.util.concurrent.ExecutorService + */ public ExecutorService buildExecutor(String threadExecutorClass) { try { if (StrUtil.isBlank(threadExecutorClass)) { @@ -108,6 +123,16 @@ public class ExecutorHelper { } } + /** + *+ * 根据线程执行构建者Class类名获取ExecutorBuilder实例 + *
+ * + * @author sikadai + * @date 2022/1/21 23:04 + * @param threadExecutorClass + * @return com.yomahub.liteflow.thread.ExecutorBuilder + */ private ExecutorBuilder getExecutorBuilder(String threadExecutorClass) throws Exception { return (ExecutorBuilder) Class.forName(threadExecutorClass).newInstance(); } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/LiteFlowDefaultExecutorBuilder.java b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/LiteFlowDefaultExecutorBuilder.java index c0ea6878f..76c2b87ba 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/LiteFlowDefaultExecutorBuilder.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/LiteFlowDefaultExecutorBuilder.java @@ -21,21 +21,10 @@ public class LiteFlowDefaultExecutorBuilder implements ExecutorBuilder{ if (ObjectUtil.isNull(liteflowConfig)){ liteflowConfig = new LiteflowConfig(); } - - return TtlExecutors.getTtlExecutorService(new ThreadPoolExecutor(liteflowConfig.getWhenMaxWorkers(), + return buildDefaultExecutor( liteflowConfig.getWhenMaxWorkers(), - 0L, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(liteflowConfig.getWhenQueueLimit()), - new ThreadFactory() { - private final AtomicLong number = new AtomicLong(); - @Override - public Thread newThread(Runnable r) { - Thread newThread = Executors.defaultThreadFactory().newThread(r); - newThread.setName("lf-when-thead-" + number.getAndIncrement()); - newThread.setDaemon(false); - return newThread; - } - }, - new ThreadPoolExecutor.AbortPolicy())); + liteflowConfig.getWhenMaxWorkers(), + liteflowConfig.getWhenQueueLimit(), + "lf-when-thead-"); } } diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/CustomThreadExecutor1.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/CustomThreadExecutor1.java index f942c8fb2..ba9fe5eaf 100644 --- a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/CustomThreadExecutor1.java +++ b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/CustomThreadExecutor1.java @@ -18,21 +18,10 @@ public class CustomThreadExecutor1 implements ExecutorBuilder { if (ObjectUtil.isNull(liteflowConfig)) { liteflowConfig = new LiteflowConfig(); } - return TtlExecutors.getTtlExecutorService(new ThreadPoolExecutor(liteflowConfig.getWhenMaxWorkers(), + return buildDefaultExecutor( liteflowConfig.getWhenMaxWorkers(), - 0L, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(liteflowConfig.getWhenQueueLimit()), - new ThreadFactory() { - private final AtomicLong number = new AtomicLong(); - - @Override - public Thread newThread(Runnable r) { - Thread newThread = Executors.defaultThreadFactory().newThread(r); - newThread.setName("Customer-when-1-thead-" + number.getAndIncrement()); - newThread.setDaemon(false); - return newThread; - } - }, - new ThreadPoolExecutor.AbortPolicy())); + liteflowConfig.getWhenMaxWorkers(), + liteflowConfig.getWhenQueueLimit(), + "customer-when-1-thead-"); } } diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/CustomThreadExecutor2.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/CustomThreadExecutor2.java index 0e75e7a7a..1ff6d22a0 100644 --- a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/CustomThreadExecutor2.java +++ b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/CustomThreadExecutor2.java @@ -17,21 +17,10 @@ public class CustomThreadExecutor2 implements ExecutorBuilder { if (ObjectUtil.isNull(liteflowConfig)) { liteflowConfig = new LiteflowConfig(); } - return TtlExecutors.getTtlExecutorService(new ThreadPoolExecutor(liteflowConfig.getWhenMaxWorkers(), + return buildDefaultExecutor( liteflowConfig.getWhenMaxWorkers(), - 0L, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(liteflowConfig.getWhenQueueLimit()), - new ThreadFactory() { - private final AtomicLong number = new AtomicLong(); - - @Override - public Thread newThread(Runnable r) { - Thread newThread = Executors.defaultThreadFactory().newThread(r); - newThread.setName("Customer-when-2-thead-" + number.getAndIncrement()); - newThread.setDaemon(false); - return newThread; - } - }, - new ThreadPoolExecutor.AbortPolicy())); + liteflowConfig.getWhenMaxWorkers(), + liteflowConfig.getWhenQueueLimit(), + "customer-when-2-thead-"); } } diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/CustomThreadExecutor3.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/CustomThreadExecutor3.java new file mode 100644 index 000000000..3519cfe20 --- /dev/null +++ b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/CustomThreadExecutor3.java @@ -0,0 +1,24 @@ +package com.yomahub.liteflow.test.customWhenThreadPool; + +import cn.hutool.core.util.ObjectUtil; +import com.yomahub.liteflow.property.LiteflowConfig; +import com.yomahub.liteflow.thread.ExecutorBuilder; +import com.yomahub.liteflow.util.SpringAware; + +import java.util.concurrent.ExecutorService; + +public class CustomThreadExecutor3 implements ExecutorBuilder { + @Override + public ExecutorService buildExecutor() { + LiteflowConfig liteflowConfig = SpringAware.getBean(LiteflowConfig.class); + //只有在非spring的场景下liteflowConfig才会为null + if (ObjectUtil.isNull(liteflowConfig)) { + liteflowConfig = new LiteflowConfig(); + } + return buildDefaultExecutor( + liteflowConfig.getWhenMaxWorkers(), + liteflowConfig.getWhenMaxWorkers(), + liteflowConfig.getWhenQueueLimit(), + "customer-when-3-thead-"); + } +} diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/CustomWhenThreadPoolSpringbootTest.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/CustomWhenThreadPoolSpringbootTest.java index 8d7b7ddd7..60eddce48 100644 --- a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/CustomWhenThreadPoolSpringbootTest.java +++ b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/CustomWhenThreadPoolSpringbootTest.java @@ -1,8 +1,12 @@ package com.yomahub.liteflow.test.customWhenThreadPool; +import com.yomahub.liteflow.builder.LiteFlowChainBuilder; +import com.yomahub.liteflow.builder.LiteFlowConditionBuilder; +import com.yomahub.liteflow.builder.LiteFlowNodeBuilder; import com.yomahub.liteflow.core.FlowExecutor; import com.yomahub.liteflow.entity.data.DefaultSlot; import com.yomahub.liteflow.entity.data.LiteflowResponse; +import com.yomahub.liteflow.enums.NodeTypeEnum; import com.yomahub.liteflow.test.BaseTest; import org.junit.Assert; import org.junit.Test; @@ -19,6 +23,7 @@ import javax.annotation.Resource; /** * springboot环境下异步线程超时日志打印测试 + * * @author Bryan.Zhang * @since 2.6.4 */ @@ -35,17 +40,47 @@ public class CustomWhenThreadPoolSpringbootTest extends BaseTest { private FlowExecutor flowExecutor; @Test - public void testCustomThreadPool() throws Exception{ + public void testCustomThreadPool() throws Exception { + LiteflowResponseTitle: liteflow
+ *Description: 轻量级的组件式流程框架
+ * @author Bryan.Zhang + * @email weenyc31@163.com + * @Date 2020/4/1 + */ +package com.yomahub.liteflow.test.customWhenThreadPool.cmp; + +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +@Component("f") +public class FCmp extends NodeComponent { + + @Override + public void process() { + this.getSlot().setData("threadName", Thread.currentThread().getName()); + System.out.println("FCmp executed!"); + } + +} diff --git a/liteflow-testcase-springboot/src/test/resources/customWhenThreadPool/flow.xml b/liteflow-testcase-springboot/src/test/resources/customWhenThreadPool/flow.xml index eb5e85959..c1fc18751 100644 --- a/liteflow-testcase-springboot/src/test/resources/customWhenThreadPool/flow.xml +++ b/liteflow-testcase-springboot/src/test/resources/customWhenThreadPool/flow.xml @@ -1,12 +1,12 @@