From 51d1bfd43d6ce18c215664c108244d5fecc81f7c Mon Sep 17 00:00:00 2001 From: jason <2353220944@qq.com> Date: Mon, 28 Oct 2024 16:34:00 +0800 Subject: [PATCH] =?UTF-8?q?=E5=85=BC=E5=AE=B9=E5=85=A8=E5=B1=80=E4=BD=93?= =?UTF-8?q?=E7=B3=BB=E7=BA=BF=E7=A8=8B=E6=B1=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../builder/el/operator/WhenOperator.java | 5 +- .../liteflow/property/LiteflowConfig.java | 63 +++++++++++++++ .../liteflow/thread/ExecutorHelper.java | 11 ++- .../LiteFlowDefaultGlobalExecutorBuilder.java | 27 +++++++ .../config/LiteflowAutoConfiguration.java | 3 + .../solon/config/LiteflowProperty.java | 47 +++++++++++ .../liteflow/springboot/LiteflowProperty.java | 66 +++++++++++++++- .../LiteflowPropertyAutoConfiguration.java | 3 + ...itional-spring-configuration-metadata.json | 21 +++++ .../META-INF/liteflow-default.properties | 3 + .../ChainThreadPoolELSpringbootTest.java | 79 +++++++++++++++++++ ... ConditionThreadPoolELSpringbootTest.java} | 12 +-- .../CustomGlobalThreadExecutor.java | 23 ++++++ .../GlobalThreadPoolELSpringbootTest.java | 30 +++---- .../chainThreadPool/application3.properties | 4 + .../resources/chainThreadPool/flow3.el.xml | 20 +++++ 16 files changed, 391 insertions(+), 26 deletions(-) create mode 100644 liteflow-core/src/main/java/com/yomahub/liteflow/thread/LiteFlowDefaultGlobalExecutorBuilder.java create mode 100644 liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/ChainThreadPoolELSpringbootTest.java rename liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/{CustomThreadPoolELSpringbootTest.java => ConditionThreadPoolELSpringbootTest.java} (89%) create mode 100644 liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/CustomGlobalThreadExecutor.java create mode 100644 liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/chainThreadPool/application3.properties create mode 100644 liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/chainThreadPool/flow3.el.xml diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/operator/WhenOperator.java b/liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/operator/WhenOperator.java index 00cfa58e0..2be0c5365 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/operator/WhenOperator.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/operator/WhenOperator.java @@ -7,6 +7,8 @@ import com.yomahub.liteflow.flow.element.condition.WhenCondition; import com.yomahub.liteflow.property.LiteflowConfig; import com.yomahub.liteflow.property.LiteflowConfigGetter; +import java.util.Optional; + /** * EL规则中的WHEN的操作符 * @@ -25,7 +27,8 @@ public class WhenOperator extends BaseOperator { for (Object obj : objects) { OperatorHelper.checkObjMustBeCommonTypeItem(obj); whenCondition.addExecutable(OperatorHelper.convert(obj, Executable.class)); - whenCondition.setThreadExecutorClass(liteflowConfig.getThreadExecutorClass()); + whenCondition.setThreadExecutorClass(Optional.ofNullable(liteflowConfig.getThreadExecutorClass()) + .orElse(liteflowConfig.getGlobalThreadPoolExecutorClass())); } return whenCondition; } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java b/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java index 7568b95ab..b6d4982d8 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java @@ -42,6 +42,7 @@ public class LiteflowConfig { private Integer slotSize; // 并行线程执行器class路径 + @Deprecated private String threadExecutorClass; // 异步线程最大等待秒数 @@ -68,9 +69,11 @@ public class LiteflowConfig { private Long period; // 异步线程池最大线程数 + @Deprecated private Integer whenMaxWorkers; // 异步线程池最大队列数量 + @Deprecated private Integer whenQueueLimit; // 解析模式,一共有三种,具体看其定义 @@ -106,12 +109,15 @@ public class LiteflowConfig { private Boolean enableMonitorFile = Boolean.FALSE; //并行循环线程池所用class路径 + @Deprecated private String parallelLoopExecutorClass; //使用默认并行循环线程池时,最大线程数 + @Deprecated private Integer parallelMaxWorkers; //使用默认并行循环线程池时,最大队列数 + @Deprecated private Integer parallelQueueLimit; // 是否启用组件降级 @@ -131,6 +137,15 @@ public class LiteflowConfig { this.enableMonitorFile = enableMonitorFile; } + //全局线程池所用class路径(when+异步循环) + private String globalThreadPoolExecutorClass; + + //全局线程池最大线程数(when+异步循环) + private Integer globalThreadPoolSize; + + //全局线程池最大队列数(when+异步循环) + private Integer globalThreadPoolQueueSize; + public Boolean getEnable() { if (ObjectUtil.isNull(enable)) { return Boolean.TRUE; @@ -230,6 +245,7 @@ public class LiteflowConfig { this.enableLog = enableLog; } + @Deprecated public Integer getWhenMaxWorkers() { if (ObjectUtil.isNull(whenMaxWorkers)) { return 16; @@ -239,10 +255,13 @@ public class LiteflowConfig { } } + @Deprecated + public void setWhenMaxWorkers(Integer whenMaxWorkers) { this.whenMaxWorkers = whenMaxWorkers; } + @Deprecated public Integer getWhenQueueLimit() { if (ObjectUtil.isNull(whenQueueLimit)) { return 512; @@ -252,6 +271,7 @@ public class LiteflowConfig { } } + @Deprecated public void setWhenQueueLimit(Integer whenQueueLimit) { this.whenQueueLimit = whenQueueLimit; } @@ -297,6 +317,7 @@ public class LiteflowConfig { this.printBanner = printBanner; } + @Deprecated public String getThreadExecutorClass() { if (StrUtil.isBlank(threadExecutorClass)) { return "com.yomahub.liteflow.thread.LiteFlowDefaultWhenExecutorBuilder"; @@ -306,6 +327,7 @@ public class LiteflowConfig { } } + @Deprecated public void setThreadExecutorClass(String threadExecutorClass) { this.threadExecutorClass = threadExecutorClass; } @@ -423,6 +445,7 @@ public class LiteflowConfig { this.parallelMaxWorkers = parallelMaxWorkers; } + @Deprecated public Integer getParallelQueueLimit() { if(ObjectUtil.isNull(parallelQueueLimit)){ return 512; @@ -431,10 +454,12 @@ public class LiteflowConfig { } } + @Deprecated public void setParallelQueueLimit(Integer parallelQueueLimit) { this.parallelQueueLimit = parallelQueueLimit; } + @Deprecated public String getParallelLoopExecutorClass() { if (StrUtil.isBlank(parallelLoopExecutorClass)) { return "com.yomahub.liteflow.thread.LiteFlowDefaultParallelLoopExecutorBuilder"; @@ -443,6 +468,8 @@ public class LiteflowConfig { return parallelLoopExecutorClass; } } + + @Deprecated public void setParallelLoopExecutorClass(String parallelLoopExecutorClass) { this.parallelLoopExecutorClass = parallelLoopExecutorClass; } @@ -508,4 +535,40 @@ public class LiteflowConfig { public void setScriptSetting(Map scriptSetting) { this.scriptSetting = scriptSetting; } + + public Integer getGlobalThreadPoolSize() { + if (ObjectUtil.isNull(globalThreadPoolSize)) { + return 16; + } else { + return globalThreadPoolSize; + } + } + + public void setGlobalThreadPoolSize(Integer globalThreadPoolSize) { + this.globalThreadPoolSize = globalThreadPoolSize; + } + + public Integer getGlobalThreadPoolQueueSize() { + if (ObjectUtil.isNull(globalThreadPoolQueueSize)) { + return 512; + } else { + return globalThreadPoolQueueSize; + } + } + + public void setGlobalThreadPoolQueueSize(Integer globalThreadPoolQueueSize) { + this.globalThreadPoolQueueSize = globalThreadPoolQueueSize; + } + + public String getGlobalThreadPoolExecutorClass() { + if (StrUtil.isBlank(globalThreadPoolExecutorClass)) { + return "com.yomahub.liteflow.thread.LiteFlowDefaultGlobalExecutorBuilder"; + } else { + return globalThreadPoolExecutorClass; + } + } + + public void setGlobalThreadPoolExecutorClass(String globalThreadPoolExecutorClass) { + this.globalThreadPoolExecutorClass = globalThreadPoolExecutorClass; + } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java index d8b49f82a..9f8d75104 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java @@ -22,6 +22,7 @@ import com.yomahub.liteflow.slot.DataBus; import com.yomahub.liteflow.spi.holder.ContextAwareHolder; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -91,7 +92,8 @@ public class ExecutorHelper { // 构建默认when线程池 public ExecutorService buildWhenExecutor() { LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); - return buildWhenExecutor(liteflowConfig.getThreadExecutorClass()); + return buildWhenExecutor(Optional.ofNullable(liteflowConfig.getGlobalThreadPoolExecutorClass()) + .orElse(liteflowConfig.getThreadExecutorClass())); } // 构建when线程池 - 支持多个when公用一个线程池 @@ -105,7 +107,9 @@ public class ExecutorHelper { // 构建when线程池 - clazz和condition的hash值共同作为缓存key public ExecutorService buildWhenExecutorWithHash(String conditionHash) { LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); - return buildWhenExecutorWithHash(liteflowConfig.getThreadExecutorClass(), conditionHash); + return buildWhenExecutorWithHash(Optional.ofNullable(liteflowConfig.getThreadExecutorClass()) + .orElse(liteflowConfig.getGlobalThreadPoolExecutorClass()), + conditionHash); } // 构建when线程池 - clazz和condition的hash值共同作为缓存key @@ -146,7 +150,8 @@ public class ExecutorHelper { String.valueOf(chain.hashCode())); } else { //全局线程池 - parallelExecutor = getExecutorService(liteflowConfig.getParallelLoopExecutorClass()); + parallelExecutor = getExecutorService(Optional.ofNullable(liteflowConfig.getParallelLoopExecutorClass()) + .orElse(liteflowConfig.getGlobalThreadPoolExecutorClass())); } return parallelExecutor; } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/LiteFlowDefaultGlobalExecutorBuilder.java b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/LiteFlowDefaultGlobalExecutorBuilder.java new file mode 100644 index 000000000..e1c44bc18 --- /dev/null +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/LiteFlowDefaultGlobalExecutorBuilder.java @@ -0,0 +1,27 @@ +package com.yomahub.liteflow.thread; + +import cn.hutool.core.util.ObjectUtil; +import com.yomahub.liteflow.property.LiteflowConfig; +import com.yomahub.liteflow.property.LiteflowConfigGetter; + +import java.util.concurrent.ExecutorService; + +/** + * LiteFlow默认的when线程池+异步多线程执行器实现 + * + * @author jason + */ +public class LiteFlowDefaultGlobalExecutorBuilder implements ExecutorBuilder { + + @Override + public ExecutorService buildExecutor() { + LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); + // 只有在非spring的场景下liteflowConfig才会为null + if (ObjectUtil.isNull(liteflowConfig)) { + liteflowConfig = new LiteflowConfig(); + } + return buildDefaultExecutor(liteflowConfig.getGlobalThreadPoolSize(), liteflowConfig.getGlobalThreadPoolSize(), + liteflowConfig.getGlobalThreadPoolQueueSize(), "global-thread-"); + } + +} diff --git a/liteflow-solon-plugin/src/main/java/com/yomahub/liteflow/solon/config/LiteflowAutoConfiguration.java b/liteflow-solon-plugin/src/main/java/com/yomahub/liteflow/solon/config/LiteflowAutoConfiguration.java index 5d560cfde..f4763fd46 100644 --- a/liteflow-solon-plugin/src/main/java/com/yomahub/liteflow/solon/config/LiteflowAutoConfiguration.java +++ b/liteflow-solon-plugin/src/main/java/com/yomahub/liteflow/solon/config/LiteflowAutoConfiguration.java @@ -51,6 +51,9 @@ public class LiteflowAutoConfiguration { liteflowConfig.setParallelQueueLimit(property.getParallelQueueLimit()); liteflowConfig.setParallelLoopExecutorClass(property.getParallelLoopExecutorClass()); liteflowConfig.setFallbackCmpEnable(property.isFallbackCmpEnable()); + liteflowConfig.setGlobalThreadPoolExecutorClass(property.getGlobalThreadPoolExecutorClass()); + liteflowConfig.setGlobalThreadPoolSize(property.getGlobalThreadPoolSize()); + liteflowConfig.setGlobalThreadPoolQueueSize(property.getGlobalThreadPoolQueueSize()); return liteflowConfig; } diff --git a/liteflow-solon-plugin/src/main/java/com/yomahub/liteflow/solon/config/LiteflowProperty.java b/liteflow-solon-plugin/src/main/java/com/yomahub/liteflow/solon/config/LiteflowProperty.java index d58dd333f..663d3c241 100644 --- a/liteflow-solon-plugin/src/main/java/com/yomahub/liteflow/solon/config/LiteflowProperty.java +++ b/liteflow-solon-plugin/src/main/java/com/yomahub/liteflow/solon/config/LiteflowProperty.java @@ -1,5 +1,7 @@ package com.yomahub.liteflow.solon.config; +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; import com.yomahub.liteflow.enums.ParseModeEnum; import org.noear.solon.annotation.Configuration; import org.noear.solon.annotation.Inject; @@ -83,6 +85,15 @@ public class LiteflowProperty { // 是否启用组件降级 private Boolean fallbackCmpEnable; + //全局线程池所用class路径(when+异步循环) + private String globalThreadPoolExecutorClass; + + //全局线程池最大线程数(when+异步循环) + private Integer globalThreadPoolSize; + + //全局线程池最大队列数(when+异步循环) + private Integer globalThreadPoolQueueSize; + public boolean isEnable() { return enable; } @@ -267,4 +278,40 @@ public class LiteflowProperty { public Boolean getFallbackCmpEnable() { return fallbackCmpEnable; } + + public Integer getGlobalThreadPoolSize() { + if (ObjectUtil.isNull(globalThreadPoolSize)) { + return 16; + } else { + return globalThreadPoolSize; + } + } + + public void setGlobalThreadPoolSize(Integer globalThreadPoolSize) { + this.globalThreadPoolSize = globalThreadPoolSize; + } + + public Integer getGlobalThreadPoolQueueSize() { + if (ObjectUtil.isNull(globalThreadPoolQueueSize)) { + return 512; + } else { + return globalThreadPoolQueueSize; + } + } + + public void setGlobalThreadPoolQueueSize(Integer globalThreadPoolQueueSize) { + this.globalThreadPoolQueueSize = globalThreadPoolQueueSize; + } + + public String getGlobalThreadPoolExecutorClass() { + if (StrUtil.isBlank(globalThreadPoolExecutorClass)) { + return "com.yomahub.liteflow.thread.LiteFlowDefaultGlobalExecutorBuilder"; + } else { + return globalThreadPoolExecutorClass; + } + } + + public void setGlobalThreadPoolExecutorClass(String globalThreadPoolExecutorClass) { + this.globalThreadPoolExecutorClass = globalThreadPoolExecutorClass; + } } diff --git a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowProperty.java b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowProperty.java index 930a62d4c..34c93c967 100644 --- a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowProperty.java +++ b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowProperty.java @@ -1,5 +1,7 @@ package com.yomahub.liteflow.springboot; +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; import com.yomahub.liteflow.enums.ParseModeEnum; import org.springframework.boot.context.properties.ConfigurationProperties; @@ -36,6 +38,7 @@ public class LiteflowProperty { private String mainExecutorClass; // 并行线程执行器class路径 + @Deprecated private String threadExecutorClass; // 异步线程最大等待描述 @@ -47,9 +50,11 @@ public class LiteflowProperty { private TimeUnit whenMaxWaitTimeUnit; // 异步线程池最大线程数 + @Deprecated private int whenMaxWorkers; // 异步线程池最大队列数量 + @Deprecated private int whenQueueLimit; // 异步线程池是否隔离 @@ -80,13 +85,15 @@ public class LiteflowProperty { // 规则文件/脚本文件变更监听 private boolean enableMonitorFile; - + @Deprecated private String parallelLoopExecutorClass; //使用默认并行循环线程池时,最大线程数 + @Deprecated private int parallelMaxWorkers; //使用默认并行循环线程池时,最大队列数 + @Deprecated private int parallelQueueLimit; // 是否启用组件降级 @@ -101,6 +108,15 @@ public class LiteflowProperty { //脚本特殊设置选项 private Map scriptSetting; + //全局线程池所用class路径(when+异步循环) + private String globalThreadPoolExecutorClass; + + //全局线程池最大线程数(when+异步循环) + private Integer globalThreadPoolSize; + + //全局线程池最大队列数(when+异步循环) + private Integer globalThreadPoolQueueSize; + public boolean isEnableMonitorFile() { return enableMonitorFile; } @@ -143,18 +159,22 @@ public class LiteflowProperty { this.whenMaxWaitSeconds = whenMaxWaitSeconds; } + @Deprecated public int getWhenMaxWorkers() { return whenMaxWorkers; } + @Deprecated public void setWhenMaxWorkers(int whenMaxWorkers) { this.whenMaxWorkers = whenMaxWorkers; } + @Deprecated public int getWhenQueueLimit() { return whenQueueLimit; } + @Deprecated public void setWhenQueueLimit(int whenQueueLimit) { this.whenQueueLimit = whenQueueLimit; } @@ -193,10 +213,12 @@ public class LiteflowProperty { this.printBanner = printBanner; } + @Deprecated public String getThreadExecutorClass() { return threadExecutorClass; } + @Deprecated public void setThreadExecutorClass(String threadExecutorClass) { this.threadExecutorClass = threadExecutorClass; } @@ -273,26 +295,32 @@ public class LiteflowProperty { this.whenMaxWaitTimeUnit = whenMaxWaitTimeUnit; } + @Deprecated public String getParallelLoopExecutorClass() { return parallelLoopExecutorClass; } + @Deprecated public void setParallelLoopExecutorClass(String parallelLoopExecutorClass) { this.parallelLoopExecutorClass = parallelLoopExecutorClass; } + @Deprecated public int getParallelMaxWorkers() { return parallelMaxWorkers; } + @Deprecated public void setParallelMaxWorkers(int parallelMaxWorkers) { this.parallelMaxWorkers = parallelMaxWorkers; } + @Deprecated public int getParallelQueueLimit() { return parallelQueueLimit; } + @Deprecated public void setParallelQueueLimit(int parallelQueueLimit) { this.parallelQueueLimit = parallelQueueLimit; } @@ -336,4 +364,40 @@ public class LiteflowProperty { public void setScriptSetting(Map scriptSetting) { this.scriptSetting = scriptSetting; } + + public Integer getGlobalThreadPoolSize() { + if (ObjectUtil.isNull(globalThreadPoolSize)) { + return 16; + } else { + return globalThreadPoolSize; + } + } + + public void setGlobalThreadPoolSize(Integer globalThreadPoolSize) { + this.globalThreadPoolSize = globalThreadPoolSize; + } + + public Integer getGlobalThreadPoolQueueSize() { + if (ObjectUtil.isNull(globalThreadPoolQueueSize)) { + return 512; + } else { + return globalThreadPoolQueueSize; + } + } + + public void setGlobalThreadPoolQueueSize(Integer globalThreadPoolQueueSize) { + this.globalThreadPoolQueueSize = globalThreadPoolQueueSize; + } + + public String getGlobalThreadPoolExecutorClass() { + if (StrUtil.isBlank(globalThreadPoolExecutorClass)) { + return "com.yomahub.liteflow.thread.LiteFlowDefaultGlobalExecutorBuilder"; + } else { + return globalThreadPoolExecutorClass; + } + } + + public void setGlobalThreadPoolExecutorClass(String globalThreadPoolExecutorClass) { + this.globalThreadPoolExecutorClass = globalThreadPoolExecutorClass; + } } diff --git a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/config/LiteflowPropertyAutoConfiguration.java b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/config/LiteflowPropertyAutoConfiguration.java index 37356fcb3..d68f5a393 100644 --- a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/config/LiteflowPropertyAutoConfiguration.java +++ b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/config/LiteflowPropertyAutoConfiguration.java @@ -54,6 +54,9 @@ public class LiteflowPropertyAutoConfiguration { liteflowConfig.setDelay(liteflowMonitorProperty.getDelay()); liteflowConfig.setPeriod(liteflowMonitorProperty.getPeriod()); liteflowConfig.setScriptSetting(property.getScriptSetting()); + liteflowConfig.setGlobalThreadPoolExecutorClass(property.getGlobalThreadPoolExecutorClass()); + liteflowConfig.setGlobalThreadPoolQueueSize(property.getGlobalThreadPoolQueueSize()); + liteflowConfig.setGlobalThreadPoolSize(property.getGlobalThreadPoolSize()); return liteflowConfig; } diff --git a/liteflow-spring-boot-starter/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/liteflow-spring-boot-starter/src/main/resources/META-INF/additional-spring-configuration-metadata.json index 83b0b668e..ae2c22a44 100644 --- a/liteflow-spring-boot-starter/src/main/resources/META-INF/additional-spring-configuration-metadata.json +++ b/liteflow-spring-boot-starter/src/main/resources/META-INF/additional-spring-configuration-metadata.json @@ -220,6 +220,27 @@ "type": "java.util.Map", "description": "script special settings.", "sourceType": "com.yomahub.liteflow.springboot.LiteflowProperty" + }, + { + "name": "liteflow.global-thread-pool-size", + "type": "java.lang.Integer", + "description": "Set the global chain thread pool worker max-size.", + "sourceType": "com.yomahub.liteflow.springboot.LiteflowProperty", + "defaultValue": 16 + }, + { + "name": "liteflow.global-thread-pool-queue-size", + "type": "java.lang.Integer", + "description": "Set the global chain thread pool queue max-size ", + "sourceType": "com.yomahub.liteflow.springboot.LiteflowProperty", + "defaultValue": 512 + }, + { + "name": "liteflow.global-thread-pool-executor-class", + "type": "java.lang.String", + "description": "Custom the global chain thread pool implement for global chain executor.", + "sourceType": "com.yomahub.liteflow.springboot.LiteflowProperty", + "defaultValue": "com.yomahub.liteflow.thread.LiteFlowDefaultGlobalExecutorBuilder" } ] } \ No newline at end of file diff --git a/liteflow-spring-boot-starter/src/main/resources/META-INF/liteflow-default.properties b/liteflow-spring-boot-starter/src/main/resources/META-INF/liteflow-default.properties index 37aeb4645..5dec76788 100644 --- a/liteflow-spring-boot-starter/src/main/resources/META-INF/liteflow-default.properties +++ b/liteflow-spring-boot-starter/src/main/resources/META-INF/liteflow-default.properties @@ -26,4 +26,7 @@ liteflow.monitor.queue-limit=200 liteflow.monitor.delay=300000 liteflow.monitor.period=300000 liteflow.enable-monitor-file=false +liteflow.global-thread-pool-size=16 +liteflow.global-thread-pool-queue-size=512 +liteflow.global-thread-pool-executor-class=comcom.yomahub.liteflow.thread.LiteFlowDefaultGlobalExecutorBuilder diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/ChainThreadPoolELSpringbootTest.java b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/ChainThreadPoolELSpringbootTest.java new file mode 100644 index 000000000..ed9532134 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/ChainThreadPoolELSpringbootTest.java @@ -0,0 +1,79 @@ +package com.yomahub.liteflow.test.chainThreadPool; + +import cn.hutool.core.collection.ListUtil; +import com.yomahub.liteflow.core.FlowExecutor; +import com.yomahub.liteflow.flow.LiteflowResponse; +import com.yomahub.liteflow.slot.DefaultContext; +import com.yomahub.liteflow.test.BaseTest; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.test.context.TestPropertySource; + +import javax.annotation.Resource; +import java.util.List; + +/** + * springboot环境下chain线程池隔离测试 + */ +@TestPropertySource(value = "classpath:/chainThreadPool/application.properties") +@SpringBootTest(classes = ChainThreadPoolELSpringbootTest.class) +@EnableAutoConfiguration +@ComponentScan({"com.yomahub.liteflow.test.chainThreadPool.cmp"}) +public class ChainThreadPoolELSpringbootTest extends BaseTest { + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + @Resource + private FlowExecutor flowExecutor; + + /** + * 测试WHEN上全局线程池和chain线程池隔离-优先以chain上为准 + */ + @Test + public void testChainThreadPool() { + LiteflowResponse response1 = flowExecutor.execute2Resp("chain1", "arg"); + DefaultContext context = response1.getFirstContextBean(); + Assertions.assertTrue(response1.isSuccess()); + Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-chain-thead")); + } + + /** + * 测试FOR上全局线程池和chain线程池隔离-优先以chain上为准 + */ + @Test + public void testChainThreadPool2() { + LiteflowResponse response1 = flowExecutor.execute2Resp("chain2", "arg"); + DefaultContext context = response1.getFirstContextBean(); + Assertions.assertTrue(response1.isSuccess()); + Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-chain-thead")); + } + + /** + * 测试WHILE上全局线程池和chain线程池隔离-优先以chain上为准 + */ + @Test + public void testChainThreadPool3() { + LiteflowResponse response1 = flowExecutor.execute2Resp("chain3", "arg"); + DefaultContext context = response1.getFirstContextBean(); + Assertions.assertTrue(response1.isSuccess()); + Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-chain-thead")); + } + + /** + * 测试ITERATOR上全局线程池和chain线程池隔离-优先以chain上为准 + */ + @Test + public void testChainThreadPool4() { + List list = ListUtil.toList("1", "2", "3", "4", "5"); + LiteflowResponse response1 = flowExecutor.execute2Resp("chain4", list); + DefaultContext context = response1.getFirstContextBean(); + Assertions.assertTrue(response1.isSuccess()); + Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-chain-thead")); + } + +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/CustomThreadPoolELSpringbootTest.java b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/ConditionThreadPoolELSpringbootTest.java similarity index 89% rename from liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/CustomThreadPoolELSpringbootTest.java rename to liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/ConditionThreadPoolELSpringbootTest.java index f50aa6389..1821e4dca 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/CustomThreadPoolELSpringbootTest.java +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/ConditionThreadPoolELSpringbootTest.java @@ -21,10 +21,10 @@ import java.util.List; * springboot环境下chain线程池隔离测试 */ @TestPropertySource(value = "classpath:/chainThreadPool/application2.properties") -@SpringBootTest(classes = CustomThreadPoolELSpringbootTest.class) +@SpringBootTest(classes = ConditionThreadPoolELSpringbootTest.class) @EnableAutoConfiguration @ComponentScan({"com.yomahub.liteflow.test.chainThreadPool.cmp"}) -public class CustomThreadPoolELSpringbootTest extends BaseTest { +public class ConditionThreadPoolELSpringbootTest extends BaseTest { private final Logger log = LoggerFactory.getLogger(this.getClass()); @@ -35,7 +35,7 @@ public class CustomThreadPoolELSpringbootTest extends BaseTest { * 测试WEHN上condition线程池和chain线程池隔离-优先以WHEN上为准 */ @Test - public void testCustomChainThreadPool() { + public void testConditionThreadPool() { LiteflowResponse response1 = flowExecutor.execute2Resp("chain1", "arg"); DefaultContext context = response1.getFirstContextBean(); Assertions.assertTrue(response1.isSuccess()); @@ -46,7 +46,7 @@ public class CustomThreadPoolELSpringbootTest extends BaseTest { * 测试FOR上condition线程池和chain线程池隔离-优先以FOR上为准 */ @Test - public void testCustomChainThreadPool2() { + public void testConditionThreadPool2() { LiteflowResponse response1 = flowExecutor.execute2Resp("chain2", "arg"); DefaultContext context = response1.getFirstContextBean(); Assertions.assertTrue(response1.isSuccess()); @@ -57,7 +57,7 @@ public class CustomThreadPoolELSpringbootTest extends BaseTest { * 测试WHILE上condition线程池和chain线程池隔离-优先以WHILE上为准 */ @Test - public void testCustomChainThreadPool3() { + public void testConditionThreadPool3() { LiteflowResponse response1 = flowExecutor.execute2Resp("chain3", "arg"); DefaultContext context = response1.getFirstContextBean(); Assertions.assertTrue(response1.isSuccess()); @@ -68,7 +68,7 @@ public class CustomThreadPoolELSpringbootTest extends BaseTest { * 测试ITERATOR上condition线程池和chain线程池隔离-优先以ITERATOR上为准 */ @Test - public void testCustomChainThreadPool4() { + public void testConditionThreadPool4() { List list = ListUtil.toList("1", "2", "3", "4", "5"); LiteflowResponse response1 = flowExecutor.execute2Resp("chain4", list); DefaultContext context = response1.getFirstContextBean(); diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/CustomGlobalThreadExecutor.java b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/CustomGlobalThreadExecutor.java new file mode 100644 index 000000000..32ca62e39 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/CustomGlobalThreadExecutor.java @@ -0,0 +1,23 @@ +package com.yomahub.liteflow.test.chainThreadPool; + +import cn.hutool.core.util.ObjectUtil; +import com.yomahub.liteflow.property.LiteflowConfig; +import com.yomahub.liteflow.property.LiteflowConfigGetter; +import com.yomahub.liteflow.thread.ExecutorBuilder; + +import java.util.concurrent.ExecutorService; + +public class CustomGlobalThreadExecutor implements ExecutorBuilder { + + @Override + public ExecutorService buildExecutor() { + LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); + // 只有在非spring的场景下liteflowConfig才会为null + if (ObjectUtil.isNull(liteflowConfig)) { + liteflowConfig = new LiteflowConfig(); + } + return buildDefaultExecutor(16, 16, + 512, "customer-global-thead"); + } + +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/GlobalThreadPoolELSpringbootTest.java b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/GlobalThreadPoolELSpringbootTest.java index f307c0496..50dad85d9 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/GlobalThreadPoolELSpringbootTest.java +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/GlobalThreadPoolELSpringbootTest.java @@ -1,4 +1,4 @@ -package com.yomahub.liteflow.test.chainThreadPool; +package com.yomahub.liteflow.test.GlobalThreadPool; import cn.hutool.core.collection.ListUtil; import com.yomahub.liteflow.core.FlowExecutor; @@ -18,9 +18,9 @@ import javax.annotation.Resource; import java.util.List; /** - * springboot环境下chain线程池隔离测试 + * springboot环境下Global线程池隔离测试 */ -@TestPropertySource(value = "classpath:/chainThreadPool/application.properties") +@TestPropertySource(value = "classpath:/chainThreadPool/application3.properties") @SpringBootTest(classes = GlobalThreadPoolELSpringbootTest.class) @EnableAutoConfiguration @ComponentScan({"com.yomahub.liteflow.test.chainThreadPool.cmp"}) @@ -32,48 +32,48 @@ public class GlobalThreadPoolELSpringbootTest extends BaseTest { private FlowExecutor flowExecutor; /** - * 测试WHEN上全局线程池和chain线程池隔离-优先以chain上为准 + * 测试WHEN上全局线程池 */ @Test - public void testGlobalChainThreadPool() { + public void testGlobalThreadPool() { LiteflowResponse response1 = flowExecutor.execute2Resp("chain1", "arg"); DefaultContext context = response1.getFirstContextBean(); Assertions.assertTrue(response1.isSuccess()); - Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-chain-thead")); + Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-Global-thead")); } /** - * 测试FOR上全局线程池和chain线程池隔离-优先以chain上为准 + * 测试FOR上全局线程池 */ @Test - public void testGlobalChainThreadPool2() { + public void testGlobalThreadPool2() { LiteflowResponse response1 = flowExecutor.execute2Resp("chain2", "arg"); DefaultContext context = response1.getFirstContextBean(); Assertions.assertTrue(response1.isSuccess()); - Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-chain-thead")); + Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-Global-thead")); } /** - * 测试WHILE上全局线程池和chain线程池隔离-优先以chain上为准 + * 测试WHILE上全局线程池 */ @Test - public void testGlobalChainThreadPool3() { + public void testGlobalThreadPool3() { LiteflowResponse response1 = flowExecutor.execute2Resp("chain3", "arg"); DefaultContext context = response1.getFirstContextBean(); Assertions.assertTrue(response1.isSuccess()); - Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-chain-thead")); + Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-Global-thead")); } /** - * 测试ITERATOR上全局线程池和chain线程池隔离-优先以chain上为准 + * 测试ITERATOR上全局线程池 */ @Test - public void testGlobalChainThreadPool4() { + public void testGlobalThreadPool4() { List list = ListUtil.toList("1", "2", "3", "4", "5"); LiteflowResponse response1 = flowExecutor.execute2Resp("chain4", list); DefaultContext context = response1.getFirstContextBean(); Assertions.assertTrue(response1.isSuccess()); - Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-chain-thead")); + Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-Global-thead")); } } diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/chainThreadPool/application3.properties b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/chainThreadPool/application3.properties new file mode 100644 index 000000000..d7ec64e9a --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/chainThreadPool/application3.properties @@ -0,0 +1,4 @@ +liteflow.rule-source=chainThreadPool/flow3.el.xml +liteflow.global-thread-pool-size=16 +liteflow.global-thread-pool-queue-size=512 +liteflow.global-thread-pool-executor-class=com.yomahub.liteflow.test.chainThreadPool.CustomGlobalThreadExecutor \ No newline at end of file diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/chainThreadPool/flow3.el.xml b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/chainThreadPool/flow3.el.xml new file mode 100644 index 000000000..bd35f961f --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/chainThreadPool/flow3.el.xml @@ -0,0 +1,20 @@ + + + + WHEN(a,b); + + + + FOR(5).parallel(true).DO(THEN(a,f + ) + ); + + + + WHILE(z).parallel(true).DO(THEN(w,d)); + + + + ITERATOR(it).parallel(true).DO(THEN(a,i)); + + \ No newline at end of file