diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java b/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java index 8bb229ca3..a2bf28024 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java @@ -103,6 +103,15 @@ public class LiteflowConfig { // 规则文件/脚本文件变更监听 private Boolean enableMonitorFile = Boolean.FALSE; + //并行循环线程池所用class路径 + private String parallelLoopExecutorClass; + + //使用默认并行循环线程池时,最大线程数 + private Integer parallelMaxWorkers; + + //使用默认并行循环线程池时,最大队列数 + private Integer parallelQueueLimit; + public Boolean getEnableMonitorFile() { return enableMonitorFile; } @@ -409,4 +418,32 @@ public class LiteflowConfig { public void setWhenMaxWaitTimeUnit(TimeUnit whenMaxWaitTimeUnit) { this.whenMaxWaitTimeUnit = whenMaxWaitTimeUnit; } + + public Integer getParallelMaxWorkers() { + return parallelMaxWorkers; + } + + public void setParallelMaxWorkers(Integer parallelMaxWorkers) { + this.parallelMaxWorkers = parallelMaxWorkers; + } + + public Integer getParallelQueueLimit() { + return parallelQueueLimit; + } + + public void setParallelQueueLimit(Integer parallelQueueLimit) { + this.parallelQueueLimit = parallelQueueLimit; + } + + public String getParallelLoopExecutorClass() { + if (StrUtil.isBlank(parallelLoopExecutorClass)) { + return "com.yomahub.liteflow.thread.LiteFlowDefaultParallelLoopExecutorBuilder"; + } + else { + return parallelLoopExecutorClass; + } + } + public void setParallelLoopExecutorClass(String parallelLoopExecutorClass) { + this.parallelLoopExecutorClass = parallelLoopExecutorClass; + } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java index c5c5af5ad..da487919b 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java @@ -113,6 +113,12 @@ public class ExecutorHelper { return getExecutorService(clazz); } + //构造并行循环的线程池 + public ExecutorService buildLoopParallelExecutor(){ + LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); + return getExecutorService(liteflowConfig.getParallelLoopExecutorClass()); + } + /** * 根据线程执行构建者Class类名获取ExecutorService实例 */ diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/LiteFlowDefaultParallelLoopExecutorBuilder.java b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/LiteFlowDefaultParallelLoopExecutorBuilder.java new file mode 100644 index 000000000..99938a7f7 --- /dev/null +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/LiteFlowDefaultParallelLoopExecutorBuilder.java @@ -0,0 +1,27 @@ +package com.yomahub.liteflow.thread; + +import cn.hutool.core.util.ObjectUtil; +import com.yomahub.liteflow.property.LiteflowConfig; +import com.yomahub.liteflow.property.LiteflowConfigGetter; + +import java.util.concurrent.ExecutorService; + +/** + * LiteFlow默认的并行循环执行器实现 + * + * @author zhhhhy + * @since 2.10.5 + */ + +public class LiteFlowDefaultParallelLoopExecutorBuilder implements ExecutorBuilder { + @Override + public ExecutorService buildExecutor() { + LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); + // 只有在非spring的场景下liteflowConfig才会为null + if (ObjectUtil.isNull(liteflowConfig)) { + liteflowConfig = new LiteflowConfig(); + } + return buildDefaultExecutor(liteflowConfig.getParallelMaxWorkers(), liteflowConfig.getParallelMaxWorkers(), + liteflowConfig.getParallelQueueLimit(), "parallel-loop-thead-"); + } +}