mirror of
https://gitee.com/dromara/liteFlow.git
synced 2026-05-14 04:02:09 +08:00
enhancement #ID2CQG Java9~Java25超时底层机制改进,采用原生方法
This commit is contained in:
@@ -1,7 +1,6 @@
|
||||
package com.yomahub.liteflow.flow.parallel.strategy;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.core.util.*;
|
||||
import com.yomahub.liteflow.enums.ParallelStrategyEnum;
|
||||
import com.yomahub.liteflow.exception.WhenExecuteException;
|
||||
import com.yomahub.liteflow.flow.element.Executable;
|
||||
@@ -20,6 +19,7 @@ import com.yomahub.liteflow.slot.DataBus;
|
||||
import com.yomahub.liteflow.slot.Slot;
|
||||
import com.yomahub.liteflow.thread.ExecutorHelper;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
@@ -40,6 +40,15 @@ public abstract class ParallelStrategyExecutor {
|
||||
|
||||
protected final LFLog LOG = LFLoggerManager.getLogger(this.getClass());
|
||||
|
||||
// java9及以上才会有这个方法
|
||||
private Method completeOnTimeoutMethod;
|
||||
|
||||
public ParallelStrategyExecutor() {
|
||||
if (JdkUtil.JVM_VERSION >= 9 && completeOnTimeoutMethod == null){
|
||||
completeOnTimeoutMethod = ReflectUtil.getMethod(CompletableFuture.class, "completeOnTimeout", Object.class, Long.class, TimeUnit.class);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 封装 CompletableFuture 对象
|
||||
* @param executable executable
|
||||
@@ -51,13 +60,25 @@ public abstract class ParallelStrategyExecutor {
|
||||
*/
|
||||
protected CompletableFuture<WhenFutureObj> wrappedFutureObj(Executable executable, ExecutorService parallelExecutor,
|
||||
WhenCondition whenCondition, String currChainId, Integer slotIndex) {
|
||||
// 套入 CompletableFutureTimeout 方法进行超时判断,如果超时则用 WhenFutureObj.timeOut 返回超时的对象
|
||||
// 第 2 个参数是主要的本体 CompletableFuture,传入了 ParallelSupplier 和线程池对象
|
||||
return CompletableFutureExpand.completeOnTimeout(
|
||||
CompletableFuture.supplyAsync(new ParallelSupplier(executable, currChainId, slotIndex), parallelExecutor),
|
||||
whenCondition.getMaxWaitTime(),
|
||||
whenCondition.getMaxWaitTimeUnit(),
|
||||
WhenFutureObj.timeOut(executable.getId()));
|
||||
|
||||
// 这里获取CompletableFuture的单个对象
|
||||
// 针对于java8和java9以上分别进行了超时处理,java9以上有原生方法,java8没有
|
||||
CompletableFuture<WhenFutureObj> f = CompletableFuture.supplyAsync(new ParallelSupplier(executable, currChainId, slotIndex), parallelExecutor);
|
||||
if (JdkUtil.JVM_VERSION >= 9){
|
||||
return ReflectUtil.invoke(f,
|
||||
completeOnTimeoutMethod,
|
||||
WhenFutureObj.timeOut(executable.getId()),
|
||||
whenCondition.getMaxWaitTime(),
|
||||
whenCondition.getMaxWaitTimeUnit()
|
||||
);
|
||||
}else{
|
||||
return CompletableFutureExpand.completeOnTimeout(
|
||||
f,
|
||||
whenCondition.getMaxWaitTime(),
|
||||
whenCondition.getMaxWaitTimeUnit(),
|
||||
WhenFutureObj.timeOut(executable.getId())
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
Reference in New Issue
Block a user