From a7f53be766a706702db4d7a0a127c3b4ef9ca49e Mon Sep 17 00:00:00 2001 From: daiqi <466608943@qq.com> Date: Fri, 21 Jan 2022 23:16:18 +0800 Subject: [PATCH] =?UTF-8?q?1.=20=E6=B7=BB=E5=8A=A0springboot=E4=B8=8Bwhen-?= =?UTF-8?q?thread=E7=9A=84=E6=B5=8B=E8=AF=95=E7=94=A8=E4=BE=8B=202.=20?= =?UTF-8?q?=E7=AE=80=E5=8D=95=E6=8A=BD=E5=8F=96=E9=BB=98=E8=AE=A4=E6=9E=84?= =?UTF-8?q?=E5=BB=BA=E7=BA=BF=E7=A8=8B=E6=B1=A0=E7=9A=84=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../liteflow/thread/ExecutorBuilder.java | 35 +++++++++++++++ .../liteflow/thread/ExecutorHelper.java | 27 +++++++++++- .../LiteFlowDefaultExecutorBuilder.java | 19 ++------ .../CustomThreadExecutor1.java | 19 ++------ .../CustomThreadExecutor2.java | 19 ++------ .../CustomThreadExecutor3.java | 24 +++++++++++ .../CustomWhenThreadPoolSpringbootTest.java | 43 +++++++++++++++++-- .../test/customWhenThreadPool/cmp/CCmp.java | 1 + .../test/customWhenThreadPool/cmp/DCmp.java | 1 + .../test/customWhenThreadPool/cmp/ECmp.java | 1 + .../test/customWhenThreadPool/cmp/FCmp.java | 22 ++++++++++ .../resources/customWhenThreadPool/flow.xml | 6 +-- 12 files changed, 164 insertions(+), 53 deletions(-) create mode 100644 liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/CustomThreadExecutor3.java create mode 100644 liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/cmp/FCmp.java diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorBuilder.java b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorBuilder.java index a974a2261..3a8e80e48 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorBuilder.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorBuilder.java @@ -1,13 +1,48 @@ package com.yomahub.liteflow.thread; +import com.alibaba.ttl.threadpool.TtlExecutors; + import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; /** * 并行多线程执行器构造器接口 + * * @author Bryan.Zhang * @since 2.6.6 */ public interface ExecutorBuilder { ExecutorService buildExecutor(); + + /** + *

+ * 构建默认的线程池对象 + *

+ * @author sikadai + * @date 2022/1/21 23:07 + * @param corePoolSize : 核心线程池数量 + * @param maximumPoolSize : 最大线程池数量 + * @param queueCapacity : 队列的容量 + * @param threadName : 线程吃名称 + * @return java.util.concurrent.ExecutorService + */ + default ExecutorService buildDefaultExecutor(int corePoolSize, int maximumPoolSize, int queueCapacity, String threadName) { + return TtlExecutors.getTtlExecutorService(new ThreadPoolExecutor(corePoolSize, + maximumPoolSize, + 0L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(queueCapacity), + new ThreadFactory() { + private final AtomicLong number = new AtomicLong(); + + @Override + public Thread newThread(Runnable r) { + Thread newThread = Executors.defaultThreadFactory().newThread(r); + newThread.setName(threadName + number.getAndIncrement()); + newThread.setDaemon(false); + return newThread; + } + }, + new ThreadPoolExecutor.AbortPolicy())); + } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java index d49c65930..a269d81dc 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java @@ -34,7 +34,12 @@ public class ExecutorHelper { private ExecutorService executorService; - private Map executorServiceMap; + /** + * 此处使用Map缓存线程池信息 + * key - 线程池构建者的Class全类名 + * value - 线程池对象 + * */ + private final Map executorServiceMap; private ExecutorHelper() { executorServiceMap = Maps.newConcurrentMap(); @@ -80,6 +85,7 @@ public class ExecutorHelper { } } + /** 构建全局默认线程池 */ public ExecutorService buildExecutor() { if (ObjectUtil.isNull(executorService)) { LiteflowConfig liteflowConfig = SpringAware.getBean(LiteflowConfig.class); @@ -89,6 +95,15 @@ public class ExecutorHelper { return executorService; } + /** + *

+ * 构建线程池执行器 - 支持多个when公用一个线程池 + *

+ * @author sikadai + * @date 2022/1/21 23:00 + * @param threadExecutorClass : 线程池构建者的Class全类名 + * @return java.util.concurrent.ExecutorService + */ public ExecutorService buildExecutor(String threadExecutorClass) { try { if (StrUtil.isBlank(threadExecutorClass)) { @@ -108,6 +123,16 @@ public class ExecutorHelper { } } + /** + *

+ * 根据线程执行构建者Class类名获取ExecutorBuilder实例 + *

+ * + * @author sikadai + * @date 2022/1/21 23:04 + * @param threadExecutorClass + * @return com.yomahub.liteflow.thread.ExecutorBuilder + */ private ExecutorBuilder getExecutorBuilder(String threadExecutorClass) throws Exception { return (ExecutorBuilder) Class.forName(threadExecutorClass).newInstance(); } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/LiteFlowDefaultExecutorBuilder.java b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/LiteFlowDefaultExecutorBuilder.java index c0ea6878f..76c2b87ba 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/LiteFlowDefaultExecutorBuilder.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/LiteFlowDefaultExecutorBuilder.java @@ -21,21 +21,10 @@ public class LiteFlowDefaultExecutorBuilder implements ExecutorBuilder{ if (ObjectUtil.isNull(liteflowConfig)){ liteflowConfig = new LiteflowConfig(); } - - return TtlExecutors.getTtlExecutorService(new ThreadPoolExecutor(liteflowConfig.getWhenMaxWorkers(), + return buildDefaultExecutor( liteflowConfig.getWhenMaxWorkers(), - 0L, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(liteflowConfig.getWhenQueueLimit()), - new ThreadFactory() { - private final AtomicLong number = new AtomicLong(); - @Override - public Thread newThread(Runnable r) { - Thread newThread = Executors.defaultThreadFactory().newThread(r); - newThread.setName("lf-when-thead-" + number.getAndIncrement()); - newThread.setDaemon(false); - return newThread; - } - }, - new ThreadPoolExecutor.AbortPolicy())); + liteflowConfig.getWhenMaxWorkers(), + liteflowConfig.getWhenQueueLimit(), + "lf-when-thead-"); } } diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/CustomThreadExecutor1.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/CustomThreadExecutor1.java index f942c8fb2..ba9fe5eaf 100644 --- a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/CustomThreadExecutor1.java +++ b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/CustomThreadExecutor1.java @@ -18,21 +18,10 @@ public class CustomThreadExecutor1 implements ExecutorBuilder { if (ObjectUtil.isNull(liteflowConfig)) { liteflowConfig = new LiteflowConfig(); } - return TtlExecutors.getTtlExecutorService(new ThreadPoolExecutor(liteflowConfig.getWhenMaxWorkers(), + return buildDefaultExecutor( liteflowConfig.getWhenMaxWorkers(), - 0L, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(liteflowConfig.getWhenQueueLimit()), - new ThreadFactory() { - private final AtomicLong number = new AtomicLong(); - - @Override - public Thread newThread(Runnable r) { - Thread newThread = Executors.defaultThreadFactory().newThread(r); - newThread.setName("Customer-when-1-thead-" + number.getAndIncrement()); - newThread.setDaemon(false); - return newThread; - } - }, - new ThreadPoolExecutor.AbortPolicy())); + liteflowConfig.getWhenMaxWorkers(), + liteflowConfig.getWhenQueueLimit(), + "customer-when-1-thead-"); } } diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/CustomThreadExecutor2.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/CustomThreadExecutor2.java index 0e75e7a7a..1ff6d22a0 100644 --- a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/CustomThreadExecutor2.java +++ b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/CustomThreadExecutor2.java @@ -17,21 +17,10 @@ public class CustomThreadExecutor2 implements ExecutorBuilder { if (ObjectUtil.isNull(liteflowConfig)) { liteflowConfig = new LiteflowConfig(); } - return TtlExecutors.getTtlExecutorService(new ThreadPoolExecutor(liteflowConfig.getWhenMaxWorkers(), + return buildDefaultExecutor( liteflowConfig.getWhenMaxWorkers(), - 0L, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(liteflowConfig.getWhenQueueLimit()), - new ThreadFactory() { - private final AtomicLong number = new AtomicLong(); - - @Override - public Thread newThread(Runnable r) { - Thread newThread = Executors.defaultThreadFactory().newThread(r); - newThread.setName("Customer-when-2-thead-" + number.getAndIncrement()); - newThread.setDaemon(false); - return newThread; - } - }, - new ThreadPoolExecutor.AbortPolicy())); + liteflowConfig.getWhenMaxWorkers(), + liteflowConfig.getWhenQueueLimit(), + "customer-when-2-thead-"); } } diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/CustomThreadExecutor3.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/CustomThreadExecutor3.java new file mode 100644 index 000000000..3519cfe20 --- /dev/null +++ b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/CustomThreadExecutor3.java @@ -0,0 +1,24 @@ +package com.yomahub.liteflow.test.customWhenThreadPool; + +import cn.hutool.core.util.ObjectUtil; +import com.yomahub.liteflow.property.LiteflowConfig; +import com.yomahub.liteflow.thread.ExecutorBuilder; +import com.yomahub.liteflow.util.SpringAware; + +import java.util.concurrent.ExecutorService; + +public class CustomThreadExecutor3 implements ExecutorBuilder { + @Override + public ExecutorService buildExecutor() { + LiteflowConfig liteflowConfig = SpringAware.getBean(LiteflowConfig.class); + //只有在非spring的场景下liteflowConfig才会为null + if (ObjectUtil.isNull(liteflowConfig)) { + liteflowConfig = new LiteflowConfig(); + } + return buildDefaultExecutor( + liteflowConfig.getWhenMaxWorkers(), + liteflowConfig.getWhenMaxWorkers(), + liteflowConfig.getWhenQueueLimit(), + "customer-when-3-thead-"); + } +} diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/CustomWhenThreadPoolSpringbootTest.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/CustomWhenThreadPoolSpringbootTest.java index 8d7b7ddd7..60eddce48 100644 --- a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/CustomWhenThreadPoolSpringbootTest.java +++ b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/CustomWhenThreadPoolSpringbootTest.java @@ -1,8 +1,12 @@ package com.yomahub.liteflow.test.customWhenThreadPool; +import com.yomahub.liteflow.builder.LiteFlowChainBuilder; +import com.yomahub.liteflow.builder.LiteFlowConditionBuilder; +import com.yomahub.liteflow.builder.LiteFlowNodeBuilder; import com.yomahub.liteflow.core.FlowExecutor; import com.yomahub.liteflow.entity.data.DefaultSlot; import com.yomahub.liteflow.entity.data.LiteflowResponse; +import com.yomahub.liteflow.enums.NodeTypeEnum; import com.yomahub.liteflow.test.BaseTest; import org.junit.Assert; import org.junit.Test; @@ -19,6 +23,7 @@ import javax.annotation.Resource; /** * springboot环境下异步线程超时日志打印测试 + * * @author Bryan.Zhang * @since 2.6.4 */ @@ -35,17 +40,47 @@ public class CustomWhenThreadPoolSpringbootTest extends BaseTest { private FlowExecutor flowExecutor; @Test - public void testCustomThreadPool() throws Exception{ + public void testCustomThreadPool() throws Exception { + LiteflowResponse response = flowExecutor.execute2Resp("chain", "arg"); + Assert.assertTrue(response.isSuccess()); + Assert.assertTrue(response.getSlot().getData("threadName").toString().startsWith("lf-when-thead")); + LiteflowResponse response1 = flowExecutor.execute2Resp("chain1", "arg"); Assert.assertTrue(response1.isSuccess()); - Assert.assertTrue(response1.getSlot().getData("threadName").toString().startsWith("lf-when-thead")); + Assert.assertTrue(response1.getSlot().getData("threadName").toString().startsWith("customer-when-1-thead")); LiteflowResponse response2 = flowExecutor.execute2Resp("chain2", "arg"); Assert.assertTrue(response2.isSuccess()); - Assert.assertTrue(response2.getSlot().getData("threadName").toString().startsWith("Customer-when-1-thead")); + Assert.assertTrue(response2.getSlot().getData("threadName").toString().startsWith("customer-when-2-thead")); + + // 使用build模式构建chain测试when条件的多线程 + LiteFlowNodeBuilder.createNode().setId("a") + .setName("组件A") + .setType(NodeTypeEnum.COMMON) + .setClazz("com.yomahub.liteflow.test.builder.cmp.ACmp") + .build(); + LiteFlowNodeBuilder.createNode().setId("b") + .setName("组件B") + .setType(NodeTypeEnum.COMMON) + .setClazz("com.yomahub.liteflow.test.builder.cmp.BCmp") + .build(); + LiteFlowNodeBuilder.createNode().setId("c") + .setName("组件C") + .setType(NodeTypeEnum.COMMON) + .setClazz("com.yomahub.liteflow.test.builder.cmp.CCmp") + .build(); + + + LiteFlowChainBuilder.createChain().setChainName("chain3").setCondition( + LiteFlowConditionBuilder + .createWhenCondition() + .setThreadExecutorClass(CustomThreadExecutor3.class.getName()) + .setValue("a,b,c,d") + .build() + ).build(); LiteflowResponse response3 = flowExecutor.execute2Resp("chain3", "arg"); Assert.assertTrue(response3.isSuccess()); - Assert.assertTrue(response3.getSlot().getData("threadName").toString().startsWith("Customer-when-2-thead")); + Assert.assertTrue(response3.getSlot().getData("threadName").toString().startsWith("customer-when-3-thead")); } } diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/cmp/CCmp.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/cmp/CCmp.java index 81ccd9353..df355c4b6 100644 --- a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/cmp/CCmp.java +++ b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/cmp/CCmp.java @@ -15,6 +15,7 @@ public class CCmp extends NodeComponent { @Override public void process() { + this.getSlot().setData("threadName", Thread.currentThread().getName()); System.out.println("CCmp executed!"); } diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/cmp/DCmp.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/cmp/DCmp.java index 9d96d43c1..a67ec4b9b 100644 --- a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/cmp/DCmp.java +++ b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/cmp/DCmp.java @@ -15,6 +15,7 @@ public class DCmp extends NodeComponent { @Override public void process() { + this.getSlot().setData("threadName", Thread.currentThread().getName()); System.out.println("DCmp executed!"); } diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/cmp/ECmp.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/cmp/ECmp.java index 2a403abf1..929e41767 100644 --- a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/cmp/ECmp.java +++ b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/cmp/ECmp.java @@ -15,6 +15,7 @@ public class ECmp extends NodeComponent { @Override public void process() { + this.getSlot().setData("threadName", Thread.currentThread().getName()); System.out.println("ECmp executed!"); } diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/cmp/FCmp.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/cmp/FCmp.java new file mode 100644 index 000000000..f4285a637 --- /dev/null +++ b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customWhenThreadPool/cmp/FCmp.java @@ -0,0 +1,22 @@ +/** + *

Title: liteflow

+ *

Description: 轻量级的组件式流程框架

+ * @author Bryan.Zhang + * @email weenyc31@163.com + * @Date 2020/4/1 + */ +package com.yomahub.liteflow.test.customWhenThreadPool.cmp; + +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +@Component("f") +public class FCmp extends NodeComponent { + + @Override + public void process() { + this.getSlot().setData("threadName", Thread.currentThread().getName()); + System.out.println("FCmp executed!"); + } + +} diff --git a/liteflow-testcase-springboot/src/test/resources/customWhenThreadPool/flow.xml b/liteflow-testcase-springboot/src/test/resources/customWhenThreadPool/flow.xml index eb5e85959..c1fc18751 100644 --- a/liteflow-testcase-springboot/src/test/resources/customWhenThreadPool/flow.xml +++ b/liteflow-testcase-springboot/src/test/resources/customWhenThreadPool/flow.xml @@ -1,12 +1,12 @@ - + - + - + \ No newline at end of file