diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java index 1097e6864..9984874a2 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java @@ -1,7 +1,6 @@ package com.yomahub.liteflow.flow.parallel.strategy; -import cn.hutool.core.util.ObjectUtil; -import cn.hutool.core.util.StrUtil; +import cn.hutool.core.util.*; import com.yomahub.liteflow.enums.ParallelStrategyEnum; import com.yomahub.liteflow.exception.WhenExecuteException; import com.yomahub.liteflow.flow.element.Executable; @@ -20,6 +19,7 @@ import com.yomahub.liteflow.slot.DataBus; import com.yomahub.liteflow.slot.Slot; import com.yomahub.liteflow.thread.ExecutorHelper; +import java.lang.reflect.Method; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -40,6 +40,15 @@ public abstract class ParallelStrategyExecutor { protected final LFLog LOG = LFLoggerManager.getLogger(this.getClass()); + // java9及以上才会有这个方法 + private Method completeOnTimeoutMethod; + + public ParallelStrategyExecutor() { + if (JdkUtil.JVM_VERSION >= 9 && completeOnTimeoutMethod == null){ + completeOnTimeoutMethod = ReflectUtil.getMethod(CompletableFuture.class, "completeOnTimeout", Object.class, Long.class, TimeUnit.class); + } + } + /** * 封装 CompletableFuture 对象 * @param executable executable @@ -51,13 +60,25 @@ public abstract class ParallelStrategyExecutor { */ protected CompletableFuture wrappedFutureObj(Executable executable, ExecutorService parallelExecutor, WhenCondition whenCondition, String currChainId, Integer slotIndex) { - // 套入 CompletableFutureTimeout 方法进行超时判断,如果超时则用 WhenFutureObj.timeOut 返回超时的对象 - // 第 2 个参数是主要的本体 CompletableFuture,传入了 ParallelSupplier 和线程池对象 - return CompletableFutureExpand.completeOnTimeout( - CompletableFuture.supplyAsync(new ParallelSupplier(executable, currChainId, slotIndex), parallelExecutor), - whenCondition.getMaxWaitTime(), - whenCondition.getMaxWaitTimeUnit(), - WhenFutureObj.timeOut(executable.getId())); + + // 这里获取CompletableFuture的单个对象 + // 针对于java8和java9以上分别进行了超时处理,java9以上有原生方法,java8没有 + CompletableFuture f = CompletableFuture.supplyAsync(new ParallelSupplier(executable, currChainId, slotIndex), parallelExecutor); + if (JdkUtil.JVM_VERSION >= 9){ + return ReflectUtil.invoke(f, + completeOnTimeoutMethod, + WhenFutureObj.timeOut(executable.getId()), + whenCondition.getMaxWaitTime(), + whenCondition.getMaxWaitTimeUnit() + ); + }else{ + return CompletableFutureExpand.completeOnTimeout( + f, + whenCondition.getMaxWaitTime(), + whenCondition.getMaxWaitTimeUnit(), + WhenFutureObj.timeOut(executable.getId()) + ); + } } /**