From dc789e4a69610d8152a4581c2feb3bc86896a164 Mon Sep 17 00:00:00 2001 From: "everywhere.z" Date: Fri, 17 Oct 2025 12:24:34 +0800 Subject: [PATCH] =?UTF-8?q?enhancement=20#ID2CQG=20Java9~Java25=E8=B6=85?= =?UTF-8?q?=E6=97=B6=E5=BA=95=E5=B1=82=E6=9C=BA=E5=88=B6=E6=94=B9=E8=BF=9B?= =?UTF-8?q?=EF=BC=8C=E9=87=87=E7=94=A8=E5=8E=9F=E7=94=9F=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../strategy/ParallelStrategyExecutor.java | 39 ++++++++++++++----- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java index 1097e6864..9984874a2 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java @@ -1,7 +1,6 @@ package com.yomahub.liteflow.flow.parallel.strategy; -import cn.hutool.core.util.ObjectUtil; -import cn.hutool.core.util.StrUtil; +import cn.hutool.core.util.*; import com.yomahub.liteflow.enums.ParallelStrategyEnum; import com.yomahub.liteflow.exception.WhenExecuteException; import com.yomahub.liteflow.flow.element.Executable; @@ -20,6 +19,7 @@ import com.yomahub.liteflow.slot.DataBus; import com.yomahub.liteflow.slot.Slot; import com.yomahub.liteflow.thread.ExecutorHelper; +import java.lang.reflect.Method; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -40,6 +40,15 @@ public abstract class ParallelStrategyExecutor { protected final LFLog LOG = LFLoggerManager.getLogger(this.getClass()); + // java9及以上才会有这个方法 + private Method completeOnTimeoutMethod; + + public ParallelStrategyExecutor() { + if (JdkUtil.JVM_VERSION >= 9 && completeOnTimeoutMethod == null){ + completeOnTimeoutMethod = ReflectUtil.getMethod(CompletableFuture.class, "completeOnTimeout", Object.class, Long.class, TimeUnit.class); + } + } + /** * 封装 CompletableFuture 对象 * @param executable executable @@ -51,13 +60,25 @@ public abstract class ParallelStrategyExecutor { */ protected CompletableFuture wrappedFutureObj(Executable executable, ExecutorService parallelExecutor, WhenCondition whenCondition, String currChainId, Integer slotIndex) { - // 套入 CompletableFutureTimeout 方法进行超时判断,如果超时则用 WhenFutureObj.timeOut 返回超时的对象 - // 第 2 个参数是主要的本体 CompletableFuture,传入了 ParallelSupplier 和线程池对象 - return CompletableFutureExpand.completeOnTimeout( - CompletableFuture.supplyAsync(new ParallelSupplier(executable, currChainId, slotIndex), parallelExecutor), - whenCondition.getMaxWaitTime(), - whenCondition.getMaxWaitTimeUnit(), - WhenFutureObj.timeOut(executable.getId())); + + // 这里获取CompletableFuture的单个对象 + // 针对于java8和java9以上分别进行了超时处理,java9以上有原生方法,java8没有 + CompletableFuture f = CompletableFuture.supplyAsync(new ParallelSupplier(executable, currChainId, slotIndex), parallelExecutor); + if (JdkUtil.JVM_VERSION >= 9){ + return ReflectUtil.invoke(f, + completeOnTimeoutMethod, + WhenFutureObj.timeOut(executable.getId()), + whenCondition.getMaxWaitTime(), + whenCondition.getMaxWaitTimeUnit() + ); + }else{ + return CompletableFutureExpand.completeOnTimeout( + f, + whenCondition.getMaxWaitTime(), + whenCondition.getMaxWaitTimeUnit(), + WhenFutureObj.timeOut(executable.getId()) + ); + } } /**