From 0a092c7c78cd440f6327991fef5913a0a9fcd672 Mon Sep 17 00:00:00 2001 From: jason <2353220944@qq.com> Date: Tue, 12 Nov 2024 10:27:42 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../flow/element/condition/ForCondition.java | 5 +- .../strategy/ParallelStrategyExecutor.java | 5 +- .../strategy/SpecifyParallelExecutor.java | 6 +- .../ExecutorCondition/ExecutorCondition.java | 44 +++++++++++++ .../ExecutorConditionBuilder.java | 50 +++++++++++++++ .../liteflow/thread/ExecutorHelper.java | 61 ++++++++++++++++--- 6 files changed, 159 insertions(+), 12 deletions(-) create mode 100644 liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorCondition/ExecutorCondition.java create mode 100644 liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorCondition/ExecutorConditionBuilder.java diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/ForCondition.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/ForCondition.java index b8dd9429f..366d71a26 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/ForCondition.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/ForCondition.java @@ -77,8 +77,9 @@ public class ForCondition extends LoopCondition { //存储所有的并行执行子项的CompletableFuture List> futureList = new ArrayList<>(); //获取并行循环的线程池 - ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildLoopParallelExecutor(this, - slotIndex); + ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildExecutorService(this, + slotIndex, + this.getConditionType()); for (int i = 0; i < forCount; i++){ //提交异步任务 CompletableFuture future = diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java index 2590f40c1..c22ea2156 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java @@ -170,7 +170,10 @@ public abstract class ParallelStrategyExecutor { this.setWhenConditionParams(whenCondition); // 获取 WHEN 所需线程池 - ExecutorService parallelExecutor = getWhenExecutorService(whenCondition, slotIndex); + ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildExecutorService(whenCondition, + slotIndex, + whenCondition.getConditionType()); + // 这里主要是做了封装 CompletableFuture 对象,用 lambda 表达式做了很多事情,这句代码要仔细理清 // 根据 condition.getNodeList() 的集合进行流处理,用 map 进行把 executable 对象转换成 List> diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java index 8f204c396..3fb4e5207 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java @@ -3,6 +3,7 @@ package com.yomahub.liteflow.flow.parallel.strategy; import cn.hutool.core.collection.CollUtil; import com.yomahub.liteflow.flow.element.condition.WhenCondition; import com.yomahub.liteflow.flow.parallel.WhenFutureObj; +import com.yomahub.liteflow.thread.ExecutorHelper; import java.util.*; import java.util.concurrent.CompletableFuture; @@ -26,7 +27,10 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor { this.setWhenConditionParams(whenCondition); // 获取 WHEN 所需线程池 - ExecutorService parallelExecutor = getWhenExecutorService(whenCondition, slotIndex); + ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildExecutorService(whenCondition, + slotIndex, + whenCondition.getConditionType()); + ; // 指定完成的任务 CompletableFuture specifyTask; diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorCondition/ExecutorCondition.java b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorCondition/ExecutorCondition.java new file mode 100644 index 000000000..77ef90469 --- /dev/null +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorCondition/ExecutorCondition.java @@ -0,0 +1,44 @@ +package com.yomahub.liteflow.thread.ExecutorCondition; + + +/** + * 执行器条件对象 + */ +public class ExecutorCondition { + private final boolean conditionLevel; + private final boolean chainLevel; + private final String conditionExecutorClass; + + private ExecutorCondition( + boolean conditionLevel, + boolean chainLevel, + String conditionExecutorClass) { + this.conditionLevel = conditionLevel; + this.chainLevel = chainLevel; + this.conditionExecutorClass = conditionExecutorClass; + } + + public static ExecutorCondition create( + boolean conditionLevel, + boolean chainLevel, + String conditionExecutorClass + ) { + return new ExecutorCondition( + conditionLevel, + chainLevel, + conditionExecutorClass + ); + } + + public boolean isConditionLevel() { + return conditionLevel; + } + + public boolean isChainLevel() { + return chainLevel; + } + + public String getConditionExecutorClass() { + return conditionExecutorClass; + } +} \ No newline at end of file diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorCondition/ExecutorConditionBuilder.java b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorCondition/ExecutorConditionBuilder.java new file mode 100644 index 000000000..d3cd69158 --- /dev/null +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorCondition/ExecutorConditionBuilder.java @@ -0,0 +1,50 @@ +package com.yomahub.liteflow.thread.ExecutorCondition; + + +import cn.hutool.core.util.BooleanUtil; +import cn.hutool.core.util.ObjectUtil; +import com.yomahub.liteflow.enums.ConditionTypeEnum; +import com.yomahub.liteflow.flow.element.Chain; +import com.yomahub.liteflow.flow.element.Condition; +import com.yomahub.liteflow.flow.element.condition.LoopCondition; +import com.yomahub.liteflow.flow.element.condition.WhenCondition; +import com.yomahub.liteflow.property.LiteflowConfig; + +public class ExecutorConditionBuilder { + + /** + * 构建执行器条件 + */ + public static ExecutorCondition buildExecutorCondition( + Condition condition, + Chain chain, + LiteflowConfig liteflowConfig, + ConditionTypeEnum type) { + + boolean conditionLevel; + String conditionExecutorClass; + + switch (type) { + case TYPE_FOR: + case TYPE_WHILE: + case TYPE_ITERATOR: + LoopCondition loopCondition = (LoopCondition) condition; + conditionLevel = ObjectUtil.isNotEmpty(loopCondition.getThreadPoolExecutorClass()); + conditionExecutorClass = loopCondition.getThreadPoolExecutorClass(); + break; + case TYPE_WHEN: + WhenCondition whenCondition = (WhenCondition) condition; + conditionLevel = BooleanUtil.isTrue(liteflowConfig.getWhenThreadPoolIsolate()); + conditionExecutorClass = whenCondition.getThreadExecutorClass(); + break; + default: + throw new IllegalArgumentException("Unsupported condition type: " + type); + } + + return ExecutorCondition.create( + conditionLevel, + ObjectUtil.isNotEmpty(chain.getThreadPoolExecutorClass()), + conditionExecutorClass + ); + } +} \ No newline at end of file diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java index 37bd1217f..7dd4f8929 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java @@ -11,9 +11,11 @@ package com.yomahub.liteflow.thread; import cn.hutool.core.map.MapUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; +import com.yomahub.liteflow.enums.ConditionTypeEnum; import com.yomahub.liteflow.exception.ThreadExecutorServiceCreateException; import com.yomahub.liteflow.flow.FlowBus; import com.yomahub.liteflow.flow.element.Chain; +import com.yomahub.liteflow.flow.element.Condition; import com.yomahub.liteflow.flow.element.condition.LoopCondition; import com.yomahub.liteflow.log.LFLog; import com.yomahub.liteflow.log.LFLoggerManager; @@ -21,6 +23,8 @@ import com.yomahub.liteflow.property.LiteflowConfig; import com.yomahub.liteflow.property.LiteflowConfigGetter; import com.yomahub.liteflow.slot.DataBus; import com.yomahub.liteflow.spi.holder.ContextAwareHolder; +import com.yomahub.liteflow.thread.ExecutorCondition.ExecutorCondition; +import com.yomahub.liteflow.thread.ExecutorCondition.ExecutorConditionBuilder; import java.util.Map; import java.util.concurrent.ExecutorService; @@ -104,17 +108,17 @@ public class ExecutorHelper { } // 构建when线程池 - clazz和condition的hash值共同作为缓存key - public ExecutorService buildWhenExecutorWithHash(String conditionHash) { + public ExecutorService buildWhenExecutorWithHash(String hash) { LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); - return buildWhenExecutorWithHash(liteflowConfig.getGlobalThreadPoolExecutorClass(), conditionHash); + return buildWhenExecutorWithHash(liteflowConfig.getGlobalThreadPoolExecutorClass(), hash); } // 构建when线程池 - clazz和condition的hash值共同作为缓存key - public ExecutorService buildWhenExecutorWithHash(String clazz, String conditionHash) { + public ExecutorService buildWhenExecutorWithHash(String clazz, String hash) { if (StrUtil.isBlank(clazz)) { - return buildWhenExecutorWithHash(conditionHash); + return buildWhenExecutorWithHash(hash); } - return getExecutorService(clazz, conditionHash); + return getExecutorService(clazz, hash); } // 构建默认的FlowExecutor线程池,用于execute2Future方法 @@ -168,13 +172,13 @@ public class ExecutorHelper { /** * 根据线程执行构建者Class类名获取ExecutorService实例 */ - private ExecutorService getExecutorService(String clazz, String conditionHash) { + private ExecutorService getExecutorService(String clazz, String hash) { try { String key; - if (StrUtil.isBlank(conditionHash)){ + if (StrUtil.isBlank(hash)) { key = clazz; }else{ - key = StrUtil.format("{}_{}", clazz, conditionHash); + key = StrUtil.format("{}_{}", clazz, hash); } ExecutorService executorServiceFromCache = executorServiceMap.get(key); @@ -201,4 +205,45 @@ public class ExecutorHelper { } } + /** + * 构建执行器服务 + * + * @param condition 条件对象(Loop或When条件) + * @param slotIndex 槽索引 + * @param type condition类型 + * @return ExecutorService + */ + public ExecutorService buildExecutorService(Condition condition, Integer slotIndex, ConditionTypeEnum type) { + ExecutorService executor; + LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); + String chainId = DataBus.getSlot(slotIndex).getChainId(); + Chain chain = FlowBus.getChain(chainId); + + // 构建条件判断对象 + ExecutorCondition execCondition = ExecutorConditionBuilder.buildExecutorCondition( + condition, + chain, + liteflowConfig, + type + ); + + // 根据条件选择执行器 + if (execCondition.isConditionLevel()) { + // condition层级线程池 + executor = getExecutorService(execCondition.getConditionExecutorClass(), + String.valueOf(condition.hashCode())); + + } else if (execCondition.isChainLevel()) { + // chain层级线程池 + executor = getExecutorService(chain.getThreadPoolExecutorClass(), + String.valueOf(chain.hashCode())); + } else { + // 全局线程池 + executor = getExecutorService(liteflowConfig.getGlobalThreadPoolExecutorClass()); + } + + return executor; + } + + }