From 90eca86ca45696c85a657582bc38e6abdffd6881 Mon Sep 17 00:00:00 2001 From: zy <953725892@qq.com> Date: Sat, 1 Jul 2023 17:27:25 +0800 Subject: [PATCH] =?UTF-8?q?feature=20#I7HJFX=20=E6=B7=BB=E5=8A=A0=E5=B9=B6?= =?UTF-8?q?=E8=A1=8C=E5=BE=AA=E7=8E=AF=E9=BB=98=E8=AE=A4=E7=BA=BF=E7=A8=8B?= =?UTF-8?q?=E6=B1=A0=E5=8F=8A=E8=87=AA=E5=AE=9A=E4=B9=89=E7=BA=BF=E7=A8=8B?= =?UTF-8?q?=E6=B1=A0=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../liteflow/property/LiteflowConfig.java | 37 +++++++++++++++++++ .../liteflow/thread/ExecutorHelper.java | 6 +++ ...lowDefaultParallelLoopExecutorBuilder.java | 27 ++++++++++++++ 3 files changed, 70 insertions(+) create mode 100644 liteflow-core/src/main/java/com/yomahub/liteflow/thread/LiteFlowDefaultParallelLoopExecutorBuilder.java diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java b/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java index 8bb229ca3..a2bf28024 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java @@ -103,6 +103,15 @@ public class LiteflowConfig { // 规则文件/脚本文件变更监听 private Boolean enableMonitorFile = Boolean.FALSE; + //并行循环线程池所用class路径 + private String parallelLoopExecutorClass; + + //使用默认并行循环线程池时,最大线程数 + private Integer parallelMaxWorkers; + + //使用默认并行循环线程池时,最大队列数 + private Integer parallelQueueLimit; + public Boolean getEnableMonitorFile() { return enableMonitorFile; } @@ -409,4 +418,32 @@ public class LiteflowConfig { public void setWhenMaxWaitTimeUnit(TimeUnit whenMaxWaitTimeUnit) { this.whenMaxWaitTimeUnit = whenMaxWaitTimeUnit; } + + public Integer getParallelMaxWorkers() { + return parallelMaxWorkers; + } + + public void setParallelMaxWorkers(Integer parallelMaxWorkers) { + this.parallelMaxWorkers = parallelMaxWorkers; + } + + public Integer getParallelQueueLimit() { + return parallelQueueLimit; + } + + public void setParallelQueueLimit(Integer parallelQueueLimit) { + this.parallelQueueLimit = parallelQueueLimit; + } + + public String getParallelLoopExecutorClass() { + if (StrUtil.isBlank(parallelLoopExecutorClass)) { + return "com.yomahub.liteflow.thread.LiteFlowDefaultParallelLoopExecutorBuilder"; + } + else { + return parallelLoopExecutorClass; + } + } + public void setParallelLoopExecutorClass(String parallelLoopExecutorClass) { + this.parallelLoopExecutorClass = parallelLoopExecutorClass; + } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java index c5c5af5ad..da487919b 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java @@ -113,6 +113,12 @@ public class ExecutorHelper { return getExecutorService(clazz); } + //构造并行循环的线程池 + public ExecutorService buildLoopParallelExecutor(){ + LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); + return getExecutorService(liteflowConfig.getParallelLoopExecutorClass()); + } + /** * 根据线程执行构建者Class类名获取ExecutorService实例 */ diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/LiteFlowDefaultParallelLoopExecutorBuilder.java b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/LiteFlowDefaultParallelLoopExecutorBuilder.java new file mode 100644 index 000000000..99938a7f7 --- /dev/null +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/LiteFlowDefaultParallelLoopExecutorBuilder.java @@ -0,0 +1,27 @@ +package com.yomahub.liteflow.thread; + +import cn.hutool.core.util.ObjectUtil; +import com.yomahub.liteflow.property.LiteflowConfig; +import com.yomahub.liteflow.property.LiteflowConfigGetter; + +import java.util.concurrent.ExecutorService; + +/** + * LiteFlow默认的并行循环执行器实现 + * + * @author zhhhhy + * @since 2.10.5 + */ + +public class LiteFlowDefaultParallelLoopExecutorBuilder implements ExecutorBuilder { + @Override + public ExecutorService buildExecutor() { + LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); + // 只有在非spring的场景下liteflowConfig才会为null + if (ObjectUtil.isNull(liteflowConfig)) { + liteflowConfig = new LiteflowConfig(); + } + return buildDefaultExecutor(liteflowConfig.getParallelMaxWorkers(), liteflowConfig.getParallelMaxWorkers(), + liteflowConfig.getParallelQueueLimit(), "parallel-loop-thead-"); + } +}